Elasticsearch Beginner Tutorial – Aggregations

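Aggregations in ES fall into two major categories: metric aggregations and bucket aggregations. Metrics are much like SQL functions such as avg, max, and min, while buckets are somewhat like GROUP BY.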

   http://blog.csdn.net/wwd0501/article/details/78501548

This post briefly introduces how to use metric aggregations.

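Based on the type of value they return, metric aggregations fall into two kinds: single-value aggregations and multi-value aggregations.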

Single-value aggregations

Sum

This aggregation returns a single value. The DSL looks like this:

GET /ecommerce/product/_search
{
  "aggs": {
    "sum_aggs": {
      "sum": {
        "field": "price"
      }
    }
  }
}

Java client code:

SumAggregationBuilder builder = AggregationBuilders.sum("sum_aggs").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(builder)
        .execute().actionGet();
Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
InternalSum internalSum = (InternalSum) aggrMap.get("sum_aggs");
System.out.println("   {name:" + internalSum.getName() + ",sum:" + internalSum.getValue() + "}");

It returns the sum of the price field:

"aggregations": {
  "sum_aggs": {
    "value": 95
  }
}

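Here sum_aggs is the name of the aggregation, and it is also used as the key of the result in the response. Aggregations also support scripts; we won't go into that here, so see the official documentation for details.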
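To make the semantics concrete, here is a minimal plain-Java sketch (it does not use the Elasticsearch client) of what the sum aggregation computes. The documents and their prices 25, 30, and 40 are hypothetical, chosen only so that the total matches the 95 above, and the class name SumSketch is made up for illustration. Documents that lack the field are skipped, which is how ES treats them when no "missing" value is configured.

```java
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the sum metric aggregation's semantics (not ES client code).
final class SumSketch {
    // Each "document" is a map of field name -> numeric value; a missing key means the
    // document has no value for that field, so it contributes nothing to the sum.
    static double sum(List<Map<String, Double>> docs, String field) {
        double total = 0;
        for (Map<String, Double> doc : docs) {
            Double value = doc.get(field);
            if (value != null) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical documents; the last one has no price field.
        List<Map<String, Double>> docs = List.of(
                Map.of("price", 25.0),
                Map.of("price", 30.0),
                Map.of("price", 40.0),
                Map.of());
        System.out.println("{\"sum_aggs\": {\"value\": " + sum(docs, "price") + "}}");
    }
}
```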

Min

GET /ecommerce/product/_search
{
  "aggs": {
    "min_price": {
      "min": {
        "field": "price"
      }
    }
  }
}

Java client code:

MinAggregationBuilder builder = AggregationBuilders.min("min_aggs").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(builder)
        .execute().actionGet();
Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
InternalMin internalMin = (InternalMin) aggrMap.get("min_aggs");
System.out.println("   {name:" + internalMin.getName() + ",min:" + internalMin.getValue() + "}");

Max

GET /ecommerce/product/_search
{
  "aggs": {
    "max_price": {
      "max": {
        "field": "price"
      }
    }
  }
}

Java client code:

MaxAggregationBuilder builder = AggregationBuilders.max("max_aggs").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(builder)
        .execute().actionGet();
Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
InternalMax internalMax = (InternalMax) aggrMap.get("max_aggs");
System.out.println("   {name:" + internalMax.getName() + ",max:" + internalMax.getValue() + "}");

Avg

GET /ecommerce/product/_search
{
  "aggs": {
    "avg_price": {
      "avg": {
        "field": "price"
      }
    }
  }
}

Java client code:

AvgAggregationBuilder builder = AggregationBuilders.avg("avg_aggs").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(builder)
        .execute().actionGet();
Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
InternalAvg internalAvg = (InternalAvg) aggrMap.get("avg_aggs");
System.out.println("   {name:" + internalAvg.getName() + ",avg:" + internalAvg.getValue() + "}");
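The min, max, and avg aggregations above can be sketched the same way in plain Java (again not ES client code; the class name and the prices 25, 30, and 40 are hypothetical). An empty OptionalDouble stands in for the null value ES reports when no document has the field.

```java
import java.util.List;
import java.util.OptionalDouble;

// Plain-Java sketch of the min, max and avg metric aggregations (not ES client code).
// An empty OptionalDouble models the null value returned when there is nothing to aggregate.
final class MinMaxAvgSketch {
    static OptionalDouble min(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).min();
    }

    static OptionalDouble max(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).max();
    }

    static OptionalDouble avg(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).average();
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(25.0, 30.0, 40.0); // hypothetical price values
        System.out.println("min_aggs: " + min(prices).getAsDouble());
        System.out.println("max_aggs: " + max(prices).getAsDouble());
        System.out.println("avg_aggs: " + avg(prices).getAsDouble());
    }
}
```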

Two approximate aggregations in Elasticsearch: cardinality and percentiles

cardinality: counting unique values

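  The first approximate aggregation Elasticsearch provides is the cardinality metric. It returns the cardinality of a field, that is, the number of distinct values in that field. You may be more familiar with the SQL form:

   SELECT COUNT(DISTINCT color) FROM cars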

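  A distinct count is a common operation, and it answers many basic business questions:

    • How many unique visitors (UVs) does the website have?

    • How many different kinds of cars have been sold?

    • How many unique users bought a product each month?

    We can use the cardinality metric to find out how many car colors the dealership has sold: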

  GET /cars/transactions/_search
  {
      "size" : 0,
      "aggs" : {
          "distinct_colors" : {
              "cardinality" : {
                "field" : "color"
              }
          }
      }
  }

The result shows that cars in three different colors have been sold:

… "aggregations": { "distinct_colors": { "value": 3 } } …
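For comparison, the exact answer that SELECT COUNT(DISTINCT color) gives can be sketched with a HashSet. This is a plain-Java illustration under hypothetical data, not what ES does internally: the cardinality aggregation approximates the count (using the HyperLogLog++ algorithm) rather than keeping every distinct value in memory, so on large data sets its result may be slightly off.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Exact distinct count, i.e. what SELECT COUNT(DISTINCT color) returns.
// ES's cardinality aggregation estimates this number instead of storing all values.
final class DistinctCountSketch {
    static int distinctCount(List<String> values) {
        Set<String> seen = new HashSet<>(values); // duplicates collapse into a single entry
        return seen.size();
    }

    public static void main(String[] args) {
        // Hypothetical "color" values of sold cars.
        List<String> colors = List.of("red", "green", "red", "blue", "green", "red");
        System.out.println("distinct_colors: " + distinctCount(colors));
    }
}
```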

We can make the example more useful: how many colors of car were sold each month? To get this metric, we simply nest a cardinality metric inside a date_histogram:

GET /cars/transactions/_search
  {
    "size" : 0,
    "aggs" : {
        "months" : {
          "date_histogram": {
            "field": "sold",
            "interval": "month"
          },
          "aggs": {
            "distinct_colors" : {
                "cardinality" : {
                  "field" : "color"
                }
            }
          }
        }
    }
  }
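The nesting above can be pictured as "bucket first, then measure inside each bucket". The following plain-Java sketch (hypothetical Sale class and data, exact rather than approximate counting) groups sales by calendar month and counts distinct colors per month.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java sketch of a cardinality metric nested in a monthly date_histogram
// (exact counting; ES itself approximates the per-bucket counts).
final class MonthlyDistinctSketch {
    // Hypothetical stand-in for one document of the cars/transactions example.
    static final class Sale {
        final LocalDate sold;
        final String color;

        Sale(LocalDate sold, String color) {
            this.sold = sold;
            this.color = color;
        }
    }

    // Buckets sales by calendar month, then counts distinct colors inside each bucket.
    static Map<YearMonth, Integer> distinctColorsPerMonth(List<Sale> sales) {
        Map<YearMonth, Set<String>> colorsByMonth = new TreeMap<>(); // months in date order
        for (Sale sale : sales) {
            colorsByMonth.computeIfAbsent(YearMonth.from(sale.sold), month -> new HashSet<>())
                    .add(sale.color);
        }
        Map<YearMonth, Integer> result = new TreeMap<>();
        colorsByMonth.forEach((month, colors) -> result.put(month, colors.size()));
        return result;
    }

    public static void main(String[] args) {
        List<Sale> sales = List.of(
                new Sale(LocalDate.of(2014, 1, 3), "red"),
                new Sale(LocalDate.of(2014, 1, 20), "red"),
                new Sale(LocalDate.of(2014, 1, 28), "blue"),
                new Sale(LocalDate.of(2014, 2, 7), "green"));
        System.out.println(distinctColorsPerMonth(sales));
    }
}
```

One difference to keep in mind: ES's date_histogram by default also returns empty buckets for months between the first and last one, while this sketch lists only months that have sales.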

Percentiles aggregation: percentile statistics. For example, it can show how load times vary across all the pages of your website:

{
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time" 
            }
        }
    }
}
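To see what a percentile means, here is an exact percentile computed by linear interpolation between the closest ranks. This is a plain-Java sketch with hypothetical load times, and it is NOT the algorithm ES uses: the percentiles aggregation estimates values approximately (TDigest by default), so its numbers can differ slightly. The percents in main are the ones ES requests when none are specified (1, 5, 25, 50, 75, 95, 99).

```java
import java.util.Arrays;

// Exact percentile via linear interpolation between the closest ranks.
// ES's percentiles aggregation approximates this instead of sorting every value.
final class PercentileSketch {
    static double percentile(double[] values, double percent) {
        if (values.length == 0) {
            throw new IllegalArgumentException("no values");
        }
        double[] sorted = values.clone(); // do not reorder the caller's array
        Arrays.sort(sorted);
        double rank = percent / 100.0 * (sorted.length - 1); // fractional index into sorted
        int lower = (int) Math.floor(rank);
        int upper = (int) Math.ceil(rank);
        double fraction = rank - lower;
        return sorted[lower] + fraction * (sorted[upper] - sorted[lower]);
    }

    public static void main(String[] args) {
        // Hypothetical page load times in milliseconds; one slow outlier.
        double[] loadTimes = {120, 95, 310, 150, 101, 99, 2050, 180};
        double[] percents = {1, 5, 25, 50, 75, 95, 99};
        for (double p : percents) {
            System.out.println(p + "%: " + percentile(loadTimes, p));
        }
    }
}
```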

ElasticSearch – Aggregation Series: Approximate Aggregations

stats (returns the count, min, max, avg, and sum of the field in one request)

GET /ecommerce/product/_search
  {
    "aggs": {
      "grades_stats": {
        "stats": {
          "field": "price"
        }
      }
    }
  }

The response contains several aggregation results at once:

"aggregations": {
  "grades_stats": {
    "count": 3,
    "min": 25,
    "max": 40,
    "avg": 31.666666666666668,
    "sum": 95
  }
}

Java client code:

StatsAggregationBuilder builder = AggregationBuilders.stats("stats_aggs").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(builder)
        .execute().actionGet();
Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
InternalStats internalStats = (InternalStats) aggrMap.get("stats_aggs");
System.out.println("   {name:" + internalStats.getName() + ",avg:" + internalStats.getAvg() + "}");
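The five numbers in the stats response map directly onto java.util.DoubleSummaryStatistics, which also computes count, min, max, average, and sum in a single pass. The sketch below is plain Java with hypothetical prices (25, 30, 40) that reproduce the response shown above; the class name is made up.

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;

// The stats aggregation returns count, min, max, avg and sum together;
// DoubleSummaryStatistics gathers the same five numbers in one pass over the values.
final class StatsSketch {
    static DoubleSummaryStatistics stats(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics s = stats(List.of(25.0, 30.0, 40.0)); // hypothetical prices
        System.out.printf("count=%d min=%.1f max=%.1f avg=%.4f sum=%.1f%n",
                s.getCount(), s.getMin(), s.getMax(), s.getAverage(), s.getSum());
    }
}
```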

extended_stats (extended statistics)

{ "aggs" : { "grades_stats" : { "extended_stats" : { "field" : "grade" } } } }

On top of stats, it adds several more advanced statistics:

{ … "aggregations": { "grades_stats": { "count": 9, "min": 72, "max": 99, "avg": 86, "sum": 774, "sum_of_squares": 67028, "variance": 51.55555555555556, "std_deviation": 7.180219742846005, "std_deviation_bounds": { "upper": 100.36043948569201, "lower": 71.63956051430799 } } } }
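The extra fields in this response follow from count, sum, and sum_of_squares alone. The plain-Java sketch below derives them, assuming population variance (mean of squares minus square of the mean) and bounds of avg ± sigma · std_deviation with sigma = 2, which is ES's default; with those assumptions it reproduces the sample numbers above.

```java
// Derives the extra extended_stats fields from count, sum and sum_of_squares.
// Assumes population variance and sigma = 2 for std_deviation_bounds.
final class ExtendedStatsSketch {
    static double variance(long count, double sum, double sumOfSquares) {
        double avg = sum / count;
        return sumOfSquares / count - avg * avg; // E[x^2] - (E[x])^2
    }

    // Returns {lower, upper} = avg -/+ sigma * standard deviation.
    static double[] stdDeviationBounds(long count, double sum, double sumOfSquares, double sigma) {
        double avg = sum / count;
        double std = Math.sqrt(variance(count, sum, sumOfSquares));
        return new double[] {avg - sigma * std, avg + sigma * std};
    }

    public static void main(String[] args) {
        // count, sum and sum_of_squares taken from the sample extended_stats response
        long count = 9;
        double sum = 774;
        double sumOfSquares = 67028;
        double varianceValue = variance(count, sum, sumOfSquares);
        double[] bounds = stdDeviationBounds(count, sum, sumOfSquares, 2.0);
        System.out.println("variance=" + varianceValue + " std_deviation=" + Math.sqrt(varianceValue)
                + " lower=" + bounds[0] + " upper=" + bounds[1]);
    }
}
```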

Related Java API (the builder class names below, such as ValueCountBuilder and TermsBuilder, come from the older 2.x Java client; from 5.x on they are named ...AggregationBuilder, for example TermsAggregationBuilder, as in the other examples in this post):

(1) Count the values of a field

ValueCountBuilder vcb = AggregationBuilders.count("count_uid").field("uid");

(2) Count the distinct values of a field (with a small margin of error)

CardinalityBuilder cb = AggregationBuilders.cardinality("distinct_count_uid").field("uid");

(3) Filter inside an aggregation

FilterAggregationBuilder fab = AggregationBuilders.filter("uid_filter").filter(QueryBuilders.queryStringQuery("uid:001"));

(4) Group by a field

TermsBuilder tb = AggregationBuilders.terms("group_name").field("name");

(5) Sum

SumBuilder sumBuilder = AggregationBuilders.sum("sum_price").field("price");

(6) Average

AvgBuilder ab = AggregationBuilders.avg("avg_price").field("price");

(7) Maximum

MaxBuilder mb = AggregationBuilders.max("max_price").field("price");

(8) Minimum

MinBuilder min = AggregationBuilders.min("min_price").field("price");

(9) Group by date interval

DateHistogramBuilder dhb = AggregationBuilders.dateHistogram("dh").field("date");

(10) Return the top matching documents inside each bucket

TopHitsBuilder thb = AggregationBuilders.topHits("top_result");

(11) Nested aggregation

NestedBuilder nb = AggregationBuilders.nested("nested_path").path("quests");

(12) Reverse nested (join back from nested documents to their parent document)

AggregationBuilders.reverseNested("reverse_nested").path("kps");

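The list above is not exhaustive. ES also supports multi-value aggregations such as percentile ranks and geo bounds (the bounding box of geographic points), the scripted metric aggregation, top hits, and more.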

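• On performance, ES includes many optimizations. For example, for max and min on a field whose values are already sorted, ES can skip the computation and read the target value directly.

• Some aggregations only suit particular situations. For example, cardinality counts unique values by hashing them (ES uses the HyperLogLog++ algorithm), so on fields with very large amounts of data it can cost a lot of performance.

• Aggregations can also be nested inside buckets. For example, if a max aggregation is nested under a range aggregation, max is computed again within each bucket that the range produces.

• Aggregations support scripts, which makes the statistics more flexible.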
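The range-then-max nesting described in the list can be sketched in plain Java (hypothetical class name, data, and key format). Each value goes into every range whose interval contains it, with "from" inclusive and "to" exclusive as in ES's range aggregation, and max is then computed separately inside each bucket.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;

// Sketch of a max metric nested under a range bucket aggregation.
// ranges[i] = {from, to}; a value belongs to a bucket when from <= value < to.
final class RangeMaxSketch {
    static Map<String, OptionalDouble> maxPerRange(List<Double> values, double[][] ranges) {
        Map<String, OptionalDouble> result = new LinkedHashMap<>(); // keeps the range order
        for (double[] r : ranges) {
            double from = r[0];
            double to = r[1];
            OptionalDouble max = values.stream()
                    .mapToDouble(Double::doubleValue)
                    .filter(v -> v >= from && v < to) // only the values in this bucket
                    .max();
            result.put(from + "-" + to, max); // empty when the bucket has no values
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> prices = List.of(10.0, 25.0, 30.0, 40.0, 45.0); // hypothetical prices
        double[][] ranges = {{0, 20}, {20, 40}, {40, 50}};
        System.out.println(maxPerRange(prices, ranges));
    }
}
```

Note that 40 lands in the 40-50 bucket, not in 20-40, because the upper bound of a range is exclusive.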

Much of this only becomes clear once you use it in practice.

Case studies:

# Compute the average price of all products:

GET /ecommerce/product/_search
{
  "aggs": {
    "avgs":{
      "avg": {
        "field": "price"
      }
    }
  }
}

Java client code:

SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(AggregationBuilders.avg("avgs").field("price"))
        .execute().actionGet();

InternalAvg avgPrice = response.getAggregations().get("avgs");
System.out.println("Average price: " + avgPrice.getValue());

First analysis requirement: count the products under each tag

GET /ecommerce/product/_search
{
  "aggs": {
    "group_by_tags": {
      "terms": { "field": "tags" }
    }
  }
}

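The MySQL equivalent:

SELECT tag, COUNT(*) FROM ls_prod GROUP BY tag;

Running the query as-is returns an error, because fielddata is disabled by default on text fields and a terms aggregation on a text field needs it:

https://stackoverflow.com/questions/38145991/how-to-set-fielddata-true-in-kibana

Solution: enable fielddata on the tags field: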

PUT /ecommerce/_mapping/product
{
  "properties": {
    "tags": {
      "type": "text",
      "fielddata": true
    }
  }
}

Java client code:

SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(AggregationBuilders.terms("group_by_tags").field("tags"))
        .execute().actionGet();

Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
StringTerms teamAgg = (StringTerms) aggrMap.get("group_by_tags");

Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
System.out.println("buckets: [");
while (teamBucketIt.hasNext()) {
    Bucket buck = teamBucketIt.next();
    String key = (String) buck.getKey();
    // number of documents in this bucket
    long count = buck.getDocCount();
    System.out.println("{key:" + key + ",doc_count:" + count + "}");
}
System.out.println("]");
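One difference from the SQL GROUP BY above is worth seeing in code: tags is a multi-valued field, so a document is counted once in the bucket of every tag it carries, and the bucket counts can add up to more than the number of documents. This plain-Java sketch uses hypothetical tag data; ordering by doc_count descending follows the terms default, while breaking ties by tag name is just this sketch's choice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a terms aggregation on a multi-valued "tags" field.
final class TermsCountSketch {
    // Each inner list holds the tags of one (hypothetical) document.
    static Map<String, Long> countByTag(List<List<String>> tagsPerDoc) {
        Map<String, Long> counts = new HashMap<>();
        for (List<String> tags : tagsPerDoc) {
            for (String tag : new HashSet<>(tags)) { // a document counts once per distinct tag
                counts.merge(tag, 1L, Long::sum);
            }
        }
        // Largest buckets first (doc_count descending); ties broken by tag name.
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()));
        Map<String, Long> ordered = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : entries) {
            ordered.put(e.getKey(), e.getValue());
        }
        return ordered;
    }

    public static void main(String[] args) {
        List<List<String>> tagsPerDoc = List.of(
                List.of("meibai", "fangzhu"),
                List.of("fangzhu"),
                List.of("qingxin"));
        System.out.println(countByTag(tagsPerDoc)); // 3 documents, 4 tag hits in total
    }
}
```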

Second aggregation requirement: for products whose name contains yagao (pinyin for "toothpaste"), count the products under each tag

GET /ecommerce/product/_search
{
  "size": 0,
  "query": {
    "match": {
      "name": "yagao"
    }
  },
  "aggs": {
    "all_tags": {
      "terms": {
        "field": "tags"
      }
    }
  }
}

Java client code:

SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .setQuery(QueryBuilders.matchQuery("name", "yagao"))
        .addAggregation(AggregationBuilders.terms("group_by_tags").field("tags"))
        .execute().actionGet();

Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
StringTerms teamAgg = (StringTerms) aggrMap.get("group_by_tags");

Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
System.out.println("buckets: [");
while (teamBucketIt.hasNext()) {
    Bucket buck = teamBucketIt.next();
    String key = (String) buck.getKey();
    // number of documents in this bucket
    long count = buck.getDocCount();
    System.out.println("{key:" + key + ",doc_count:" + count + "}");
}
System.out.println("]");
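The key point of this requirement is that the aggregation only sees the documents matched by the query. The plain-Java sketch below shows that order of operations; the Product class, the product names, and the lower-case whitespace tokenization (a crude stand-in for the analyzer a real match query uses) are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: the query filters documents first, then terms buckets are built from the hits only.
final class QueryThenAggregateSketch {
    // Hypothetical product document with a name and a list of tags.
    static final class Product {
        final String name;
        final List<String> tags;

        Product(String name, List<String> tags) {
            this.name = name;
            this.tags = tags;
        }
    }

    // Crude stand-in for a match query: lower-case, split on whitespace, look for the term.
    static boolean matches(String text, String term) {
        return Arrays.asList(text.toLowerCase().split("\\s+")).contains(term.toLowerCase());
    }

    static Map<String, Integer> tagCountsForMatches(List<Product> products, String term) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Product p : products) {
            if (!matches(p.name, term)) {
                continue; // not a hit, so it never reaches the aggregation
            }
            for (String tag : p.tags) {
                counts.merge(tag, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Product> products = List.of(
                new Product("gaolujie yagao", List.of("meibai", "fangzhu")),
                new Product("jiajieshi yagao", List.of("fangzhu")),
                new Product("zhonghua yagao", List.of("qingxin")),
                new Product("generic shampoo", List.of("qingxin")));
        System.out.println(tagCountsForMatches(products, "yagao"));
    }
}
```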

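Third aggregation requirement: group first, then compute the average of each group, that is, the average price of the products under each tag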

GET /ecommerce/product/_search
{
    "size": 0,
    "aggs" : {
        "group_by_tags" : {
            "terms" : { "field" : "tags" },
            "aggs" : {
                "avg_price" : {
                    "avg" : { "field" : "price" }
                }
            }
        }
    }
}

Java client code:

AvgAggregationBuilder builder = AggregationBuilders.avg("avg_by_tags").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(AggregationBuilders.terms("group_by_tags").field("tags").subAggregation(builder))
        .execute().actionGet();

Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
StringTerms teamAgg = (StringTerms) aggrMap.get("group_by_tags");
Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
System.out.println("buckets: [");
while (teamBucketIt.hasNext()) {
    Bucket buck = teamBucketIt.next();
    String key = (String) buck.getKey();
    long count = buck.getDocCount();
    System.out.println("{key:" + key + ",doc_count:" + count + "}");

    Map<String, Aggregation> map = buck.getAggregations().asMap();
    InternalAvg internalAvg = (InternalAvg) map.get("avg_by_tags");
    System.out.println("   {name:" + internalAvg.getName() + ",avg:" + internalAvg.getValue() + "}");
}
System.out.println("]");
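What the avg sub-aggregation computes per bucket can be sketched in plain Java: group products by tag, then average the prices inside each group. The Product class and the prices and tags below are hypothetical; a product with two tags contributes its price to both groups.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of terms + avg: bucket products by tag, then average price per bucket.
final class AvgPerTagSketch {
    // Hypothetical product document: a price plus a list of tags.
    static final class Product {
        final double price;
        final List<String> tags;

        Product(double price, List<String> tags) {
            this.price = price;
            this.tags = tags;
        }
    }

    static Map<String, Double> avgPriceByTag(List<Product> products) {
        Map<String, double[]> sumAndCount = new TreeMap<>(); // tag -> {sum, count}
        for (Product p : products) {
            for (String tag : p.tags) {
                double[] acc = sumAndCount.computeIfAbsent(tag, t -> new double[2]);
                acc[0] += p.price;
                acc[1] += 1;
            }
        }
        Map<String, Double> avgByTag = new TreeMap<>();
        sumAndCount.forEach((tag, acc) -> avgByTag.put(tag, acc[0] / acc[1]));
        return avgByTag;
    }

    public static void main(String[] args) {
        List<Product> products = List.of(
                new Product(30, List.of("meibai", "fangzhu")),
                new Product(25, List.of("fangzhu")),
                new Product(40, List.of("qingxin")));
        System.out.println(avgPriceByTag(products));
    }
}
```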

Fourth data analysis requirement: compute the average price of the products under each tag, sorted by average price in descending order

GET /ecommerce/product/_search
{
    "size": 0,
    "aggs" : {
        "all_tags" : {
            "terms" : { "field" : "tags", "order": { "avg_price": "desc" } },
            "aggs" : {
                "avg_price" : {
                    "avg" : { "field" : "price" }
                }
            }
        }
    }
}

Java client code:

AvgAggregationBuilder builder = AggregationBuilders.avg("avg_by_tags").field("price");
SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(
                AggregationBuilders.terms("group_by_tags").field("tags")
                        // order buckets by the avg_by_tags sub-aggregation; false = descending
                        .order(Order.aggregation("avg_by_tags", false))
                        .subAggregation(builder))
        .execute().actionGet();

Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
StringTerms teamAgg = (StringTerms) aggrMap.get("group_by_tags");
Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
System.out.println("buckets: [");
while (teamBucketIt.hasNext()) {
    Bucket buck = teamBucketIt.next();
    String key = (String) buck.getKey();
    long count = buck.getDocCount();
    System.out.println("{key:" + key + ",doc_count:" + count + "}");

    Map<String, Aggregation> map = buck.getAggregations().asMap();
    InternalAvg internalAvg = (InternalAvg) map.get("avg_by_tags");
    System.out.println("   {name:" + internalAvg.getName() + ",avg:" + internalAvg.getValue() + "}");
}
System.out.println("]");
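Ordering by a sub-aggregation simply means the bucket order is decided by each bucket's metric value instead of its doc_count. The plain-Java sketch below sorts hypothetical per-tag averages in descending order, mirroring "order": { "avg_price": "desc" }; the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of ordering terms buckets by a sub-aggregation value, highest first.
final class OrderByAvgSketch {
    static List<String> tagsByAvgDesc(Map<String, Double> avgByTag) {
        List<Map.Entry<String, Double>> buckets = new ArrayList<>(avgByTag.entrySet());
        buckets.sort(Map.Entry.<String, Double>comparingByValue().reversed()); // highest avg first
        List<String> tags = new ArrayList<>();
        for (Map.Entry<String, Double> bucket : buckets) {
            tags.add(bucket.getKey());
        }
        return tags;
    }

    public static void main(String[] args) {
        // Hypothetical per-tag averages, e.g. the result of a terms + avg aggregation.
        Map<String, Double> avgByTag = Map.of("fangzhu", 27.5, "meibai", 30.0, "qingxin", 40.0);
        System.out.println(tagsByAvgDesc(avgByTag));
    }
}
```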

Fifth data analysis requirement: group by specified price ranges, then group by tag within each range, and finally compute the average price of each group

GET /ecommerce/product/_search
{
  "size": 0,
  "aggs": {
    "group_by_price": {
      "range": {
        "field": "price",
        "ranges": [
          {
            "from": 0,
            "to": 20
          },
          {
            "from": 20,
            "to": 40
          },
          {
            "from": 40,
            "to": 50
          }
        ]
      },
      "aggs": {
        "group_by_tags": {
          "terms": {
            "field": "tags"
          },
          "aggs": {
            "average_price": {
              "avg": {
                "field": "price"
              }
            }
          }
        }
      }
    }
  }
}

Java client code:

AvgAggregationBuilder builder = AggregationBuilders.avg("average_price").field("price");
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_tags").field("tags")
        .order(Order.aggregation("average_price", true));
termsAggregationBuilder.subAggregation(builder);

// Ranges match the DSL above: [0, 20), [20, 40), [40, 50)
RangeAggregationBuilder rangeAggregation = AggregationBuilders.range("group_by_price").field("price")
        .addRange(0, 20).addRange(20, 40).addRange(40, 50)
        .subAggregation(termsAggregationBuilder);

SearchResponse response = client.prepareSearch("ecommerce").setTypes("product")
        .addAggregation(rangeAggregation)
        .execute().actionGet();

Map<String, Aggregation> aggrMap = response.getAggregations().asMap();
Range teamAgg = (Range) aggrMap.get("group_by_price");

Iterator<Range.Bucket> teamBucketIt = (Iterator<Range.Bucket>) teamAgg.getBuckets().iterator();
System.out.println("buckets: [");
while (teamBucketIt.hasNext()) {
    Range.Bucket buck = teamBucketIt.next();
    String key = (String) buck.getKey();
    long count = buck.getDocCount();
    System.out.println("{key:" + key + ",doc_count:" + count + "}");

    // tag buckets inside this price range
    Map<String, Aggregation> aggrbuckMap = buck.getAggregations().asMap();
    StringTerms stringTerms = (StringTerms) aggrbuckMap.get("group_by_tags");
    Iterator<Bucket> iterator = stringTerms.getBuckets().iterator();
    System.out.println("  buckets: [");
    while (iterator.hasNext()) {
        Bucket _bucket = iterator.next();
        String _key = (String) _bucket.getKey();
        long _count = _bucket.getDocCount();
        System.out.println("  {key:" + _key + ",doc_count:" + _count + "}");
        Map<String, Aggregation> _map = _bucket.getAggregations().asMap();
        InternalAvg internalAvg = (InternalAvg) _map.get("average_price");
        System.out.println("     {name:" + internalAvg.getName() + ",avg:" + internalAvg.getValue() + "}");
    }
    System.out.println("  ]");
}
System.out.println("]");
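The three-level nesting (range, then terms, then avg) can be sketched in plain Java to show which documents each average is computed over. The Product class, prices, tags, and the "from-to" key format are hypothetical; ranges use "from" inclusive and "to" exclusive, as ES's range aggregation does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of range -> terms -> avg: split by price range, then by tag, then average per group.
final class RangeTermsAvgSketch {
    // Hypothetical product document: a price plus a list of tags.
    static final class Product {
        final double price;
        final List<String> tags;

        Product(double price, List<String> tags) {
            this.price = price;
            this.tags = tags;
        }
    }

    // ranges[i] = {from, to}; a product falls in a range when from <= price < to.
    static Map<String, Map<String, Double>> avgByRangeAndTag(List<Product> products, double[][] ranges) {
        Map<String, Map<String, Double>> result = new LinkedHashMap<>(); // keeps range order
        for (double[] range : ranges) {
            Map<String, double[]> sumAndCount = new TreeMap<>(); // tag -> {sum, count}
            for (Product p : products) {
                if (p.price < range[0] || p.price >= range[1]) {
                    continue; // not in this price bucket
                }
                for (String tag : p.tags) {
                    double[] acc = sumAndCount.computeIfAbsent(tag, t -> new double[2]);
                    acc[0] += p.price;
                    acc[1] += 1;
                }
            }
            Map<String, Double> avgByTag = new TreeMap<>();
            sumAndCount.forEach((tag, acc) -> avgByTag.put(tag, acc[0] / acc[1]));
            result.put(range[0] + "-" + range[1], avgByTag);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Product> products = List.of(
                new Product(30, List.of("meibai", "fangzhu")),
                new Product(25, List.of("fangzhu")),
                new Product(40, List.of("qingxin")));
        double[][] ranges = {{0, 20}, {20, 40}, {40, 50}};
        System.out.println(avgByRangeAndTag(products, ranges));
    }
}
```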
