ElasticsearchのBulk API効果を実測する
Elasticsearchには、クライアントから複数のリクエストをまとめて送信するBulk API機能がある。JDBCのexecuteBatchのようなイメージで使える。
JDBCの場合、Oracleのドキュメントでは、バッチ単位のガイドライン*1として「バッチ・サイズを50から100の一般的な範囲に保つことをお薦めします」が示されている。Elasticsearchの場合、バルクサイズごとにどの程度スループットに差が出るのか実測してみた。
測定環境
- Elasticsearch 6.0.0-rc2
- Elasticsearch6で導入されたJava High Level REST Client
- JUnit 5
- Oracle JDK 1.8.0_152
- MacBook Pro (13-inch, Late 2016)
- テストアプリケーションからlocalhost上のElasticsearchに接続
テストの流れ
- @BeforeEachでテスト用のインデックスとそのマッピングを作成。自動マッピング作成によってテスト結果が揺れないようにするため。
- テスト実行。JIT影響を鑑みて10回実行する。10回中一番早かったレスポンスを測定対象とする。
- @AfterEachでテスト用インデックスを削除
テストパターン
テストコード全体は以下の通り。
JSON処理の高速化についてはKOMIYAさんの記事を参考に、jacksonのAfterburnerModuleを使ったり、JSONをStringではなくbyte[]で扱っている。テストコードの全体像はGitHub - n-agetsu/elastic-restclientを参照。
class IndexPerformanceTest { private static final Logger LOG = LogManager.getLogger(IndexPerformanceTest.class); private static final HttpHost ES_HOST = new HttpHost("localhost", 9200, "http"); private static final int SEND_DOC_NUM = 10000; private static final String INDEX = "posts"; private static final String TYPE = "doc"; private static ObjectMapper BURN_MAPPER; private static RestHighLevelClient highLevelClient; private static RestClient client; @BeforeAll static void setup() throws JsonProcessingException { highLevelClient = new RestHighLevelClient(RestClient.builder(ES_HOST)); client = RestClient.builder(ES_HOST).build(); BURN_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); } @BeforeEach void beforeEach() throws IOException { // create mapping // Use low level REST client, // because Elasticsearch6.0-rc2 high level REST Client doesn't support Mapping API. XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.startObject("settings") .field("number_of_shards", 1) .endObject() .startObject("mappings") .startObject(TYPE) .startObject("properties") .startObject("user") .field("type", "text") .endObject() .startObject("postDate") .field("type", "date") .endObject() .startObject("message") .field("type", "text") .endObject() .endObject() .endObject() .endObject(); } builder.endObject(); HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON); Response response = client.performRequest("PUT", "/" + INDEX, Collections.emptyMap(), entity); boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean(); if (!acknowledged) { LOG.error("create mapping posts failed, cause {}" + EntityUtils.toString(response.getEntity())); System.exit(1); } } @AfterEach void afterEach() throws IOException { // delete index for cleanup // Use low level REST client, // because Elasticsearch6.0-rc2 high level REST Client doesn't support DELETE INDEX API. Response response = client.performRequest("DELETE", "/" + INDEX); boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean(); if (!acknowledged) { LOG.error("delete index posts, cause {}" + EntityUtils.toString(response.getEntity())); System.exit(1); } } @DisplayName("Index Performance Test - Non Bulk API") @RepeatedTest(value = 10, name = RepeatedTest.LONG_DISPLAY_NAME) void testIndexRequest() throws IOException { for (int i = 0; i < SEND_DOC_NUM; i++) { Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch"); IndexRequest request = new IndexRequest() .index(INDEX).type(TYPE).id(String.valueOf(i)) .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON); highLevelClient.index(request); } } @DisplayName("Index Performance Test - Bulk API") @ParameterizedTest(name = "run {0} with bulkSize {1}") @MethodSource("repeatCountAndBulkSize") void testIndexBulkRequest(int repeatCount, int bulkSize) throws IOException { BulkRequest bulkRequest = new BulkRequest(); for (int i = 0; i < SEND_DOC_NUM; i++) { Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch"); IndexRequest request = new IndexRequest() .index(INDEX).type(TYPE).id(String.valueOf(i)) .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON); bulkRequest.add(request); boolean last = i == SEND_DOC_NUM - 1; if (i % bulkSize == 0 || last) { highLevelClient.bulk(bulkRequest); if (!last) { bulkRequest = new BulkRequest(); } } } } private static Stream<Arguments> repeatCountAndBulkSize() { // test 10 times per batchSize Stream.Builder<Arguments> builder = Stream.builder(); IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10))); IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 50))); IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 100))); IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 1000))); IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10000))); return builder.build(); } @AfterAll static void cleanup() { try { highLevelClient.close(); } catch (IOException e) { LOG.error(e); } try { client.close(); } catch (IOException e) { LOG.error(e); } } }
測定結果
テスト項目 | 処理時間 |
---|---|
通常のIndex API(Bulk API未使用) | 6.46 s |
Bulk API(バルクサイズ10) | 1.08s |
Bulk API(バルクサイズ50) | 492ms |
Bulk API(バルクサイズ100) | 383ms |
Bulk API(バルクサイズ1000) | 266ms |
Bulk API(バルクサイズ10000) | 254ms |
Bulk APIの利用により、バッチサイズが10のような小さな値でも、有効なことがわかる。このケースではバルクサイズ1000ドキュメント程度が効率が良かった。
バルクサイズを大きな値にするとドキュメントが貯まるまでクライアントでデータ保持して、データの発生からElasticsearchで検索可能となるまで遅延が大きくなる。このため、いくつが良いとは一概には言えないが、まとまった単位のリクエストを投げる時には値はいずれにしても、Bulk APIを使った方が良いことは上記の値から判断できる。
Elasticsearch側の最大リクエストサイズに上限としてhttp.max_content_lengthがあるが、デフォルトは100MBと非常に大きな値である。Elasticsearchへの同時並行リクエストが多い場合は、大きなリクエストを並行に受け付けることでOutOfMemoryErrorに繋がる可能性があるが、並列数が少ない場合は積極的にバッチサイズを拡大できると考える。