学习东西一定要给自己找个相对复杂到能比较深入了解要学习的内容的项目。「知乎 Live 全文搜索」是我用来学习 elasticsearch、elasticsearch_dsl、asyncio、sanic 和微信小程序项目。我想做时知乎还不提供 Live 的搜索,现在虽然提供了一个入口,但是感觉搜索的效果我觉得并不好。

前面在公众号我已经发了很多相关的 asyncio、elasticsearch_dsl 和 sanic 的内容,前面那些全部是给从今天开始的内容做铺垫。下面是这样文章的快速链接,如果你没看过或者忘记了可以回顾一下:

  1. 使用 Python 进行并发编程 - asyncio 篇 (一)
  2. 使用 Python 进行并发编程 - asyncio 篇 (二)
  3. 使用 Python 进行并发编程 - asyncio 篇 (三)
  4. 使用 API 登录知乎并获得 token
  5. 知乎 Live 全文搜索之模型设计和爬虫实现

现在列一下需求清单,并挨个完成。这里插一句,需求是否清晰明确是任务按期完成的最重要的条件之一,所以细化需求能力是每一个工程师都要熟悉和掌握的,这要求工程师对业务和所需要的技术很熟悉。

现在我先大致描绘要做的 4 件事情:

  • 继续完善爬虫。主要是以下 2 点:

    1. 之前分享过的爬虫的协程带来并发速度的提升优势还没有完全发挥出来。
    2. 之前的爬虫是纯信息抓取,没有考虑数据的可视化(比如没有考虑 Live 背景图,用户头像,Live 所属话题信息等),需要增加字段并重新抓取。
  • 让 elasticsearch_dsl 支持 asyncio,虽然官方提供了 elasticsearch-py-async 这个包,但是没有对 elasticsearch_dsl 的异步封装,而且由于我们将要使用 sanic 这个使用了第三方 uvloop 的 Web 框架,不能通过 asyncio.get_event_loop 的方式获取到 loop 而给 elasticsearch_dsl 用,需要处理。

  • 完成微信小程序需要的数据 model 接口。
  • 实现一个 RESTful 的 sanic API 功能,让 elasticsearch 和 sqlalchemy 的返回内容能非常方便的通用处理。

当然也并不是能预先清晰制定好 100% 需求,因为会有一些外部不可控的因素或者不小心踩了个深坑儿。比如一开始我只是抓知乎的图片链接,但是后来发现在微信小程序开发工具中这些图片资源是不能访问的(可能是没有 Referfer),所以后来又改成把图片都下载到本地。另外一个是当时存储用户用了 SQLite,现在要存话题数据,发现 SQLite 不支持并发,这对 asyncio 来说是不合理的选择,所以后来换成了 MySQL。看,这就是选择 sqlallchemy 的好处,我完全不用改 model,只是把 DB_URI 改一下就好了。

本文是上面的第二项,也就是让 elasticsearch_dsl 支持 asyncio。我并且尽量拆分任务成可以团队多成员可以同时协作完成,也就是不用等着 A 做完 B 才能开始去做。我继续细节需求,并挨个完成

让 sanic 暴露 loop 给 elasticsearch_dsl 用

通过看 sanic 和 elasticsearch_dsl 的源码,我用 sanic 提供的 before_start 事件就可以完成:

from sanic import Sanic
from elasticsearch_dsl.connections import connections

app = Sanic(__name__)

def set_loop(sanic, loop):
    conns = connections._conns
    for c in conns:
        conns[c].transport.loop = loop


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8300,
            before_start=[set_loop], workers=4, debug=True)

elasticsearch 创建链接后会保存在 connections._conns,在 Sanic 启动后把这些链接中用的 loop 替换成 sanic 创建那个就可以了。

让 elasticsearch-py-async 支持 Sanic

我 fork 了 elasticsearch-py-async ,虽然它已经是支持 asyncio 的了,但是还不支持外部 loop,我的修改具体的可以看 这里 。说白了也是为了成替换 sanic 创建的那个 loop。

elasticsearch-py-async 加载的时候 loop 已经创建了,所以需要 hack 一下,让 main_loop 使用我设置的那个:

from elasticsearch import Transport


class AsyncTransport(Transport):
    ...
    @asyncio.coroutine
    def main_loop(self, method, url, params, body, ignore=(), timeout=None):
        for attempt in range(self.max_retries + 1):
            connection = self.get_connection()
            connection.loop = self.loop

            try:
                status, headers, data = yield from connection.perform_request(
                        method, url, params, body, ignore=ignore, timeout=timeout)
            except TransportError as e:
                if method == 'HEAD' and e.status_code == 404:
                    return False
            ...

@asyncio.coroutineyield from是老式用法了,我新写的代码都统一都改成 Python 3.5 增加的 async/await 关键字了。

其中我加了一句connection.loop = self.loop,这样之前做的conns[c].transport.loop = loop就有意义的。另外一个问题是 aiohttp.session 的用法,会造成在类初始化的时候就生成了 self.session 被未来使用,现在需要换成我们的 loop,就需要让 self.session 在用时才生成,而且类初始化时候就创建 session 是一种不好的用法,我在多个地方见过,包含之前提到的 500lines 项目中的爬虫也是这样用的,本来是这样用的:

class Crawler:
    def __init__(self, roots,
                 exclude=None, strict=True,  # What to crawl.
                 max_redirect=10, max_tries=4,  # Per-url limits.
                 max_tasks=10, *, loop=None):
        ...
        self.session = aiohttp.ClientSession(loop=self.loop)
        ...

现在 aiohttp 会抛Creating a client session outside of coroutine的警告,也就是还没有开始协程就创建了。好的用法应该是这样的:

class Crawler:
    def __init__(self, roots,
                 exclude=None, strict=True,  # What to crawl.
                 max_redirect=10, max_tries=4,  # Per-url limits.
                 max_tasks=10, *, loop=None):
        ...
        self._session = None

    @property
    def session(self):
        if self._session is None:
            self.session = aiohttp.ClientSession(loop=self.loop)
            self._session = session
        return self._session

这种编程方式使用非常广泛,体会一下。

让 elasticsearch-dsl-py 支持 Sanic

前面说到 elasticsearch-dsl-py 底层用的是 elasticsearch-py,并不支持异步化。所以我还是 fork 了 一份 ,目前除了为支持 Sanic,还修了一个我认为的 BUG, 已经提了 PR 还没有被处理,有兴趣的同学可以移步 Fix IllegalOperation when use custom analyzer

我发现有的同学很惧怕 Python 3 内置的协程方案 asyncio。我想原因主要是 2 点:

  1. 对 async/await 这种异步编程方式不习惯
  2. asyncio 的生态还不够丰富,非常有可能你要自己去封装还不支持的客户端

人都是习惯在自己的舒适区,但其实可能没那么难习惯。这个就像成天用 python 写代码,突然要写个 shell 脚本或者就像没做过组件式开发第一次用 React 一样的不适应。这个时候需要你忍着不习惯,坚持的做 1-2 个项目,等你了解了,熟悉了,就会发现新的编程方式其实也就是那么回事,但是不去尝试和深入永远也学不会。

大家没事可以翻翻 aio-libs 这个汇集了多个常用工具的 asyncio 的封装库,其实不复杂,把一个同步的程序改成 asyncio 的,我总结就是一句话:

给内部有异步操作的函数添加 async 关键字,在有网络请求和 I/O 操作并且希望协程化的地方添加 await 关键字

对于 elasticsearch-dsl-py 的修改,差不多就是一种模式。举个 get 方法的例子。原来是这样写的:

@add_metaclass(DocTypeMeta)
class DocType(ObjectBase):
    ...
    @classmethod
    def get(cls, id, using=None, index=None, **kwargs):
        es = connections.get_connection(using or cls._doc_type.using)
        doc = es.get(
            index=index or cls._doc_type.index,
            doc_type=cls._doc_type.name,
            id=id,
            **kwargs
        )
        if not doc['found']:
            return None
        return cls.from_es(doc)

现在改成这样:

@add_metaclass(DocTypeMeta)
class DocType(ObjectBase):
    ...
    @classmethod
    async def get(cls, id, using=None, index=None, **kwargs):
        es = connections.get_connection(using or cls._doc_type.using)
        doc = await es.get(
            index=index or cls._doc_type.index,
            doc_type=cls._doc_type.name,
            id=id,
            **kwargs
        )
        if not doc['found']:
            return None
        return cls.from_es(doc)

所以不是每一步都需要加 await,只有 es.get 是有网络请求的,其他的地方加了也没有意义。

另外的一个地方是把初始化 Elasticsearch 对象的地方改成 elasticsearch-py-async 的封装后的类,原来是:

from elasticsearch import Elasticsearch

class Connections(object):
    def create_connection(self, alias='default', **kwargs):
        kwargs.setdefault('serializer', serializer)
        conn = self._conns[alias] = Elasticsearch(**kwargs)
        return conn

改成:

from elasticsearch_async import AsyncElasticsearch

class Connections(object):
    def create_connection(self, alias='default', **kwargs):
        kwargs.setdefault('serializer', serializer)
        conn = self._conns[alias] = AsyncElasticsearch(**kwargs)
        return conn

明天我将分享完善后的爬虫,以及第三项「完成微信小程序需要的数据 model 接口」。