wechat-admin:Celery使用篇
/ / / 阅读数:5061Celery 是一个专注于实时处理和任务调度分布式任务队列。通过 RabbitMQ、Redis、MongoDB 等消息代理,把任务发给执行任务的 Worker 以达到异步执行。
我写的那本《Python Web 开发实战》的样章就是 《使用 Celery》 ,建议看下面内容之前先读一下这篇文章。
接下来的内容假设你已经对 Celery 有了一定的了解。对 wechat-admin 项目来说,使用 Celery 要做如下事情:
- 更新项目数据库中的联系人、群聊和公众号等相关内容
- 监听 wxpy 进程,处理自动加群、接受消息、踢人以及各种插件功能等
- 自动重启上述的监听进程
- 发送新消息数量提醒
首先我们创建一个目录(wechat),专门用来存放 celery 任务相关的内容,目录下文件列表如下:
❯ tree wechat wechat ├── __init__.py ├── celery.py # 名为celery.py是主程序,启动的时候可以直接`celery -A wechat worker -l info -B` ├── celeryconfig.py # 配置文件 └── tasks.py # 存放任务逻辑 0 directories, 4 files |
我们挨个看看
celeryconfig.py
看文件名字就知道了,这个是放配置的文件:
❯ cat celeryconfig.py from config import REDIS_URL BROKER_URL = REDIS_URL CELERY_RESULT_BACKEND = REDIS_URL CELERY_TASK_SERIALIZER = 'msgpack' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] |
指定消息代理和执行结果都使用 Redis,任务(消息)使用 msgpack 序列化,结果使用 json 序列化,任务结果保存时间 24 小时等
celery.py
主程序有点 Flask 的 app.py 的感觉:
❯ cat celery.py from celery import Celery from celery.signals import worker_ready from models.redis import db, LISTENER_TASK_KEY app = Celery('wechat', include=['wechat.tasks']) app.config_from_object('wechat.celeryconfig') @worker_ready.connect def at_start(sender, **k): with sender.app.connection() as conn: # noqa task_id = sender.app.send_task('wechat.tasks.listener') db.set(LISTENER_TASK_KEY, task_id) if __name__ == '__main__': app.start() |
这段代码有 2 点需要解释一下:
- 调用 send_task 会返回任务 id,存在 LISTENER_TASK_KEY 里面用于未来重启时直接通过这个任务 id
- 使用了 Celery 的信号系统,listener 这个异步任务需要在 worker 启动之后就运行,使用 worker_ready 这个信号就可以。
tasks.py
tasks.py 这个文件包含了很多业务逻辑,为了演示我省略部分代码。不过代码还是很长,所以我直接在对应行数的代码上加注释来解释了:
❯ cat tasks.py from datetime import timedelta from celery.task import periodic_task from celery.task.control import revoke from wechat.celery import app from wxpy.exceptions import ResponseError from itchat.signals import logged_out def restart_listener(sender, **kw): # 重启tasks.listener这个任务 task_id = r.get(LISTENER_TASK_KEY) if task_id: revoke(str(task_id, 'utf-8')) task_id = app.send_task('wechat.tasks.listener') r.set(LISTENER_TASK_KEY, task_id) logged_out.connect(restart_listener) from wxpy.signals import stopped from libs.wx import get_bot from views.api import json_api from models.redis import db as r, LISTENER_TASK_KEY from app import app as sse_api stopped.connect(restart_listener) bot = get_bot() def _retrieve_data(update=False): _update_contact(bot, update) _update_group(bot, update) _update_mp(bot, update) @app.task def listener(): # 不用全局的bot,因为在import listener的过程中会 # 注册各种函数(处理自动加群、接受消息、踢人以及各种插件功能) from libs.listener import bot with json_api.app_context(): bot.join() @app.task def retrieve_data(): # 使用Flask应用中的方法都需要放在对应的上下文内 with json_api.app_context(): _retrieve_data(True) @app.task def update_contact(update=False): # 都是业务逻辑,就省略了,这样分开写是可以单独的更新一种类型的数据 ... @app.task def update_group(update=False): ... @app.task def update_mp(update=False): ... # periodic_task就是定时任务,表示周期性的执行某任务 @periodic_task(run_every=timedelta(seconds=60), time_limit=5) def send_notify(): # 发送新消息数量提醒 ... |
上一篇我说 SSE 的时候忘说了一点,就是更新消息提醒。在 Web 页面标记已读的时候,会 POST 到 /readall 接口,后端清空新通知数量。这是由于 SSE 的单向特点造成的,如果使用 socketio (WebSocket) 的话可以直接 emit 到后端,就不用 HTTP 这种方案了