知乎Live全文搜索之模型接口
/ / / 阅读数:4725看这篇文章前推荐阅读相关的如下文章:
- 知乎 Live 全文搜索之模型设计和爬虫实现
- 知乎 Live 全文搜索之使用 Elasticsearch 做聚合分析
- 知乎 Live 全文搜索之使用 Elasticsearch 做搜索建议
- 知乎 Live 全文搜索之让 elasticsearch_dsl 支持 asyncio
在 知乎 Live 全文搜索之让 elasticsearch_dsl 支持 asyncio 一文中,我把后端工作分成了 4 步,今天是完成爬虫和模型接口这 2 步,接口返回的数据会被微信小程序使用。
详细的列一下接口需求:
- 搜索。搜索符合输入的关键字的 Live 和用户,按照之前提到的各种策略排序,也支持通过 status 状态过滤「已结束」和「未结束」2 种类型的 Live。支持分页。
- 搜索建议。提供符合输入的关键字的 Live 的建议。
- 发现。把全部的 Live 按照之前提到的各种策略排序,可以通过各种字段排序,可以选择 Live 开始的时间范围(默认是全部)。
- 获取热门话题。
- 获取某话题详细信息及话题下的 Live,支持分页、排序、时间范围。
- 获取全部用户,并且可以按照举办的 Live 数量、更新 Live 时间等条件排序。
- 获取单个用户信息。
- 根据各种策略排序,获取 7 天热门 Live,非知乎排序。
- 根据各种策略排序,获取 30 天热门 Live,非知乎排序。
添加 Topic 模型
由于 4 和 5 的需求,我添加了 Topic 这个模型,上篇文章说过 SQLite 不支持并发,所以替换成了 MySQL,要把 config 里面的 DB_URI 改成如下格式:
DB_URI = 'mysql+pymysql://localhost/test?charset=utf8mb4' |
其中 test 是库的名字,charset 要用 utf8mb4,因为有些用户信息甚至 Live 的标题里面包含 emoji。MySQL 的客户端用的是 PyMySQL,需要在 schema 上指出来。
Topic 类和之前的 User 格式差不多,只是不同的字段,限于篇幅就不列出来了。
实现用户相关方法
为了实现可以按照举办的 Live 数量、更新 Live 时间排序,我添加了 2 个字段,也改了字符集:
from config import SUGGEST_USER_LIMIT, PEOPLE_URL, LIVE_USER_URL class User(Base): __tablename__ = 'users' __table_args__ = { 'mysql_engine': 'InnoDB', 'mysql_charset': 'utf8mb4' } ... live_count = Column(Integer, default=0) updated_time = Column(DateTime, default=datetime.now) |
接着添加一些必要的方法:
class User(Base):
...
def incr_live_count(self):
self.live_count += 1
session.commit()
@property
def url(self):
return PEOPLE_URL.format(self.speaker_id)
@property
def lives_url(self):
return LIVE_USER_URL.format(self.speaker_id)
def to_dict(self):
d = {c.name: getattr(self, c.name, None)
for c in self.__table__.columns}
d.update({
'type': 'user',
'url': self.url,
'lives_url': self.lives_url
})
return d
我习惯给 model 添加一个 to_dict 方法,把需要的字段和值拼成一个 dict 返回。当然有些 API 实际不需要这么多的字段,在下一篇中我会介绍怎么处理 schema 的问题。
最后是 3 个接口方法:
class User(Base): @classmethod def add(cls, **kwargs): speaker_id = kwargs.get('speaker_id', None) r = None if id is not None: q = session.query(cls).filter_by(speaker_id=speaker_id) r = q.first() if r: q.update(kwargs) if r is None: r = cls(**kwargs) session.add(r) try: session.commit() except: session.rollback() else: return r @classmethod def suggest(cls, q, start=0, limit=SUGGEST_USER_LIMIT): query = session.query(User) users = query.filter(User.name.like('%{}%'.format(q))).offset( start).limit(limit).all() return [user.to_dict() for user in users] @classmethod def get_all(cls, order_by='id', start=0, limit=10, desc=False): ''' :param order_by: One of ``'id'``, ``'live_count'`` or ``'updated_time'`` ''' query = session.query(User) order_by = getattr(User, order_by) if desc: order_by = _desc(order_by) users = query.order_by(order_by).offset(start).limit(limit).all() return [user.to_dict() for user in users] |
需要注意 add 方法,其实叫做 add_or_update 更合适,需要使用 session 一定要 commit 才能提交数据。
sqlalchemy 没有自带的 suggest 功能,只能用 Like 来实现。get_all 方法就是上面第 6 个需求接口。
完成 Live 模型字段
首先道歉,之前我理解的自定义 analyzer 的用法是错误的,下面的才是正确的姿势:
from elasticsearch_dsl.analysis import CustomAnalyzer ik_analyzer = CustomAnalyzer( 'ik_analyzer', tokenizer='ik_max_word', filter=['lowercase'] ) |
tokenizer 字段是必选的,这里使用 ik 分词插件提供的 ik_max_word。我还给 Live 添加了 2 个字段:
class Live(DocType): cover = Text(index='not_analyzed') # 对应专栏头图(如果有) zhuanlan_url = Text(index='not_analyzed') # 对应专栏地址 |
加上参数index='not_analyzed'
是因为这 2 个字段不用于搜索和聚合,没必要分词,就当成数据库使用了。
也给 Live 添加一些属性和方法,方便最后用 to_dict () 生成需要的全部数据:
from .speaker import User, session class Live(DocType): @property def id(self): return self._id @property def speaker(self): return session.query(User).get(self.speaker_id) @property def url(self): return LIVE_URL.format(self.id) class Meta: index = 'live130' def to_dict(self, include_extended=True): d = super().to_dict() if include_extended: d.update({ 'id': self._id, 'type': 'live', 'speaker': self.speaker.to_dict(), 'url': self.url }) return d |
其中 speaker 属性是常见的关联多个 model 的快捷方式,但是需要注意,竟然不要设计成 A 的 model 里面某个方法返回了 B 的 model 数据,B 的 model 里面也返回了 A 的 model 的数据而造成只能进行方法内 import。
用super().to_dict()
的原因是 DocType 内置了 to_dict 方法,随便提一下,而且接收 include_meta 参数,为 True 会包含 index 和 doc_type 的元数据。
把 Live 设计成异步的
这个是今天的重点,昨天说的「让 elasticsearch_dsl 支持 asyncio」就是给今天做准备。换汤不换药,说白了就是在合适的地方添加 async/await 关键字,先看个 add 的:
class Live(DocType): ... @classmethod async def add(cls, **kwargs): id = kwargs.pop('id', None) if id is None: return False live = cls(meta={'id': int(id)}, **kwargs) await live.save() return live |
现在我们挨个实现需求,首先是搜索接口,由于 DocType 包含了 search 方法,得换个名字了:
class Live(DocType): ... async def _execute(cls, s, order_by=None): # 可以选择字段的排序,前面加-表示desc,不加就是默认的asc if order_by is not None: s = s.sort(order_by) lives = await s.execute() # 执行,要在这步之前拼好查询条件 return [live.to_dict() for live in lives] @classmethod def apply_weight(cls, s, start, limit): return s.query(Q('function_score', functions=[gauss_sf, log_sf])).extra( **{'from': start, 'size': limit}) @classmethod async def ik_search(cls, query, status=None, start=0, limit=10): s = cls.search() # 多字段匹配要搜索的内容,SEARCH_FIELDS中不同字段权重不同 s = s.query('multi_match', query=query, fields=SEARCH_FIELDS) if status is not None: # 根据结束状态过滤 s = s.query('match', status=status) # 搜索是带权重的,按照之前的设计做了时间衰减和归一化 s = cls.apply_weight(s, start, limit) return await cls._execute(s) |
就是根据需求,按照 DSL 的方式来拼。我添加了些注释,看不懂的话可以按照文章开始的链接去找找答案。
然后是发现接口,7/30 天热门都是基于这个接口,只不过划定了时间:
class Live(DocType): ... @classmethod async def explore(cls, from_date=None, to_date=None, order_by=None, start=0, limit=10, topic=None): s = cls.search() if topic is not None: s = s.query(Q('term', topic_names=topic)) starts_at = {} if from_date is not None: starts_at['from'] = from_date if to_date is not None: starts_at['to'] = to_date if starts_at: s = s.query(Q('range', starts_at=starts_at)) if order_by is None: s = cls.apply_weight(s, start, limit) return await cls._execute(s, order_by) @classmethod async def get_hot_weekly(cls): today = date.today() return await cls.explore(from_date=today - timedelta(days=7), to_date=today, limit=20) @classmethod async def get_hot_monthly(cls): today = date.today() return await cls.explore(from_date=today - timedelta(days=30), to_date=today, limit=50) |
注意,explore 方法如果指定了排序方案,就不会添加时间衰减和归一化的处理了。
然后是获取用户举报的全部 Live 的方法:
class Live(DocType): ... @classmethod async def ik_search_by_speaker_id(cls, speaker_id, order_by='-starts_at'): s = cls.search() s = s.query(Q('bool', should=Q('match', speaker_id=speaker_id))) return await cls._execute(s, order_by) |
可以看到_execute 方法抽象后被重复利用了。
再然后是 suggest 接口:
class Live(DocType): ... @classmethod async def ik_suggest(cls, query, size=10): s = cls.search() s = s.suggest('live_suggestion', query, completion={ 'field': 'live_suggest', 'fuzzy': {'fuzziness': 2}, 'size': size }) suggestions = await s.execute_suggest() matches = suggestions.live_suggestion[0].options ids = [match._id for match in matches] lives = await Live.mget(ids) return [live.to_dict() for live in lives] |
其中支持 2 个编辑距离的模糊搜索。这个实现的比较简单,没有考虑拼音,也没有考虑搜索用户。值得一提的是 DocType 提供了 mget 这个获取多个 id 的接口,请善用减少网络请求,也就是给 ES 后端减压。
第 4 个获得热门话题的需求是本项目唯一用到聚合功能的地方了:
from .topic import Topic class Live(DocType): @classmethod async def get_hot_topics(cls, size=50): s = cls.search() s.aggs.bucket('topics', A('terms', field='topics', size=size)) rs = await s.execute() buckets = rs.aggregations.topics.buckets topic_names = [r['key'] for r in buckets] topics = session.query(Topic).filter(Topic.name.in_(topic_names)).all() topics = sorted(topics, key=lambda t: topic_names.index(t.name)) return [topic.to_dict() for topic in topics] |
每个 Live 都会打话题标签,越多的 live 打这个话题就说明它越热门。
最后要说的是 init () 方法:
async def init(): await Live.init() |
原来 import 模块的时候直接就 init 了,现在由于异步化了,直接 init 没人所以要在 loop 中用,比如在爬虫中:
from models.live import init as live_init if __name__ == '__main__': loop = asyncio.get_event_loop() crawler = Crawler() loop.run_until_complete(live_init()) loop.run_until_complete(crawler.crawl()) print('Finished in {:.3f} secs'.format(crawler.t1 - crawler.t0)) crawler.close() loop.close() es.transport.close() |
理解了嘛?
好了全部接口都完成了,但是大家有木有感觉,异步编程调试起来很麻烦,我来教一个好用的方法.
调试 async 程序
asyncio 要求把需要协程化的函数都放进一个 loop,通过 run_until_complete 方法让它执行完成。
但是现在非常不好玩:
In : from models import Live In : live = Live.get(789840559912009728) In : live Out: <coroutine object DocType.get at 0x10a0d1fc0> In : live.subject --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-4-8c237874146c> in <module>() ----> 1 live.subject AttributeError: 'coroutine' object has no attribute 'subject' |
异步化的函数 (方法) 用起来很不直观。一开始可以写个脚本把要调试的东西放进去用 (test_es.py):
import asyncio from elasticsearch_dsl.connections import connections from models.live import Live, SEARCH_FIELDS, init as live_init s = Live.search() es = connections.get_connection(Live._doc_type.using) async def print_info(): rs = await s.query('multi_match', query='python', fields=SEARCH_FIELDS).execute() print(rs) loop = asyncio.get_event_loop() loop.run_until_complete(live_init()) loop.run_until_complete(print_info()) loop.close() es.transport.close() |
这样也是可以调试的,很麻烦,对吧?
抽象一下,其实写个函数就好了:
import asyncio def execute(coro): loop = asyncio.get_event_loop() rs = loop.run_until_complete(coro) return rs |
OK, 再用:
In : from models import Live, execute In : live = Live.get(789840559912009728) In : live = execute(live) In : live.subject Out: 'Python 工程师的入门和进阶' |
这样就方便多了。