
So I have some data that might include something like this:

"foo": ["a","b","c","d","e"]

And another document might look like this:

"foo": ["b","c","e","a","d"]

What I want to do in a query is be able to query for something like:

"query": {
   "bool": {
      "terms": {
         "foo": [ "c" ]
      }
   }
}

And then aggregate based on the item that comes immediately before and, separately, immediately after c in the foo array.

So I'd expect something like this in the output:

"aggregations": {
   "previous": {
      "buckets": [
         {
            "key":"b",
            "doc_count": 2
         }
      ]
   },
   "next": {
      "buckets": [
         {
            "key":"d",
            "doc_count": 1
         },
         {
            "key":"e",
            "doc_count": 1
         }
      ]
   }
}

Is there some clever aggregation that might pull this off? Or else, should I restructure the data in some way to make this doable. For example, maybe I add something like this to the data (for my first example data):

"following": {
    "a":"b",
    "b":"c",
    "c":"d",
    "d":"e"
}

And then maybe I could aggregate over the second item in those pairs given a particular first item? I'm not 100% sure what that query would need to look like either, at the moment. And this would obviously blow up my data a bit with a lot of redundancy.
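To make the restructuring idea concrete, here is a purely illustrative Python sketch (not part of any Elasticsearch API) that builds those pairs from an array at index time:

```python
def following_pairs(foo):
    """Map each item to the item that immediately follows it."""
    return {a: b for a, b in zip(foo, foo[1:])}

print(following_pairs(["a", "b", "c", "d", "e"]))
# {'a': 'b', 'b': 'c', 'c': 'd', 'd': 'e'}
```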

The only other option I can think of, that I'd like to try to avoid, is pulling all the docs that match the query in to my app and sorting them myself there.
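For reference, that app-side fallback is simple, just potentially expensive on large result sets; a rough Python sketch of the counting logic:

```python
from collections import Counter

def neighbor_counts(docs, target):
    """Client-side fallback: count the items immediately before and
    after every occurrence of `target` across the matched docs."""
    prev, nxt = Counter(), Counter()
    for foo in docs:
        for i, item in enumerate(foo):
            if item == target:
                if i > 0:
                    prev[foo[i - 1]] += 1
                if i < len(foo) - 1:
                    nxt[foo[i + 1]] += 1
    return prev, nxt

docs = [["a", "b", "c", "d", "e"], ["b", "c", "e", "a", "d"]]
prev, nxt = neighbor_counts(docs, "c")
# prev == Counter({'b': 2}); nxt == Counter({'d': 1, 'e': 1})
```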


asked Jan 22 at 16:12 by Matt Burland

4 Answers


You can use an ingest pipeline to enrich your data by creating previous and next fields that store the neighboring values at index time. Here is an example.

PUT _ingest/pipeline/neighbor_aggregator
{
  "description": "Extracts items before and after a target value in an array",
  "processors": [
    {
      "script": {
        "source": """
          def target = 'c';  // Target value to search for
          if (ctx.foo != null && ctx.foo instanceof List) {
            def index = ctx.foo.indexOf(target);
            if (index != -1) {
              ctx.previous = index > 0 ? ctx.foo[index - 1] : null;
              ctx.next = index < ctx.foo.size() - 1 ? ctx.foo[index + 1] : null;
            }
          }
        """
      }
    }
  ]
}

POST _ingest/pipeline/neighbor_aggregator/_simulate
{
  "docs": [
    {
      "_source": {
        "foo": ["a", "b", "c", "d", "e"]
      }
    },
    {
      "_source": {
        "foo": ["b", "c", "e", "a", "d"]
      }
    },
    {
      "_source": {
        "foo": ["c", "b", "a"]
      }
    }
  ]
}


POST _bulk?pipeline=neighbor_aggregator
{ "index": { "_index": "your_index_name" } }
{ "foo": ["a", "b", "c", "d", "e"] }
{ "index": { "_index": "your_index_name" } }
{ "foo": ["b", "c", "e", "a", "d"] }
{ "index": { "_index": "your_index_name"} }
{ "foo": ["c", "b", "a"] }

GET your_index_name/_search
{
  "size": 0,
  "aggs": {
    "previous": {
      "terms": {
        "field": "previous.keyword"
      }
    },
    "next": {
      "terms": {
        "field": "next.keyword"
      }
    }
  }
}

The result will be like this:

  "aggregations": {
    "previous": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "b",
          "doc_count": 2
        }
      ]
    },
    "next": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "b",
          "doc_count": 1
        },
        {
          "key": "d",
          "doc_count": 1
        },
        {
          "key": "e",
          "doc_count": 1
        }
      ]
    }
  }

Approach #1. scripted_metric

Yes, you could solve this by reorganizing the data into a new index, where every array item becomes a document holding the item itself plus its previous and next items. That is possible with a transform.

You can use the following scripted_metric aggregation on its own or inside the transform query.

Sample documents (adapted from Musab's answer)

POST /previous_next/_bulk
{"create":{}}
{"foo":["a","b","c"]}
{"create":{}}
{"foo":["b","c","e","a","d"]}
{"create":{}}
{"foo":["c","b","a"]}
{"create":{}}
{"foo":["d","b","a"]}

Aggregation query

GET /previous_next/_search?filter_path=aggregations
{
    "query": {
        "term": {
            "foo": "c"
        }
    },
    "aggs": {
        "previous_next": {
            "scripted_metric": {
                "init_script": """
                    state.keys = new HashMap();
                    state.keys['previous'] = new HashMap();
                    state.keys['next'] = new HashMap();
                """,
                "map_script": """
                    void incrementMapValue(Map map, String key, String previousOrNext) {
                        int count = map[previousOrNext].getOrDefault(key, 0);
                        map[previousOrNext][key] = count + 1;
                    }
                    
                    String item = params.item;
                    List keys = params['_source']['foo'];
                    String previousKey;
                    String nextKey;
                    if (keys.contains(item)) {
                        for (int i = 0; i < keys.size(); i++) {
                            if (keys.get(i).equals(item)) {
                                previousKey = (i == 0) ? "first" : keys.get(i - 1);
                                incrementMapValue(state.keys, previousKey, 'previous');
                                nextKey = (i == (keys.size() - 1)) ? "last" : keys.get(i + 1);
                                incrementMapValue(state.keys, nextKey, 'next');
                            }
                        }
                    }
                    """,
                "combine_script": "return state.keys",
                "reduce_script": """
                        Map nextPreviousTotal = new HashMap();
                        for (map in states) {
                            nextPreviousTotal.putAll(map);
                        }
                    return nextPreviousTotal;
                """,
                "params": {
                    "item": "c"
                }
            }
        }
    }
}

The query filters documents containing the c item. The previous_next aggregation collects the previous and next keys into a map with the structure shown in the response below. Note that you have to duplicate the item in the params section of the aggregation. I've tested the query on a single-shard cluster.
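One caveat about multi-shard indices: the reduce_script above uses putAll, which overwrites entries rather than summing them when the same key arrives from several shards. A shard-safe reduce would merge the counts; here is the intended merge logic sketched in Python (the shard states are hypothetical examples):

```python
def reduce_states(states):
    """Merge per-shard {'previous': {...}, 'next': {...}} count maps by summing
    counts per key, instead of letting later shards overwrite earlier ones."""
    total = {"previous": {}, "next": {}}
    for state in states:
        for direction in ("previous", "next"):
            for key, count in state[direction].items():
                total[direction][key] = total[direction].get(key, 0) + count
    return total

# Hypothetical split of the sample docs across two shards:
shard_a = {"previous": {"b": 1}, "next": {"last": 1}}
shard_b = {"previous": {"b": 1, "first": 1}, "next": {"e": 1, "b": 1}}
merged = reduce_states([shard_a, shard_b])
# merged["previous"] == {'b': 2, 'first': 1}
# merged["next"] == {'last': 1, 'e': 1, 'b': 1}
```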

Response

{
    "aggregations" : {
        "previous_next" : {
            "value" : {
                "next" : {
                    "b" : 1,
                    "last" : 1,
                    "e" : 1
                },
                "previous" : {
                    "b" : 2,
                    "first" : 1
                }
            }
        }
    }
}

Approach #2. Runtime fields

Alternatively, you could use runtime fields for the previous and next keys. The fields use the script from the scripted_metric aggregation (see Approach #1), split into two. Documents with the foo field are filtered by an exists query. The sample documents are the same as in Approach #1.

GET /previous_next/_search?filter_path=aggregations
{
    "runtime_mappings": {
        "previous": {
            "type": "keyword",
            "script": {
                "source": """
                    String previousKey = 'missing';
                    String item = params.item;
                    List keys = params['_source']['foo'];
                    if (!keys.contains(item)) {
                        emit(previousKey);
                    }
                    for (int i = 0; i < keys.size(); i++) {
                        if (keys.get(i).equals(item)) {
                            previousKey = (i == 0) ? "first" : keys.get(i - 1);
                            emit(previousKey)
                        }
                    }
                """,
                "params": {
                    "item": "c"
                }
            }
        },
        "next": {
            "type": "keyword",
            "script": {
                "source": """
                    String nextKey = 'missing';
                    String item = params.item;
                    List keys = params['_source']['foo'];
                    if (!keys.contains(item)) {
                        emit(nextKey);
                    }
                    for (int i = 0; i < keys.size(); i++) {
                        if (keys.get(i).equals(item)) {
                            nextKey = (i == (keys.size() - 1)) ? 
                                    "last" : keys.get(i + 1);
                            emit(nextKey)
                        }
                    }
                """,
                "params": {
                    "item": "c"
                }
            }
        }
    },
    "query": {
        "exists": {
            "field": "foo"
        }
    },
    "aggs": {
        "previous": {
            "terms": {
                "field": "previous"
            }
        },
        "next": {
            "terms": {
                "field": "next"
            }
        }
    }
}

Response

{
    "aggregations" : {
        "next" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
                {
                    "key" : "b",
                    "doc_count" : 1
                },
                {
                    "key" : "e",
                    "doc_count" : 1
                },
                {
                    "key" : "last",
                    "doc_count" : 1
                },
                {
                    "key" : "missing",
                    "doc_count" : 1
                }
            ]
        },
        "previous" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [
                {
                    "key" : "b",
                    "doc_count" : 2
                },
                {
                    "key" : "first",
                    "doc_count" : 1
                },
                {
                    "key" : "missing",
                    "doc_count" : 1
                }
            ]
        }
    }
}

The answers here weren't exactly what I needed, but thanks to everybody who contributed.

What I ended up with was something like this:

    "aggs": {
        "prev": {
            "terms": {
                "script": {
                    "source": """
                        def index = doc['foo'].indexOf(params.target);
                        if (index != -1) {
                            return index > 0 ? doc['foo'][index - 1] : null;
                        }
                        return null;
                    """,
                    "params": {
                        "target": "c"
                    }
                },
                "size": 5
            }
        },
        "next": {
            "terms": {
                "script": {
                    "source": """
                        def index = doc['foo'].indexOf(params.target);
                        if (index != -1) {
                            return index < doc['foo'].size() - 1 ? doc['foo'][index + 1] : null;
                        }
                        return null;
                    """,
                    "params": {
                        "target": "c"
                    }
                },
                "size": 5
            }
        }
    }

Which seems to do the job, for now at least. If it turns out to be too slow when it's finally implemented on real data, then it might need to be revisited.

One thing I'm not particularly happy about is this repeated line:

def index = doc['foo'].indexOf(params.target);

It would be nice if I could do that indexOf only once. Although in the real data we have the foo field is probably relatively small.

Tags: elasticsearch | Aggregating over previous and next item in an array | Stack Overflow