wechat-admin:SSE
/ / / 阅读数:4503在上一篇 项目设计 中,我说到了 SSE(Server-Sent Events)是为了实现单方向的消息推送,今天介绍下实际的使用。
我直接用了现成的 Flask-SSE ,其实 SSE 实现的原理比较简单:
- 借用 Redis 的发布 / 订阅模式创建一个方法,方法内会调用 pubsub.listen 监听新的发布数据。
- 使用 Flask 提供的 stream_with_context,不断的从上面的方法中获取数据。
使用起来分 2 部分
前端
在前端页面添加一个函数:
function eventSourceListener() { let source = new EventSource(`${API_URL}/stream`); let self = this; source.addEventListener('login', function(event) { let data = JSON.parse(event.data); if (data.type == 'scan_qr_code') { self.uuid = data.uuid; self.qrCode = `data:image/png;base64,${data.extra}`; } else if (data.type == 'confirm_login') { self.sub_title = 'Scan successful'; self.sub_desc = 'Confirm login on mobile WeChat'; self.qrCode = data.extra; } else if (data.type == 'logged_in') { sessionStorage.setItem('user', JSON.stringify(data.user)); self.$router.push({ path: '/main' }); } else if (data.type == 'logged_out') { sessionStorage.removeItem('user'); self.$router.push('/login'); } }, false); source.addEventListener('notification', function(event) { let data = JSON.parse(event.data); self.notificationCount = data.count; }, false); source.addEventListener('error', function(event) { console.log("Failed to connect to event stream"); }, false); } |
这段代码放在一个自定义的 Vue 的插件里面,这样在所有页面上都要自动包含这部分代码了。source.addEventListener 用来添加事件监听,它监听了 3 种类型的消息:
- login 登录,也就是在页面反映当前微信的登录状态(等待扫码 / 扫码完成等待确认 / 确认完成)。不同的消息会执行不同的操作,页面也会立刻渲染出最新的结果。
- notification 消息提醒,会有一个异步任务定期检查新入库的消息,有新的消息就是发布出来通知新消息数。
- error 内置的错误消息,当然这个加不加倒还好
另外在登陆后执行sessionStorage.setItem('user', JSON.stringify(data.user));
会设置浏览器的 session,下次自动登录后右侧就显示头像了,这样能减少后端的负担,退出时 removeItem 方法再删掉。
后端
后端包含 2 部分,第一部分是用 Flask 实现上面说的${API_URL}/stream
这个接口,这是一个长连接,消息就是从这里推送出去的。由于第一部分是阻塞的,我们需要异步的方式往这个阻塞进程里面推送数据,也就是开头说的,利用 Redis 的发布 / 订阅模式发布消息。比如通知调用起来是 这样的 :
from app import app as sse_api with sse_api.app_context(): sse.publish({'count': count}, type='notification') |
登陆过程要复杂一些,我之前说过在我 fork 的 ItChat 和 wxpy 分支里面添加了信号的支持,这个信号是需要「注册」的,也就是在 import 之前就要注册,效果要类似 这样 :
from itchat.signals import scan_qr_code, confirm_login, logged_out def publish(uuid, **kw): from app import app with app.app_context(): params = {'uuid': uuid, 'extra': kw.pop('extra', None), 'type': kw.pop('type', None)} params.update(kw) sse.publish(params, type='login') scan_qr_code.connect(publish) confirm_login.connect(publish) logged_out.connect(publish) from wxpy import * # noqa |
这里用了信号的 connect 方法。举个 logged_out 的例子,在 ItChat 里面,首先 定义这个信号 :
from blinker import Namespace _signals = Namespace() logged_out = _signals.signal('logged-out') |
需要在对应发信号的地方调用 send 方法 :
logged_out.send(self.uuid, type='logged_out') |
另外有个坑儿,首次打开 Web 页面的是一个铺满 div 的 gif 图片,一开始设想的是在下载二维码图片之后,通过修改 img 的 src 属性指到这个图片,实际开发中发现,这个二维码图片被会更新不及时,会使用缓存的就图片所以发送信号的时候不使用图片 HTTP 地址,而是 Data URLs,这就需要把图片内容编码一下:
encoded = base64.b64encode(qrStorage.getvalue()).decode('ascii') scan_qr_code.send(self.uuid, extra=encoded, type='scan_qr_code') |
结语
这样借助 Redis 和 Celery 就实现了 SSE 的使用,下一节我将介绍 Celery 的使用。