知乎Live全文搜索之模型设计和爬虫实现
/ / / 阅读数:4561晒一下知乎 Live 团队送的新年礼物:
感谢。所以我将送上一系列知乎 Live 主题的文章
在我的 知乎 Live 里面,我留下了调查问卷,其中一项是可以反馈自己感兴趣的 Python 相关技术。意料之内,最受大家欢迎的就是爬虫。所以最近开始写了一些和爬虫有关的内容。不过虽然现在还是 1 月,但是我发誓这是本年最后一次写爬虫文章了(要写吐了)。
模型设计
做知乎 Live 的全文搜索,首先要抓取到全部 Live 信息。我尝试了下,只要是登录状态就可以使用知乎提供的 API 获取。
接口分为 2 种:
- https://api.zhihu.com/lives/ongoing?purchasable=0&limit=10&offset=10(未结束)
- https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=10(已结束)
每个 Live 包含非常多的字段,还包含了一些预留字段。我们简单分析下并实现对应的模型。
主讲人模型
主讲人字段如下:
speaker: { member: { name: "Flood Sung", url: "https://api.zhihu.com/people/23deec836a24f295500a6d740011359c", type: "people", user_type: "people", headline: "Deep Learning/Reinforcement Learning/Robotics/", avatar_url: "https://pic3.zhimg.com/73a71f47d66e280735a6c786131bdfe2_s.jpg", gender: 1, url_token: "flood-sung", id: "23deec836a24f295500a6d740011359c" }, bio: "Deep Learning/Reinforcement Learning/Robotics/", description: "我是 Flood Sung ..." } |
主讲人部分我单独存储,不直接放进 elasticsearch 的原因有 2 个:
- 全文搜索和主讲人有关的字段只有主讲人的名字和描述,其他内容没有意义
- 主讲人可能会有多个 Live,重复存储浪费空间
所以我还是按着惯例,存在主流数据库中,由于目前 Live 总体才 2k 多,数据量太小,且更新机会不多(爬虫抓取是离线的)直接放进了 SQLite 就好了。
为了未来在数据量变大需要改变存储的数据库时可以灵活切换,我使用提供对象关系映射(ORM)的 SQLAlchemy 来写模型(models/speaker.py):
from sqlalchemy import Column, String, Integer, create_engine, SmallInteger from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base DB_URI = 'sqlite:///user.db' Base = declarative_base() engine = create_engine(DB_URI) Base.metadata.bind = engine Session = sessionmaker(bind=engine) session = Session() class User(Base): __tablename__ = 'live_user' id = Column(Integer, unique=True, primary_key=True, autoincrement=True) speaker_id = Column(String(40), index=True, unique=True) name = Column(String(40), index=True, nullable=False) gender = Column(SmallInteger, default=2) headline = Column(String(200)) avatar_url = Column(String(100), nullable=False) bio = Column(String(200)) description = Column(String()) @classmethod def add(cls, **kwargs): speaker_id = kwargs.get('speaker_id', None) if id is not None: r = session.query(cls).filter_by(speaker_id=speaker_id).first() if r: return r try: r = cls(**kwargs) session.add(r) session.commit() except: session.rollback() raise else: return r Base.metadata.create_all() |
speaker_id 就是上面 API 中的 speaker['member']['id'],但是由于它是一个字符串,和常见的主键使用整数不一样(虽然也可以),所以多加一个字段存储。另外额外的写了一个 add 方法,相当于创建之前通过 speaker_id 先检查下库中是否存在对应条目。
Live 模型
Live 字段比较多,为了未来字段格式改变造成文本不可用,得列一下:
{ "liked": false, "conv_id": "58747b778d6d81006c40cee7", "topics": [ { "url": "https://api.zhihu.com/topics/20038840", "avatar_url": "https://pic3.zhimg.com/50/d9e63efc57a9f07378ae9d5416ecf85a_s.png", "type": "topic", "id": "20038840", "name": "阿尔法围棋(AlphaGo)" } ], "seats": { "taken": 286, "max": 500 }, "duration": 5400, "id": "802155571712253952", "subject": "从 AlphaGo 看人工智能前沿技术", "feedback_score": 4.5, "fee": { "amount": 1900, "unit": "RMB" }, "purchasable": true, "has_feedback": false, "note": "版权声明:...", "source": "admin", "cospeakers": [ ], "speaker": { }, "income": null, "role": "visitor", "in_promotion": false, "badge": { "avatar_url": "https://pic1.zhimg.com/8e3b9024d62aa293d8ba7235701a9a08_r.png", "id": 0, "name": "普通票" }, "status": "ended", "ends_in": 0, "description": "我是 Flood Sung ...", "speaker_message_count": 127, "tags": [ { "score": 0, "name": "互联网", "short_name": "互联网", "available_num": 191, "created_at": 1469691129, "id": 105, "live_num": 195 } ], "is_muted": false, "liked_num": 547, "alert": "从看到听...", "can_speak": false, "artwork": "", "is_public": 1, "ends_at": 1484920522, "outline": "* AlphaGo 为什么能在围棋上取得如此重大的突破?...", "is_anonymous": false, "created_at": 1484028791, "related_members": [ ], "product_list": [ ], "starts_at": 1484913600, "is_admin": false } |
这些字段通过名字能比较清晰的了解用途。接着我们考虑用什么 elasticsearch 的 Python 客户端。官方提供了 elasticsearch-py 这个低级别客户端。
最开始我使用了 elasticsearch-py,所谓低级别,就是各种操作未做或者少做封装,比如搜索用到的参数就要手动拼成一个字典。相信如果你之前了解或者用过 elasticsearch,就会知道它的搜索参数多的令人发指。如果业务需求比较简单,elasticsearch-py 还是可满足的。随着需求变多变复杂,你会发现拼这样一个多键值、多层嵌套字典的逻辑变得越来越不可维护,且写错了不容易发现,如果你对业务和 elasticsearch 不熟悉,非常容易掉坑儿。
那有没有什么更好写搜索和模型的方式嘛?
官方提供了基于 elasticsearch-py 的高级别客户端 elasticsearch-dsl-py 。DSL 是领域专用语言(Domain Specific Language)的简称,也就是专门针对某一特定问题的计算机语言,特点是「求专不求全」。elasticsearch-dsl-py 就是针对 elasticsearch 的特定语言。
它允许我们用一种非常可维护的方法来组织字典:
In : from elasticsearch_dsl.query import Q In : Q('multi_match', subject='python').to_dict() Out: {'multi_match': {'subject': 'python'}} |
允许用一种函数式的方法来做查询:
In : from elasticsearch import Elasticsearch In : from elasticsearch_dsl import Search, Q In : s = Search(using=client, index='live') In : s = s.query('match', subject='python').query(~Q('match', description='量化')) In : s.execute() Out: <Response: [<Hit(live/live/789840559912009728): {'subject': 'Python 工程师的入门和进阶', 'feedback_score': 4.5, 'stat...}>]> |
上述例子表示从 live 这个索引(类似数据库中的 Database)中找到 subject 字典包含 python,但是 description 字段不包含量化的 Live。
当然这个 DSL 也支持用类代表一个 doc_type(类似数据库中的 Table),实现 ORM 的效果。我们就用它来写 Live 模型:
from elasticsearch_dsl import DocType, Date, Integer, Text, Float, Boolean from elasticsearch_dsl.connections import connections from elasticsearch_dsl.query import SF, Q from config import SEARCH_FIELDS from .speaker import User, session connections.create_connection(hosts=['localhost']) class Live(DocType): id = Integer() speaker_id = Integer() feedback_score = Float() # 评分 topic_names = Text(analyzer='ik_max_word') # 话题标签名字 seats_taken = Integer() # 参与人数 subject = Text(analyzer='ik_max_word') # 标题 amount = Float() # 价格(RMB) description = Text(analyzer='ik_max_word') status = Boolean() # public(True)/ended(False) starts_at = Date() outline = Text(analyzer='ik_max_word') # Live内容 speaker_message_count = Integer() tag_names = Text(analyzer='ik_max_word') liked_num = Integer() class Meta: index = 'live' @classmethod def add(cls, **kwargs): id = kwargs.pop('id', None) if id is None: return False live = cls(meta={'id': id}, **kwargs) live.save() return live |
爬虫
在之前我已经分享了一个功能比较齐备的 基于 aiphttp 的爬虫实现 ,还分享过如何 使用 API 登录知乎并获得 token 。今天用之前的积累,实现这个爬虫。由于之前已经展示用完整代码,今天只介绍如何让它们「串」起来。
首先是初始化部分:
... from models import User, Live, session from client import ZhihuClient from utils import flatten_live_dict from config import SPEAKER_KEYS, LIVE_KEYS LIVE_API_URL = 'https://api.zhihu.com/lives/{type}?purchasable=0&limit=10&offset={offset}' # noqa LIVE_TYPE = frozenset(['ongoing', 'ended']) class Crawler: def __init__(self, max_redirect=10, max_tries=4, max_tasks=10, *, loop=None): ... for t in LIVE_TYPE: for offset in range(self.max_tasks): self.add_url(LIVE_API_URL.format(type=t, offset=offset * 10)) client = ZhihuClient() self.headers = {} client.auth(self) |
初始化的时候通过 add_url 添加 2 种 API、共 2 倍 max_tasks 个 url。由于 aiohttp 的 auth 参数不支持原来 requests 的 ZhihuOAuth,所以添加一个值为空字典的 self.headers 方便生成包含知乎 API 请求需要的 headers。
知乎 API 设计的不错,返回的结果中包含了上一页和下一页的地址和是否结束:
paging: { is_end: false, next: "https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=20", previous: "https://api.zhihu.com/lives/ended?purchasable=0&limit=10&offset=0" } |
所以 parse_link 的结果也就是只是一个 url 了。我把存储爬取下来的 Live 和主讲人信息的方法也放进了这个方法:
async def parse_link(self, response): rs = await response.json() if response.status == 200: for live in rs['data']: speaker = live.pop('speaker') speaker_id = speaker['member']['id'] user = User.add(speaker_id=speaker_id, **flatten_live_dict(speaker, SPEAKER_KEYS)) live_dict = flatten_live_dict(live, LIVE_KEYS) if live_dict['id'] == LAST_INSERT_ID: self._stopped = True return live_dict['speaker_id'] = user.id live_dict['topic_names'] = ' '.join( [t['name'] for t in live_dict.pop('topics')]) live_dict['seats_taken'] = live_dict.pop('seats')['taken'] live_dict['amount'] = live_dict.pop('fee')['amount'] / 100 live_dict['status'] = live_dict['status'] == 'public' live_dict['tag_names'] = ' '.join( set(sum([(t['name'], t['short_name']) for t in live_dict.pop('tags')], ()))) live_dict['starts_at'] = datetime.fromtimestamp( live_dict['starts_at']) Live.add(**live_dict) paging = rs['paging'] if not paging['is_end']: next_url = paging['next'] return paging['next'] else: print('HTTP status_code is {}'.format(response.status)) |
其中 flatten_live_dict 是从嵌套的字典里面把需要的键值抽出来:
def flatten_live_dict(d, keys=[]):
def items():
for key, value in d.items():
if key in keys:
yield key, value
elif isinstance(value, dict):
for subkey, subvalue in flatten_live_dict(value, keys).items():
if subkey != 'id' and subkey in keys:
yield subkey, subvalue
return dict(items())
这个函数对 id 做了特殊处理,是因为在 flatten 过程中,有多个 id 会被无情替换,所以额外处理了。
其中参数 keys 分 2 种:
LIVE_KEYS = ['id', 'feedback_score', 'seats', 'subject', 'fee', 'description', 'status', 'starts_at', 'outline', 'speaker_message_count', 'liked_num', 'tags', 'topics'] SEARCH_FIELDS = ['subject', 'description', 'outline', 'tag_names', 'topic_names'] |
这样就完成了一个爬虫。我没有添加爬虫的断点续爬(发现之前爬过停止抓取)功能。这是因为 Live 一般在结束后都还是可以有人参与,去评价。反正总量也就是一分钟抓取完而已。如果想要添加这种减少重复抓取的功能,最简单的方式如下:
... search = Live.search() if search.count(): LAST_INSERT_ID = search.sort('starts_at').execute()[0]._id else: LAST_INSERT_ID = 0 class Crawler: def __init__(self, max_redirect=10, max_tries=4, max_tasks=10, *, loop=None): ... self._stopped = False async def parse_link(self, response): rs = await response.json() if response.status == 200: if live_dict['id'] == LAST_INSERT_ID: self._stopped = True return ... def add_url(self, url, max_redirect=None): if max_redirect is None: max_redirect = self.max_redirect if url not in self.seen_urls and not self._stopped: self.seen_urls.add(url) self.q.put_nowait((url, max_redirect)) |