知乎Live全文搜索之完成爬虫
/ / / 阅读数:5129看这篇文章前推荐阅读相关的如下文章:
在[知乎 Live 全文搜索之模型设计和爬虫实现」里面我已经说过这是本年度最后一次说爬虫,今天就啪啪的打脸了。主要现在的爬虫有比较大的改变,由于微信公众号文章长度限制一篇放不上,只能专门写一篇啦。
抓取话题信息
给新增的 Topic 提供数据。在 parse_live_link 中,解析到 Live 数据中包含了 topic 的 id, 基于这个 id 拼链接,然后在 fetch 方法中添加对 topic 页面的处理,新增 parse_topic_link 方法:
TOPIC_API_URL = 'https://api.zhihu.com/topics/{}'
class Crawler:
def __init__(self, max_redirect=10, max_tries=4,
max_tasks=10, *, loop=None):
...
self.seen_topics = set()
async def parse_topic_link(self, response):
rs = await response.json()
if response.status == 200:
rs['avatar_url'] = await self.convert_local_image(
rs['avatar_url'].replace('_s', '_r'))
Topic.add_or_update(**flatten_live_dict(rs, TOPIC_KEYS))
async def parse_live_link(self, response):
...
topics = live_dict.pop('topics')
for topic in topics:
topic_id = topic['id']
if topic_id not in self.seen_topics:
self.seen_topics.add(topic_id)
self.add_url(TOPIC_API_URL.format(topic_id),
self.max_redirect)
...
async def fetch(self, url, max_redirect):
try:
if 'api.zhihu.com' in url:
parse_func = (self.parse_topic_link if 'topics' in url
else self.parse_live_link)
next_url = await parse_func(response)
else:
next_url = await self.parse_zhuanlan_link(response)
...
思考下,这是不是一种套路 (模式):
- 初始化一个已完成的 url 的集合
- 启动一定量的 worker,每个 worker 都在等待从队列获取要抓取的 url
- 一次性添加足量要抓取的链接到队列中,让每个 worker 都忙起来 (执行前要确认之前没有抓取过)
- worker 在 parse 处理中又会解析到新的要抓取的链接,放入队列
- worker 消费任务,过程中也可能生产任务给自己或者其他 worker 来消费
- 全部任务都完成了,通过
self.q.join ()
结束 - 停止这些 worker,任务完成
修改 live 灌 suggest 数据的方式
在上上篇我把相关字段的文本用 analyze 接口拆分成不同的 token 成为一个列表赋值给 live_suggest,其实完全不需要这样,因为Completion(analyzer=ik_analyzer)
就是做这个的。gen_suggests 用最简单的 input+weight 就可以:
def gen_suggests(topics, tags, outline, username, subject): suggests = [{'input': item, 'weight': weight} for item, weight in ((topics, 10), (subject, 5), (outline, 3), (tags, 3), (username, 2)) if item] return suggests |
下载主讲人头像
小程序开发工具中不能直接使用知乎的图片资源,所以我只能下载下来并生成一个本地的图片地址:
import os IMAGE_FOLDER = 'static/images/zhihu' if not os.path.exists(IMAGE_FOLDER): os.mkdir(IMAGE_FOLDER) class Crawler: ... async def convert_local_image(self, pic): pic_name = pic.split('/')[-1] path = os.path.join(IMAGE_FOLDER, pic_name) if not os.path.exists(path): async with self.session.get(pic) as resp: content = await resp.read() with open(path, 'wb') as f: f.write(content) return path async def parse_live_link(self, response): ... for live in rs['data']: ... speaker = live.pop('speaker') speaker_id = speaker['member']['id'] speaker['member']['avatar_url'] = await self.convert_local_image( # noqa speaker['member']['avatar_url']) ... |
这样 User 类中的 avatar_url 最后会变成static/images/zhihu/v2-4db301967fffa08dfa727ff467170e_s.jpg
这样的地址了。未来我们将让 sanic 来提供静态资源服务。当然,也可以只存文件名,在接口返回前再去拼图片地址。
抓取专栏信息
知乎 Live 申请通过之后,主讲人可以写一篇专栏介绍 Live,文章中带上 Live 的链接来导流,知乎 Live 官方也会收录这个 Live 的专栏文章。为了让微信小程序的效果更好,我想要抓专栏头图,并且保存专栏链接,希望在小城中能给跳转进去(以证明不可行)。下面我将遍历 知乎 Live 官方专栏 收录的专栏,解析每个专栏的标题,去 ES 里面匹配是不是有对应的 subject 匹配,如果匹配还会验证用户的 hash 值确保正确,如果没找到还会从 Live 正文中搜索 live 的链接的方式来匹配。
看起来很绕,但是没办法啦,因为专栏和 live 没有明确的匹配关系,目测是知乎 2 个部门写的不同的系统。
最后要提的是专栏的抓取和 live 的 api 不同,它不提供 paging 键,也就是返回内容中并不告诉你下一页的地址,所以需要我们人工的修改链接,这需要一个转化的函数:
from urllib.parse import urlparse, parse_qsl, urlunparse, urlencode def get_next_url(url): url_parts = list(urlparse(url)) query = dict(parse_qsl(url_parts[4])) query['offset'] = int(query['offset']) + int(query['limit']) url_parts[4] = urlencode(query) return urlunparse(url_parts) |
这个方法在我实际工作中很常用:
In : get_next_url('http://dongwm.com?offset=10&limit=20') Out: 'http://dongwm.com?offset=30&limit=20' In : get_next_url('http://dongwm.com?offset=20&limit=30') Out: 'http://dongwm.com?offset=50&limit=30' |
ZHUANLAN_API_URL = 'https://zhuanlan.zhihu.com/api/columns/zhihulive/posts?limit=20&offset={offset}' LIVE_REGEX = re.compile(r'<a href="https://(www.)?zhihu.com/lives/(\d+)(.*)?"') # noqa class Crawler: def __init__(self, max_redirect=10, max_tries=4, max_tasks=10, *, loop=None): ... self.seen_zhuanlan = set() ... async def parse_zhuanlan_link(self, response): posts = await response.json() if response.status == 200 and posts: for post in posts: cover = post['titleImage'] if not cover: continue s = Live.search() title = post['title'] for sep in ('-', '—'): if sep in title: title = title.split(sep)[-1].strip() speaker_id = post['author']['hash'] zid = post['url'].split('/')[-1] s = s.query(Q('match_phrase', subject=title)) lives = await s.execute() for live in lives: if live.speaker and live.speaker.speaker_id == speaker_id: await self.update_live(zid, cover, live) break else: match = LIVE_REGEX.search(post['content']) if match: live_id = match.group(2) try: live = await Live.get(live_id) except NotFoundError: pass else: await self.update_live(zid, cover, live) return get_next_url(response.url) async def update_live(self, zid, cover, live): if live.id in self.seen_zhuanlan: return self.seen_zhuanlan.add(live.id) zhuanlan_url = ZHUANLAN_URL.format(zid) cover = await self.convert_local_image(cover) await live.update(cover=cover, zhuanlan_url=zhuanlan_url) def add_zhuanlan_urls(self): for offset in range(self.max_tasks): self.add_url(ZHUANLAN_API_URL.format(offset=offset * 20)) async def crawl(self): self.__workers = [asyncio.Task(self.work(), loop=self.loop) for _ in range(self.max_tasks)] self.t0 = time.time() await self.q.join() self.add_zhuanlan_urls() await self.q.join() self.t1 = time.time() for w in self.__workers: w.cancel() |
其中 crawl 方法中用 2 次 join 用来确保先抓取全部 live 信息之后再去抓专栏信息,因为得先确保 live 内容足够完整才能搜索匹配,其次由于 parse_live_link 和 parse_zhuanlan_link 都涉及到 Live 的更新,在并发中容易造成同时更新某些 live 而触发版本冲突的 ConflictError。
我使用s = s.query(Q('match_phrase', subject=title))
进行标题匹配,首先我们先聊聊在 ES 中 match 和 term 的区别,简单的说:
term 用于精确查询,match 用于全文检索
我们要把标题和 Live 的 subject 字段去匹配,但是由于 subject 设置了 analyzer,所以无法使用 term。除非新加一个字段,修改成类似 cover 的那种Text(index='not_analyzed')
。但是这样新增字段实在有点浪费,用 math 会把要匹配的标题分词之后挨个去匹配,匹配其中一个或多个的文档就会被搜索出来,显然不满足「精确」,所以我想到了「短语匹配」(Phrase Matching)。
短语匹配和 match 查询类似,match_phrase 查询首先解析查询字符串产生一个词条列表。但只保留含有了所有搜索词条的文档,并且还要求这些词条的顺序也一致。就是相当于虽然分词了但是词的顺序是有要求的,效果类似于精确匹配。
支持自动刷新知乎的 token
在调用知乎 API 的时候,会经常告诉我 token 过期了。我得删掉原来的 token.json 然后重新生成,这样很不爽。所以抓包分析后,通过已有的 refresh_token 自动刷新 token:
class ZhihuClient: def refresh_token(self): data = LOGIN_DATA.copy() data['grant_type'] = 'refresh_token' data['refresh_token'] = self._token.refresh_token gen_signature(data) auth = ZhihuOAuth(self._token) self.save_token(auth, data) def save_token(self, auth, data): res = self._session.post(LOGIN_URL, auth=auth, data=data) try: json_dict = res.json() if 'error' in json_dict: raise LoginException(json_dict['error']['message']) self._token = ZhihuToken.from_dict(json_dict) except (ValueError, KeyError) as e: raise LoginException(str(e)) else: ZhihuToken.save_file(self.token_file, json_dict) |
启动爬虫首先会去 get 一个 url,看看返回的状态码是不是 401,如果是就执行 refresh_token 方法获得新的 token:
class Crawler: ... async def check_token(self): async with self.session.get( LIVE_API_URL.format(type='ended', offset=0)) as resp: if resp.status == 401: self.client.refresh_token() async def crawl(self): await self.check_token() ... PS: 今天试用好像不对 |
更新用户举办的 Live 数量
之前我们给 User 添加了 incr_live_count 这个方法,调用一次 live_count 就 + 1,由于这个爬虫每次都是重新过一遍,所以需要仅在创建 live 的时候才更新:
async def parse_live_link(self, response): ... result = await Live.add(**live_dict) if result.meta['version'] == 1: user.incr_live_count() |
ES 每次每次都会返回添加 / 更新的结果,其中的 version 字段正好被我们利用。
优化抓取
终于到最后一个小节了。再次道歉,之前分享的爬虫其中有一句检查要抓取的 url 是不是在 self.seen_uls 里面的判断,如果已经抓取过就 assert 抛了异常,这其实造成最后就剩下一个协程在执行任务了。
现在我重构了这部分的内容,大家看代码体会吧:
class Crawler: def __init__(self, max_redirect=10, max_tries=4, max_tasks=10, *, loop=None): self.__stopped = {}.fromkeys(['ended', 'ongoing', 'posts'], False) async def fetch(self, url, max_redirect): ... if next_url is not None: self.add_url(next_url, max_redirect) else: # 如果live或者知乎官方专栏接口不再返回下一页,这个类型的任务就停止 for type in self.__stopped: if type in url: self.__stopped[type] = True async def work(self): try: while 1: url, max_redirect = await self.q.get() if url in self.seen_urls: type = url.split('/')[-1].split('?')[0] # 如果一个接口返回的next_url已经抓去过,自动添加next_url的下一页 # 除非这个类型的任务停止状态为True if not type.isdigit() and not self.__stopped[type]: self.add_url(get_next_url(url), max_redirect) await self.fetch(url, max_redirect) self.q.task_done() asyncio.sleep(1) except asyncio.CancelledError: pass |
这样就既不会重复抓取,也能保证 worker 都能正常工作。
截止发稿,抓取到的 Live 1967 个, 话题 656 个 完整抓取一次大概调用约 950 次 API(1967 / 10 + 1967 / 20 + 656), 在我 MacBook 上耗时 70 - 90 s。