知乎Live全文搜索之使用Elasticsearch做聚合分析
/ / / 阅读数:5099ES 除了全文搜索以外还有一个主要功能,就是数据的聚合分析。我会在微信小程序里用到聚合功能。今天先介绍一下。
目前 DSL 库支持如下三种常用聚合模式
Metrics Aggregations
顾名思义,主要是用于计算特定的度量字段,Metric 很像 SQL 中的 avg、max、min 等方法。我们找一下最多的 Live 有多少人感兴趣:
In : from elasticsearch_dsl import A In : s = Live.search() In : s.aggs.metric('max_liked_num', A('max', field='liked_num')) Out: <elasticsearch_dsl.search.Search at 0x10a47b550> In : r = s.execute() In : r.aggregations.max_liked_num Out: {'value': 6918.0} |
嚯。看看是哪一个吧:
In [27]: s.query('match', liked_num=6918).execute()[0].subject
Out[27]: '致所有近视想摘掉眼镜的你们'
有点出人意料哦~
其实度量的也不一定是文档的某个特定字段值,可以是文档通过脚本生成的值。比如我们看看全部 Live 平均收入,收入 = 票价* 参与人数,是 2 个字段。要这样用:
In : from elasticsearch_dsl import A In : s = Live.search() In : s.aggs.metric('avg_income', A('scripted_metric', init_script="params._agg['incomes'] = []", map_script="params._agg.incomes.add(doc.amou ...: nt.value * doc.seats_taken.value)", combine_script='double total=0; int num_of_income=0; for (i in params._agg.incomes) { total += i; nu ...: m_of_income += 1 } return [total, num_of_income]', reduce_script='double total=0; int num_of_income=0; for (item in params._aggs) { tota ...: l += item[0]; num_of_income += item[1]} return total / num_of_income')) Out: <elasticsearch_dsl.search.Search at 0x10a512898> In : rs = s.execute() In : rs.aggregations.avg_income Out: {'value': 32934.61029513591} |
有点长,我们把 scripted_metric 的参数分开说:
- init_script。 初始化时运行,一般是设置初始的全局变量
- map_script。会对每个文档做循环,把每个计算好的收入用 add 方法加到每个分片的 params._agg.incomes 里面。
- combine_script。我们知道 ES 是分布式的,数据有多个分片,当 map_script 完成后,它用来对每个分片的那部分结果做求和和计数的预处理
- reduce_script。如果你了解 MapReduce,我想对 2 和 4 步就能更好的理解了,这一步能通过 params._aggs 把每个分片的预处理结果拿来再做处理,最后通过总收入和 live 数求得平均值。
很庆幸没有给平均值拖后腿。BTW,有兴趣的同学可以继续挖掘为啥平均收入这么高。而且注意额,我考虑的只是普通票价,没有算那些「聊表心意」、「鼎力支持」的票,这会让平均值更高一些。
上面的例子也的好长啊。我不太满意,那么是不是可以简化一下呢?也就是 combine_script 不预计算,统一在 reduce_script 计算:
In : s = Live.search() In : s.aggs.metric('avg_income', A('scripted_metric', init_script="params._agg['incomes'] = []", map_script="params._agg.incomes.add(doc.amou ...: nt.value * doc.seats_taken.value)", combine_script='return params._agg.incomes', reduce_script='double total=0; int num_of_income=0; for ...: (shard in params._aggs) { for (income in shard) {total += income; num_of_income += 1}} return total / num_of_income')) Out: <elasticsearch_dsl.search.Search at 0x10a53a470> In : r = s.execute() In : r.aggregations.avg_income Out: {'value': 32934.61029513591} |
只是在 combine_script 用了个嵌套循环。
Bucket Aggregations
Bucket 在英语里面有「桶」的意思,Bucket Aggregations 会把符合某种条件的文档丢进一个 Bucket,而且还可以实现子聚合(sub-aggregations)。
Elasticsearch 是基于 Lucene 构建的。如果你了解过 Lucene,相信知道 docValue,它节省内存、做排序,分组等聚合操作时能够大大提升性能。我们之前的 model 里面大多使用了文本字段(Text),这是用作进行全文搜索的,而希望做聚合计算,需要使用 Keyword 类型的字段。所以我添加了一个 topics 字段:
from elasticsearch_dsl import Keyword, DocType class Live(DocType): ... topic_names = Text(analyzer='ik_max_word') topics = Keyword() # 新增 |
其实 DSL 还支持一种用子字段的写法:
topic_names = Text(analyzer='ik_max_word', fields={'raw': Keyword()}) |
由于担心未来 Live 的 Topic 会有多个,所以 topic_names 是一个用 join 把 topic 列表串起来的字符串,而需求上 topics 是一个或者多个 topic 的列表,还是额外新加一个字段吧。
这样重新跑爬虫,补充下 topics 字段之后,按 toics 符合数量排序,看看 live 中那些类型的 Live 更多:
In : s = Live.search() In : s.aggs.bucket('categories', A('terms', field='topics')) Out: <elasticsearch_dsl.search.Search at 0x10a532d30> In : r = s.execute() In : r.aggregations.categories.buckets Out: [{'key': '生活方式', 'doc_count': 145}, {'key': '金融', 'doc_count': 94}, {'key': '音乐', 'doc_count': 87}, {'key': '艺术', 'doc_count': 73}, {'key': '教育', 'doc_count': 59}, {'key': '科技', 'doc_count': 59}, {'key': '心理学', 'doc_count': 56}, {'key': '职业', 'doc_count': 56}, {'key': '互联网', 'doc_count': 48}, {'key': '医学', 'doc_count': 42}] |
看到了吧,彰显逼格的「生活方式」话题的 Live 最多,象征更多钱的金融话题次之...
现在是不是有种熟悉的感觉:聚合短语 terms 这不是 SQL 里面的 group by 嘛?
PS: 如果要使结果返回所有聚合结果的话,需要加上 size 参数:
s.aggs.bucket('categories', A('terms', field='topics', size=20)) |
PS: 从 ES5.0 开始,size 不再能指定 0 而返回全部结果了,需要明确指定一个大于 0 的整数。
Bucket 聚合支持多种类型,我们再演示下范围聚合。 现在把票价分成三个范围:
- 小于 20 的
- 20-100 之间的
- 大于 100 的
这样写:
In : s = Live.search()
In : s.aggs.bucket('amount_eq_100', A('range', field='amount', ranges=[{'from': 100}, {'from': 20, 'to': 100}, {'to': 20}]))
Out: Range(field='amount', ranges=[{'from': 100}, {'from': 20, 'to': 100}, {'to': 20}])
In : r = s.execute()
In : buckets = r.aggregations.amount_eq_100.buckets
In : for bucket in buckets:
...: print('{}: {}'.format(bucket['key'], bucket['doc_count']))
...:
*-20.0: 1159
20.0-100.0: 683
100.0-*: 20
最后演示下 date_histogram 型的聚合,histogram 顾名思义是直方图的意思,我们看看从 Live 诞生到现在,每个月(即将)举行 Live 的数量分别是多少:
In : s = Live.search() In : s.aggs.buckets('start_at', A('date_histogram', field='starts_at', interval='month')) Out: <elasticsearch_dsl.search.Search at 0x10a6ea3c8> In : r = s.execute() In : r.aggregations.start_at.buckets Out: [{'key_as_string': '2016-05-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-06-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-07-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-08-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-09-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-10-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-11-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-12-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-01-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-02-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-03-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-04-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-05-01T00:00:00.000Z', 'key': datetim...}] In : q = r.aggregations.start_at.buckets[0] In : q.doc_count Out: 20 In : q.key Out: datetime.datetime(2016, 5, 1, 0, 0) |
interval 支持多种类型:如 year, quarter, month, week, day, hour, minute, second 等。
Pipeline Aggregations
管道聚合是在 Elasticsearch 2.x 新增的一种聚合类型,可以在现有的聚合数据之上,再对其做一次运算。这类似 SQL 的 Subquery。
Pipeline 分为 2 类:
- parent。聚合的输入是非 Pipeline 聚合的输出,并对其进行进一步处理。一般不生成新的桶,而是对父聚合桶信息的 replace。
- sibling。聚合的输入是其他 Pipeline 聚合的输出。并能在同级上计算新的聚合。
管道聚合通过 buckets_path 参数指定他们要进行聚合计算的权值对象,格式如下:
AGG_SEPARATOR = '>' ; 指定父子聚合关系
METRIC_SEPARATOR = '.' ; 指定聚合的特定权值
AGG_NAME = <the name of the aggregation> ; 直接指定聚合的名称
METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ; 直接指定权值
PATH = <AGG_NAME> [ <AGG_SEPARATOR>, <AGG_NAME> ]* [ <METRIC_SEPARATOR>, <METRIC> ] ; 综合上面的方式指定完整路径
看 2 个例子就好懂了。首先演示 sibling 类型的,基于上节 date_histogram 聚合例子了,我们算一下每个月的 Live 总收入
In : agg = A('date_histogram', field='starts_at', interval='month') In : agg.bucket('incomes', A('sum', script={'inline': "doc['seats_taken'].value* doc['amount'].value"})) Out: Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}) In : s.aggs.bucket('incomes_per_month', agg) Out: DateHistogram(aggs={'incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"})}, field='starts_at', interval='month') |
为了构造更好理解的聚合语句,先生成一个 agg 变量,可以看到 Buckets 和 Metrics 可以用函数式的方式用多个,也要注意当需求复杂的时候都是可以通过 script 来实现的。接着加入 2 个管道,再分别获得最大月收入和全部月收入:
In : s.aggs.pipeline('max_monthly_incomes', agg_type='max_bucket', buckets_path='incomes_per_month>incomes') Out: <elasticsearch_dsl.search.Search at 0x10a75b518> In : s.aggs.pipeline('sum_monthly_incomes', agg_type='sum_bucket', buckets_path='incomes_per_month>incomes') # 注意agg_type不一样 Out: <elasticsearch_dsl.search.Search at 0x10a75b518> In : r = s.execute() In : r.aggregations.incomes_per_month.buckets Out: [{'key_as_string': '2016-05-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-06-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-07-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-08-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-09-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-10-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-11-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2016-12-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-01-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-02-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-03-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-04-01T00:00:00.000Z', 'key': datetim...}, {'key_as_string': '2017-05-01T00:00:00.000Z', 'key': datetim...}] In : a = r.aggregations.incomes_per_month.buckets[0] In : a.doc_count Out: 20 In : a.incomes Out: {'value': 210088.48763275146} In : r.aggregations.max_monthly_incomes Out: {'value': 18330342.949926138, 'keys': ['2016-10-01T00:00:00....} # 十月份收入最多 In : r.aggregations.sum_monthly_incomes Out: {'value': 61324244.369543076} # 满眼的钱,现金🐂啊 |
数完了钱,思考下。这个例子就是 sibling 聚合,因为 sum_monthly_incomes、max_monthly_incomes 和 incomes_per_month 在一个区间内的(都是 aggs 的键)。
我之前我们算过么每个月 Live 的总收入,全部 Live 的平均收入。我们现在算一下每个月 Live 的平均收入:
In : agg = A('date_histogram', field='starts_at', interval='month') In : agg.bucket('total_incomes', A('sum', script={'inline': "doc['seats_taken'].value* doc['amount'].value"})) Out: Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}) In : agg.pipeline('avg_income', agg_type='bucket_script', buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count') # _count是一个特殊的路径,表示当前bucket里面的文档数量 Out: DateHistogram(aggs={'total_incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}), 'avg_income': BucketScript(buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')}, field='starts_at', interval='month') In : s = Live.search() In : s.aggs.bucket('avg_income_per_month', agg) Out: DateHistogram(aggs={'total_incomes': Sum(script={'inline': "doc['seats_taken'].value* doc['amount'].value"}), 'avg_income': BucketScript(buckets_path={'total': 'total_incomes', 'count': '_count'}, script='params.total/params.count')}, field='starts_at', interval='month') In : r = s.execute() In : buckets = r.aggregations.avg_income_per_month.buckets In : b = buckets[0] In : b.total_incomes Out: {'value': 210088.48763275146} In : b.doc_count Out: 20 In : b.avg_income Out: {'value': 10504.424381637573} |
这就是 parent 类型的管道聚合了,它对每个桶自己去做运算。
今天先到这里了,下一篇将基于这几天对 ES 的学习实现一个对知乎 Live 进行全文搜索的微信小程序了