Elasticsearch 入门教程 – 批量操作bulk

    bulk API可以帮助我们同时完成执行多个请求,比如:createindexupdate以及delete。当你在处理类似于log等海量数据的时候,你就可以一下处理成百上千的请求,这个操作将会极大提高效率。

        bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行

        bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志

        bulk request会加载到内存里,如果数据量太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从10005000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在515MB之间

 一般语法

   {"action": {"metadata"}}

   {"data"}

示例

POST /_bulk

    { "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} 

    { "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}

    { "test_field":    "test12" }

    { "index":  { "_index": "test_index", "_type": "test_type", "_id": "2" }}

    { "test_field":    "replaced test2" }

    { "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3}}

    { "doc" : {"test_field2" : "bulk test1"}}

相关解释:

    1. delete:删除一个文档,只要1个json串即可

    2. create:与 PUT /index/type/id/_create相等,强制创建

    3. index:普通的put操作,可以创建文档,如果存在则是全量替换

    4. update:执行partial update操作

举个比较简单的示例:

POST /_bulk?pretty
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title":    "My first blog post", "readCount" : 10 }
{ "index":  { "_index": "website", "_type": "blog" }}
{ "title":    "My second blog post" , "readCount" : 20 }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }

bulk每次传多少命令(数据)合适

     整个批量请求需要被加载到接受我们请求节点的内存里,所以请求越大,给其它请求可用的内存就越小。有一个最佳的bulk请求大小。超过这个大小,性能不再提升而且可能降低。

     最佳大小,当然并不是一个固定的数字。它完全取决于你的硬件、你文档的大小和复杂度以及索引和搜索的负载。幸运的是,这个最佳点(sweetspot)还是容易找到的:

     试着批量索引标准的文档,随着大小的增长,当性能开始降低,说明你每个批次的大小太大了。开始的数量可以在1000~5000个文档之间,如果你的文档非常大,可以使用较小的批次。

     通常着眼于你请求批次的物理大小是非常有用的。一千个1kB的文档和一千个1MB的文档大不相同。一个好的批次最好保持在5-15MB大小间。

Java客户端演示:

BulkRequestBuilder bulkRequest = client.prepareBulk();
       bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
               .setSource(XContentFactory.jsonBuilder()
                       .startObject()
                           .field("user", "kimchy")
                           .field("postDate", new Date())
                           .field("message", "trying out Elasticsearch")
                       .endObject()));
       bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
               .setSource(XContentFactory.jsonBuilder()
                       .startObject()
                           .field("user", "kimchy")
                           .field("postDate", new Date())
                           .field("message", "another post")
                       .endObject()));
       BulkResponse response = bulkRequest.get();
       
       BulkItemResponse [] bulkItemResponse=response.getItems();
       for(BulkItemResponse response2:bulkItemResponse){
       	System.out.println(response2.getResponse());
       }

Java客户端演示2:

Long count = 100000L;
String index = "bigdata";
String type = "student1";
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < count; i++) {
    Map<String, Object> ret = new HashMap<String, Object>();
    ret.put("recordtime", 11);
    ret.put("area", 22);
    ret.put("usertype", 33);
    ret.put("count", 44);
    bulkRequest.add(client.prepareIndex(index, type).setSource(ret));
    // 每10000条提交一次
    if (i % 10000 == 0) {
        bulkRequest.execute().actionGet();
    }
}

发表评论