見習いプログラミング日記

Javaを中心に色々なことを考えてみます。目指せ本物のプログラマ。

ElasticsearchのBulk API効果を実測する

Elasticsearchには、クライアントから複数のリクエストをまとめて送信するBulk API機能がある。JDBCのexecuteBatchのようなイメージで使える。

JDBCの場合、Oracleのドキュメントでは、バッチ単位のガイドライン*1として「バッチ・サイズを50から100の一般的な範囲に保つことをお薦めします」が示されている。Elasticsearchの場合、バルクサイズごとにどの程度スループットに差が出るのか実測してみた。

測定環境

  • Elasticsearch 6.0.0-rc2
  • Elasticsearch6で導入されたJava High Level REST Client
  • JUnit 5
  • Oracle JDK 1.8.0_152
  • MacBook Pro (13-inch, Late 2016)
  • テストアプリケーションからlocalhost上のElasticsearchに接続

テストの流れ

  1. @BeforeEachでテスト用のインデックスとそのマッピングを作成。自動マッピング作成によってテスト結果が揺れないようにするため。
  2. テスト実行。JIT影響を鑑みて10回実行する。10回中一番早かったレスポンスを測定対象とする。
  3. @AfterEachでテスト用インデックスを削除

テストパターン

  1. 通常のIndex API(Bulk API未使用)
  2. Bulk API - バルクサイズ 10/50/100/1000/10000 の5パターン

テストコード全体は以下の通り。
JSON処理の高速化についてはKOMIYAさんの記事を参考に、jacksonのAfterburnerModuleを使ったり、JSONをStringではなくbyte[]で扱っている。テストコードの全体像はGitHub - n-agetsu/elastic-restclientを参照。

class IndexPerformanceTest {
    private static final Logger LOG = LogManager.getLogger(IndexPerformanceTest.class);

    private static final HttpHost ES_HOST = new HttpHost("localhost", 9200, "http");
    private static final int SEND_DOC_NUM = 10000;
    private static final String INDEX = "posts";
    private static final String TYPE = "doc";

    private static ObjectMapper BURN_MAPPER;
    private static RestHighLevelClient highLevelClient;
    private static RestClient client;

    @BeforeAll
    static void setup() throws JsonProcessingException {
        highLevelClient = new RestHighLevelClient(RestClient.builder(ES_HOST));
        client = RestClient.builder(ES_HOST).build();
        BURN_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
    }

    @BeforeEach
    void beforeEach() throws IOException {
        // create mapping
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support Mapping API.
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("settings")
                     .field("number_of_shards", 1)
                   .endObject()
                    .startObject("mappings")
                      .startObject(TYPE)
                        .startObject("properties")
                          .startObject("user")
                            .field("type", "text")
                          .endObject()
                          .startObject("postDate")
                            .field("type", "date")
                          .endObject()
                          .startObject("message")
                            .field("type", "text")
                          .endObject()
                        .endObject()
                      .endObject()
                    .endObject();
        }
        builder.endObject();
        HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON);
        Response response = client.performRequest("PUT", "/" + INDEX, Collections.emptyMap(), entity);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("create mapping posts failed, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @AfterEach
    void afterEach() throws IOException {
        // delete index for cleanup
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support DELETE INDEX API.
        Response response = client.performRequest("DELETE", "/" + INDEX);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("delete index posts, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @DisplayName("Index Performance Test - Non Bulk API")
    @RepeatedTest(value = 10, name = RepeatedTest.LONG_DISPLAY_NAME)
    void testIndexRequest() throws IOException {
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            highLevelClient.index(request);
        }
    }

    @DisplayName("Index Performance Test - Bulk API")
    @ParameterizedTest(name = "run {0} with bulkSize {1}")
    @MethodSource("repeatCountAndBulkSize")
    void testIndexBulkRequest(int repeatCount, int bulkSize) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            bulkRequest.add(request);
            boolean last = i == SEND_DOC_NUM - 1;
            if (i % bulkSize == 0 || last) {
                highLevelClient.bulk(bulkRequest);
                if (!last) {
                    bulkRequest = new BulkRequest();
                }
            }
        }
    }

    private static Stream<Arguments> repeatCountAndBulkSize() {
        // test 10 times per batchSize
        Stream.Builder<Arguments> builder = Stream.builder();
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 50)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 100)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 1000)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10000)));
        return builder.build();
    }

    @AfterAll
    static void cleanup() {
        try {
            highLevelClient.close();
        } catch (IOException e) {
            LOG.error(e);
        }

        try {
            client.close();
        } catch (IOException e) {
            LOG.error(e);
        }
    }
}

測定結果

テスト項目 処理時間
通常のIndex API(Bulk API未使用) 6.46 s
Bulk API(バルクサイズ10) 1.08s
Bulk API(バルクサイズ50) 492ms
Bulk API(バルクサイズ100) 383ms
Bulk API(バルクサイズ1000) 266ms
Bulk API(バルクサイズ10000) 254ms

Bulk APIの利用により、バッチサイズが10のような小さな値でも、有効なことがわかる。このケースではバルクサイズ1000ドキュメント程度が効率が良かった。

バルクサイズを大きな値にするとドキュメントが貯まるまでクライアントでデータ保持して、データの発生からElasticsearchで検索可能となるまで遅延が大きくなる。このため、いくつが良いとは一概には言えないが、まとまった単位のリクエストを投げる時には値はいずれにしても、Bulk APIを使った方が良いことは上記の値から判断できる。

Elasticsearch側の最大リクエストサイズに上限としてhttp.max_content_lengthがあるが、デフォルトは100MBと非常に大きな値である。Elasticsearchへの同時並行リクエストが多い場合は、大きなリクエストを並行に受け付けることでOutOfMemoryErrorに繋がる可能性があるが、並列数が少ない場合は積極的にバッチサイズを拡大できると考える。

おまけ Javaの性能測定時はJIT影響に注意

Bulk API未使用時の10回テストした時の値を以下のグラフに示すが、JavaVMを再起動せずにmaven-surefire-pluginによって繰り返しテスト実行していくうちに、最初の測定値の2倍近く性能向上した (13秒→6秒)。デフォルトでは-XX:CompileThreshold=10000により、10000回のメソッド実行でJITコンパイル対象となり、この影響は性能に大きな差を与える。

f:id:n_agetsuma:20171113210142p:plain

Javaのアプリケーションでは、起動直後にパフォーマンスを測定せず、必ずウォームアップしてから測定する必要がある。