看这篇文章前推荐阅读相关的如下文章:

  1. 知乎 Live 全文搜索之模型设计和爬虫实现
  2. 知乎 Live 全文搜索之使用 Elasticsearch 做聚合分析
  3. 知乎 Live 全文搜索之使用 Elasticsearch 做搜索建议
  4. 知乎 Live 全文搜索之让 elasticsearch_dsl 支持 asyncio

知乎 Live 全文搜索之让 elasticsearch_dsl 支持 asyncio 一文中,我把后端工作分成了 4 步,今天是完成爬虫和模型接口这 2 步,接口返回的数据会被微信小程序使用。

详细的列一下接口需求:

  1. 搜索。搜索符合输入的关键字的 Live 和用户,按照之前提到的各种策略排序,也支持通过 status 状态过滤「已结束」和「未结束」2 种类型的 Live。支持分页。
  2. 搜索建议。提供符合输入的关键字的 Live 的建议。
  3. 发现。把全部的 Live 按照之前提到的各种策略排序,可以通过各种字段排序,可以选择 Live 开始的时间范围(默认是全部)。
  4. 获取热门话题。
  5. 获取某话题详细信息及话题下的 Live,支持分页、排序、时间范围。
  6. 获取全部用户,并且可以按照举办的 Live 数量、更新 Live 时间等条件排序。
  7. 获取单个用户信息。
  8. 根据各种策略排序,获取 7 天热门 Live,非知乎排序。
  9. 根据各种策略排序,获取 30 天热门 Live,非知乎排序。

添加 Topic 模型

由于 4 和 5 的需求,我添加了 Topic 这个模型,上篇文章说过 SQLite 不支持并发,所以替换成了 MySQL,要把 config 里面的 DB_URI 改成如下格式:

DB_URI = 'mysql+pymysql://localhost/test?charset=utf8mb4'

其中 test 是库的名字,charset 要用 utf8mb4,因为有些用户信息甚至 Live 的标题里面包含 emoji。MySQL 的客户端用的是 PyMySQL,需要在 schema 上指出来。

Topic 类和之前的 User 格式差不多,只是不同的字段,限于篇幅就不列出来了。

实现用户相关方法

为了实现可以按照举办的 Live 数量、更新 Live 时间排序,我添加了 2 个字段,也改了字符集:

from config import SUGGEST_USER_LIMIT, PEOPLE_URL, LIVE_USER_URL


class User(Base):
    __tablename__ = 'users'
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8mb4'
    }

    ...
    live_count = Column(Integer, default=0)
    updated_time = Column(DateTime, default=datetime.now)

接着添加一些必要的方法:

class User(Base):
    ...

    def incr_live_count(self):
        self.live_count += 1
        session.commit()

    @property
    def url(self):
        return PEOPLE_URL.format(self.speaker_id)

    @property
    def lives_url(self):
        return LIVE_USER_URL.format(self.speaker_id)

    def to_dict(self):
        d = {c.name: getattr(self, c.name, None)
             for c in self.__table__.columns}
        d.update({
            'type': 'user',
            'url': self.url,
            'lives_url': self.lives_url
        })
        return d

我习惯给 model 添加一个 to_dict 方法,把需要的字段和值拼成一个 dict 返回。当然有些 API 实际不需要这么多的字段,在下一篇中我会介绍怎么处理 schema 的问题。

最后是 3 个接口方法:

class User(Base):
    @classmethod
    def add(cls, **kwargs):
        speaker_id = kwargs.get('speaker_id', None)
        r = None
        if id is not None:
            q = session.query(cls).filter_by(speaker_id=speaker_id)
            r = q.first()
            if r:
                q.update(kwargs)

        if r is None:
            r = cls(**kwargs)
            session.add(r)
        try:
            session.commit()
        except:
            session.rollback()
        else:
            return r

    @classmethod
    def suggest(cls, q, start=0, limit=SUGGEST_USER_LIMIT):
        query = session.query(User)
        users = query.filter(User.name.like('%{}%'.format(q))).offset(
            start).limit(limit).all()
        return [user.to_dict() for user in users]

    @classmethod
    def get_all(cls, order_by='id', start=0, limit=10, desc=False):
        '''
        :param order_by:  One of ``'id'``, ``'live_count'`` or
                          ``'updated_time'``
        '''
        query = session.query(User)
        order_by = getattr(User, order_by)
        if desc:
            order_by = _desc(order_by)
        users = query.order_by(order_by).offset(start).limit(limit).all()
        return [user.to_dict() for user in users]

需要注意 add 方法,其实叫做 add_or_update 更合适,需要使用 session 一定要 commit 才能提交数据。

sqlalchemy 没有自带的 suggest 功能,只能用 Like 来实现。get_all 方法就是上面第 6 个需求接口。

完成 Live 模型字段

首先道歉,之前我理解的自定义 analyzer 的用法是错误的,下面的才是正确的姿势:

from elasticsearch_dsl.analysis import CustomAnalyzer

ik_analyzer = CustomAnalyzer(
    'ik_analyzer', tokenizer='ik_max_word',
    filter=['lowercase']
)

tokenizer 字段是必选的,这里使用 ik 分词插件提供的 ik_max_word。我还给 Live 添加了 2 个字段:

class Live(DocType):
    cover = Text(index='not_analyzed')  # 对应专栏头图(如果有)
    zhuanlan_url = Text(index='not_analyzed') # 对应专栏地址

加上参数index='not_analyzed'是因为这 2 个字段不用于搜索和聚合,没必要分词,就当成数据库使用了。

也给 Live 添加一些属性和方法,方便最后用 to_dict () 生成需要的全部数据:

from .speaker import User, session


class Live(DocType):
    @property
    def id(self):
        return self._id

    @property
    def speaker(self):
        return session.query(User).get(self.speaker_id)

    @property
    def url(self):
        return LIVE_URL.format(self.id)

    class Meta:
        index = 'live130'

    def to_dict(self, include_extended=True):
        d = super().to_dict()
        if include_extended:
            d.update({
                'id': self._id,
                'type': 'live',
                'speaker': self.speaker.to_dict(),
                'url': self.url
            })
        return d

其中 speaker 属性是常见的关联多个 model 的快捷方式,但是需要注意,竟然不要设计成 A 的 model 里面某个方法返回了 B 的 model 数据,B 的 model 里面也返回了 A 的 model 的数据而造成只能进行方法内 import。

super().to_dict()的原因是 DocType 内置了 to_dict 方法,随便提一下,而且接收 include_meta 参数,为 True 会包含 index 和 doc_type 的元数据。

把 Live 设计成异步的

这个是今天的重点,昨天说的「让 elasticsearch_dsl 支持 asyncio」就是给今天做准备。换汤不换药,说白了就是在合适的地方添加 async/await 关键字,先看个 add 的:

class Live(DocType):
    ...
    @classmethod
    async def add(cls, **kwargs):
        id = kwargs.pop('id', None)
        if id is None:
            return False
        live = cls(meta={'id': int(id)}, **kwargs)
        await live.save()
        return live

现在我们挨个实现需求,首先是搜索接口,由于 DocType 包含了 search 方法,得换个名字了:

class Live(DocType):
    ...
    async def _execute(cls, s, order_by=None):
        # 可以选择字段的排序,前面加-表示desc,不加就是默认的asc 
        if order_by is not None:
            s = s.sort(order_by)
        lives = await s.execute()  # 执行,要在这步之前拼好查询条件
        return [live.to_dict() for live in lives]

    @classmethod
    def apply_weight(cls, s, start, limit):
        return s.query(Q('function_score', functions=[gauss_sf, log_sf])).extra(
            **{'from': start, 'size': limit})

    @classmethod
    async def ik_search(cls, query, status=None, start=0, limit=10):
        s = cls.search()
        # 多字段匹配要搜索的内容,SEARCH_FIELDS中不同字段权重不同
        s = s.query('multi_match', query=query,
                    fields=SEARCH_FIELDS)
        if status is not None:  # 根据结束状态过滤
            s = s.query('match', status=status)
        # 搜索是带权重的,按照之前的设计做了时间衰减和归一化
        s = cls.apply_weight(s, start, limit)
        return await cls._execute(s)

就是根据需求,按照 DSL 的方式来拼。我添加了些注释,看不懂的话可以按照文章开始的链接去找找答案。

然后是发现接口,7/30 天热门都是基于这个接口,只不过划定了时间:

class Live(DocType):
    ...

    @classmethod
    async def explore(cls, from_date=None, to_date=None, order_by=None,
                      start=0, limit=10, topic=None):
        s = cls.search()
        if topic is not None:
            s = s.query(Q('term', topic_names=topic))
        starts_at = {}
        if from_date is not None:
            starts_at['from'] = from_date
        if to_date is not None:
            starts_at['to'] = to_date
        if starts_at:
            s = s.query(Q('range', starts_at=starts_at))
        if order_by is None:
            s = cls.apply_weight(s, start, limit)
        return await cls._execute(s, order_by)

    @classmethod
    async def get_hot_weekly(cls):
        today = date.today()
        return await cls.explore(from_date=today - timedelta(days=7),
                                  to_date=today, limit=20)

    @classmethod
    async def get_hot_monthly(cls):
        today = date.today()
        return await cls.explore(from_date=today - timedelta(days=30),
                                 to_date=today, limit=50)

注意,explore 方法如果指定了排序方案,就不会添加时间衰减和归一化的处理了。

然后是获取用户举报的全部 Live 的方法:

class Live(DocType):
    ...
    @classmethod
    async def ik_search_by_speaker_id(cls, speaker_id, order_by='-starts_at'):
        s = cls.search()
        s = s.query(Q('bool', should=Q('match', speaker_id=speaker_id)))
        return await cls._execute(s, order_by)

可以看到_execute 方法抽象后被重复利用了。

再然后是 suggest 接口:

class Live(DocType):
    ...
    @classmethod
    async def ik_suggest(cls, query, size=10):
        s = cls.search()
        s = s.suggest('live_suggestion', query, completion={
            'field': 'live_suggest', 'fuzzy': {'fuzziness': 2}, 'size': size
        })
        suggestions = await s.execute_suggest()
        matches = suggestions.live_suggestion[0].options
        ids = [match._id for match in matches]
        lives = await Live.mget(ids)
        return [live.to_dict() for live in lives]

其中支持 2 个编辑距离的模糊搜索。这个实现的比较简单,没有考虑拼音,也没有考虑搜索用户。值得一提的是 DocType 提供了 mget 这个获取多个 id 的接口,请善用减少网络请求,也就是给 ES 后端减压。

第 4 个获得热门话题的需求是本项目唯一用到聚合功能的地方了:

from .topic import Topic


class Live(DocType):
    @classmethod
    async def get_hot_topics(cls, size=50):
        s = cls.search()
        s.aggs.bucket('topics', A('terms', field='topics', size=size))
        rs = await s.execute()
        buckets = rs.aggregations.topics.buckets
        topic_names = [r['key'] for r in buckets]
        topics = session.query(Topic).filter(Topic.name.in_(topic_names)).all()
        topics = sorted(topics, key=lambda t: topic_names.index(t.name))
        return [topic.to_dict() for topic in topics]

每个 Live 都会打话题标签,越多的 live 打这个话题就说明它越热门。

最后要说的是 init () 方法:

async def init():
    await Live.init()

原来 import 模块的时候直接就 init 了,现在由于异步化了,直接 init 没人所以要在 loop 中用,比如在爬虫中:

from models.live import init as live_init

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    crawler = Crawler()
    loop.run_until_complete(live_init())
    loop.run_until_complete(crawler.crawl())
    print('Finished in {:.3f} secs'.format(crawler.t1 - crawler.t0))
    crawler.close()

    loop.close()
    es.transport.close()

理解了嘛?

好了全部接口都完成了,但是大家有木有感觉,异步编程调试起来很麻烦,我来教一个好用的方法.

调试 async 程序

asyncio 要求把需要协程化的函数都放进一个 loop,通过 run_until_complete 方法让它执行完成。

但是现在非常不好玩:

In : from models import Live
In : live = Live.get(789840559912009728)

In : live
Out: <coroutine object DocType.get at 0x10a0d1fc0>

In : live.subject
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-8c237874146c> in <module>()
----> 1 live.subject

AttributeError: 'coroutine' object has no attribute 'subject'

异步化的函数 (方法) 用起来很不直观。一开始可以写个脚本把要调试的东西放进去用 (test_es.py):

import asyncio
from elasticsearch_dsl.connections import  connections

from models.live import Live, SEARCH_FIELDS, init as live_init


s = Live.search()
es = connections.get_connection(Live._doc_type.using)


async def print_info():
    rs = await s.query('multi_match', query='python',
                       fields=SEARCH_FIELDS).execute()
    print(rs)

loop = asyncio.get_event_loop()
loop.run_until_complete(live_init())
loop.run_until_complete(print_info())
loop.close()
es.transport.close()

这样也是可以调试的,很麻烦,对吧?

抽象一下,其实写个函数就好了:

import asyncio


def execute(coro):
    loop = asyncio.get_event_loop()
    rs = loop.run_until_complete(coro)
    return rs

OK, 再用:

In : from models import Live, execute
In : live = Live.get(789840559912009728)
In : live = execute(live)
In : live.subject
Out: 'Python 工程师的入门和进阶'

这样就方便多了。