So I have some data that might include something like this:
"foo": ["a","b","c","d","e"]
And another document might look like this:
"foo": ["b","c","e","a","d"]
What I want to do in a query is be able to query for something like:
"query": {
"bool": {
"terms": {
"foo": [ "c" ]
}
}
}
And then aggregate based on the item that comes immediately before and, separately, immediately after c in the foo array.
So I'd expect something like this in the output:
"aggregations": {
"previous": {
"buckets": [
{
"key":"b",
"doc_count": 2
}
]
},
"next": {
"buckets": [
{
"key":"d",
"doc_count": 1
},
{
"key":"e",
"doc_count": 1
}
]
}
}
Is there some clever aggregation that might pull this off? Or else, should I restructure the data in some way to make this doable? For example, maybe I add something like this to the data (for my first example document):
"following": {
"a":"b",
"b":"c",
"c":"d",
"d":"e"
}
And then maybe I could aggregate over the second item in those pairs given a particular first item? I'm not 100% sure what that query would need to look like either, at the moment. And this would obviously blow up my data a bit with a lot of redundancy.
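For instance (just a guess on my part, assuming the following.* values end up mapped as keyword fields; my_index is a placeholder), I imagine the query might look something like this:
GET my_index/_search
{
  "size": 0,
  "query": {
    "exists": { "field": "following.c" }
  },
  "aggs": {
    "next": {
      "terms": { "field": "following.c" }
    }
  }
}
Presumably I'd also need a matching preceding object to get the "previous" side the same way.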
The only other option I can think of, that I'd like to try to avoid, is pulling all the docs that match the query into my app and sorting them myself there.
4 Answers
You can use an ingest pipeline to enrich your data by creating previous and next fields that keep the neighboring values. Here is an example for you.
PUT _ingest/pipeline/neighbor_aggregator
{
  "description": "Extracts items before and after a target value in an array",
  "processors": [
    {
      "script": {
        "source": """
          def target = 'c'; // Target value to search for
          if (ctx.foo != null && ctx.foo instanceof List) {
            def index = ctx.foo.indexOf(target);
            if (index != -1) {
              ctx.previous = index > 0 ? ctx.foo[index - 1] : null;
              ctx.next = index < ctx.foo.size() - 1 ? ctx.foo[index + 1] : null;
            }
          }
        """
      }
    }
  ]
}
POST _ingest/pipeline/neighbor_aggregator/_simulate
{
  "docs": [
    {
      "_source": {
        "foo": ["a", "b", "c", "d", "e"]
      }
    },
    {
      "_source": {
        "foo": ["b", "c", "e", "a", "d"]
      }
    },
    {
      "_source": {
        "foo": ["c", "b", "a"]
      }
    }
  ]
}
POST _bulk?pipeline=neighbor_aggregator
{ "index": { "_index": "your_index_name" } }
{ "foo": ["a", "b", "c", "d", "e"] }
{ "index": { "_index": "your_index_name" } }
{ "foo": ["b", "c", "e", "a", "d"] }
{ "index": { "_index": "your_index_name"} }
{ "foo": ["c", "b", "a"] }
GET your_index_name/_search
{
  "size": 0,
  "aggs": {
    "previous": {
      "terms": {
        "field": "previous.keyword"
      }
    },
    "next": {
      "terms": {
        "field": "next.keyword"
      }
    }
  }
}
The result will be like this:
"aggregations": {
"previous": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "b",
"doc_count": 2
}
]
},
"next": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "b",
"doc_count": 1
},
{
"key": "d",
"doc_count": 1
},
{
"key": "e",
"doc_count": 1
}
]
}
}
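If the data is already indexed, one option is to run the existing documents through the same pipeline with _reindex, for example (the destination index name below is just a placeholder):
POST _reindex
{
  "source": {
    "index": "your_index_name"
  },
  "dest": {
    "index": "your_index_name_enriched",
    "pipeline": "neighbor_aggregator"
  }
}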
Approach #1. scripted_metric
Yes, you could solve this by reorganizing the data into a new index where every array item becomes a document holding the item itself together with its previous and next items. That's possible with a transform. The following scripted_metric aggregation can be used on its own or inside the transform.
Sample documents (adapted from Musab's answer)
POST /previous_next/_bulk
{"create":{}}
{"foo":["a","b","c"]}
{"create":{}}
{"foo":["b","c","e","a","d"]}
{"create":{}}
{"foo":["c","b","a"]}
{"create":{}}
{"foo":["d","b","a"]}
Aggregation query
GET /previous_next/_search?filter_path=aggregations
{
  "query": {
    "term": {
      "foo": "c"
    }
  },
  "aggs": {
    "previous_next": {
      "scripted_metric": {
        "init_script": """
          state.keys = new HashMap();
          state.keys['previous'] = new HashMap();
          state.keys['next'] = new HashMap();
        """,
        "map_script": """
          void incrementMapValue(Map map, String key, String previousOrNext) {
            int count = map[previousOrNext].getOrDefault(key, 0);
            map[previousOrNext][key] = count + 1;
          }
          String item = params.item;
          List keys = params['_source']['foo'];
          String previousKey;
          String nextKey;
          if (keys.contains(item)) {
            for (int i = 0; i < keys.size(); i++) {
              if (keys.get(i).equals(item)) {
                previousKey = (i == 0) ? "first" : keys.get(i - 1);
                incrementMapValue(state.keys, previousKey, 'previous');
                nextKey = (i == (keys.size() - 1)) ? "last" : keys.get(i + 1);
                incrementMapValue(state.keys, nextKey, 'next');
              }
            }
          }
        """,
        "combine_script": "return state.keys",
        "reduce_script": """
          Map nextPreviousTotal = new HashMap();
          for (map in states) {
            nextPreviousTotal.putAll(map);
          }
          return nextPreviousTotal;
        """,
        "params": {
          "item": "c"
        }
      }
    }
  }
}
The query filters documents that contain the c item. The previous_next aggregation collects the previous and next keys into a map with the structure shown in the response below. Note that the item has to be repeated in the params section of the aggregation. I've tested the query on a single-shard cluster.
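On an index with more than one shard, the reduce_script above would overwrite one shard's previous/next maps with another's, so a shard-safe version would need to sum the counts instead, roughly like this (untested sketch):
"reduce_script": """
  Map total = new HashMap();
  total['previous'] = new HashMap();
  total['next'] = new HashMap();
  for (shardState in states) {
    if (shardState == null) {
      continue;
    }
    for (side in shardState.entrySet()) {
      for (kv in side.getValue().entrySet()) {
        int count = total[side.getKey()].getOrDefault(kv.getKey(), 0);
        total[side.getKey()][kv.getKey()] = count + kv.getValue();
      }
    }
  }
  return total;
"""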
Response
{
  "aggregations" : {
    "previous_next" : {
      "value" : {
        "next" : {
          "b" : 1,
          "last" : 1,
          "e" : 1
        },
        "previous" : {
          "b" : 2,
          "first" : 1
        }
      }
    }
  }
}
Approach #2. Runtime fields
Alternatively, you could use runtime fields for the previous and next keys. Their scripts are the scripted_metric script from Approach #1 split in two. Documents that have a foo field are selected with an exists query. The sample documents are the same as in Approach #1.
GET /previous_next/_search?filter_path=aggregations
{
  "runtime_mappings": {
    "previous": {
      "type": "keyword",
      "script": {
        "source": """
          String previousKey = 'missing';
          String item = params.item;
          List keys = params['_source']['foo'];
          if (!keys.contains(item)) {
            emit(previousKey);
          }
          for (int i = 0; i < keys.size(); i++) {
            if (keys.get(i).equals(item)) {
              previousKey = (i == 0) ? "first" : keys.get(i - 1);
              emit(previousKey);
            }
          }
        """,
        "params": {
          "item": "c"
        }
      }
    },
    "next": {
      "type": "keyword",
      "script": {
        "source": """
          String nextKey = 'missing';
          String item = params.item;
          List keys = params['_source']['foo'];
          if (!keys.contains(item)) {
            emit(nextKey);
          }
          for (int i = 0; i < keys.size(); i++) {
            if (keys.get(i).equals(item)) {
              nextKey = (i == (keys.size() - 1)) ? "last" : keys.get(i + 1);
              emit(nextKey);
            }
          }
        """,
        "params": {
          "item": "c"
        }
      }
    }
  },
  "query": {
    "exists": {
      "field": "foo"
    }
  },
  "aggs": {
    "previous": {
      "terms": {
        "field": "previous"
      }
    },
    "next": {
      "terms": {
        "field": "next"
      }
    }
  }
}
Response
{
  "aggregations" : {
    "next" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "b",
          "doc_count" : 1
        },
        {
          "key" : "e",
          "doc_count" : 1
        },
        {
          "key" : "last",
          "doc_count" : 1
        },
        {
          "key" : "missing",
          "doc_count" : 1
        }
      ]
    },
    "previous" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "b",
          "doc_count" : 2
        },
        {
          "key" : "first",
          "doc_count" : 1
        },
        {
          "key" : "missing",
          "doc_count" : 1
        }
      ]
    }
  }
}
The answers here weren't exactly what I needed, but thanks to everybody who contributed.
What I ended up with was something like this:
"aggs": {
"prev": {
"terms": {
"script": {
"source": "
def index = doc[`foo`].indexOf(params.target);
if (index != -1) {
return index > 0 ? doc[`foo`][index - 1] : null;
}
else {
return null;
}
",
"params": {
"target": "C"
}
},
"size": 5
}
},
"next": {
"terms": {
"script": {
"source": "
def index = doc[`foo`].indexOf(params.target);
if (index != -1) {
return index < doc[`foo`].size() - 1 ? doc[`foo`][index + 1] : null;
}
else {
return null;
}
",
"params": {
"target": "C"
}
},
"size": 5
}
}
}
Which seems to do the job, for now at least. If it turns out to be too slow when it's finally implemented on real data, it might need to be revisited.
One thing I'm not particularly happy about is this repeated line:
def index = doc['foo'].indexOf(params.target);
It would be nice if I could do that indexOf only once, although in the real data we have, the foo field is probably relatively small.
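One idea I haven't tried: do the lookup once in a single terms script that returns a combined previous|next key, then split the buckets back apart in the app. Something like this (untested sketch; 'none' is just a placeholder for a missing neighbour):
"aggs": {
  "prev_next": {
    "terms": {
      "script": {
        "source": """
          def values = doc['foo'];
          def index = values.indexOf(params.target);
          if (index == -1) {
            return 'none|none';
          }
          def prev = index > 0 ? values[index - 1] : 'none';
          def next = index < values.size() - 1 ? values[index + 1] : 'none';
          return prev + '|' + next;
        """,
        "params": {
          "target": "c"
        }
      },
      "size": 25
    }
  }
}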