ElasticsearchのBulk API効果を実測する

Elasticsearchには、クライアントから複数のリクエストをまとめて送信するBulk API機能がある。JDBCのexecuteBatchのようなイメージで使える。

JDBCの場合、Oracleのドキュメントでは、バッチ単位のガイドライン*1として「バッチ・サイズを50から100の一般的な範囲に保つことをお薦めします」が示されている。Elasticsearchの場合、バルクサイズごとにどの程度スループットに差が出るのか実測してみた。

測定環境

  • Elasticsearch 6.0.0-rc2
  • Elasticsearch6で導入されたJava High Level REST Client
  • JUnit 5
  • Oracle JDK 1.8.0_152
  • MacBook Pro (13-inch, Late 2016)
  • テストアプリケーションからlocalhost上のElasticsearchに接続

テストの流れ

  1. @BeforeEachでテスト用のインデックスとそのマッピングを作成。自動マッピング作成によってテスト結果が揺れないようにするため。
  2. テスト実行。JIT影響を鑑みて10回実行する。10回中一番早かったレスポンスを測定対象とする。
  3. @AfterEachでテスト用インデックスを削除

テストパターン

  1. 通常のIndex API(Bulk API未使用)
  2. Bulk API - バルクサイズ 10/50/100/1000/10000 の5パターン

テストコード全体は以下の通り。
JSON処理の高速化についてはKOMIYAさんの記事を参考に、jacksonのAfterburnerModuleを使ったり、JSONをStringではなくbyte[]で扱っている。テストコードの全体像はGitHub - n-agetsu/elastic-restclientを参照。

class IndexPerformanceTest {
    private static final Logger LOG = LogManager.getLogger(IndexPerformanceTest.class);

    private static final HttpHost ES_HOST = new HttpHost("localhost", 9200, "http");
    private static final int SEND_DOC_NUM = 10000;
    private static final String INDEX = "posts";
    private static final String TYPE = "doc";

    private static ObjectMapper BURN_MAPPER;
    private static RestHighLevelClient highLevelClient;
    private static RestClient client;

    @BeforeAll
    static void setup() throws JsonProcessingException {
        highLevelClient = new RestHighLevelClient(RestClient.builder(ES_HOST));
        client = RestClient.builder(ES_HOST).build();
        BURN_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
    }

    @BeforeEach
    void beforeEach() throws IOException {
        // create mapping
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support Mapping API.
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("settings")
                     .field("number_of_shards", 1)
                   .endObject()
                    .startObject("mappings")
                      .startObject(TYPE)
                        .startObject("properties")
                          .startObject("user")
                            .field("type", "text")
                          .endObject()
                          .startObject("postDate")
                            .field("type", "date")
                          .endObject()
                          .startObject("message")
                            .field("type", "text")
                          .endObject()
                        .endObject()
                      .endObject()
                    .endObject();
        }
        builder.endObject();
        HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON);
        Response response = client.performRequest("PUT", "/" + INDEX, Collections.emptyMap(), entity);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("create mapping posts failed, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @AfterEach
    void afterEach() throws IOException {
        // delete index for cleanup
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support DELETE INDEX API.
        Response response = client.performRequest("DELETE", "/" + INDEX);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("delete index posts, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @DisplayName("Index Performance Test - Non Bulk API")
    @RepeatedTest(value = 10, name = RepeatedTest.LONG_DISPLAY_NAME)
    void testIndexRequest() throws IOException {
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            highLevelClient.index(request);
        }
    }

    @DisplayName("Index Performance Test - Bulk API")
    @ParameterizedTest(name = "run {0} with bulkSize {1}")
    @MethodSource("repeatCountAndBulkSize")
    void testIndexBulkRequest(int repeatCount, int bulkSize) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            bulkRequest.add(request);
            boolean last = i == SEND_DOC_NUM - 1;
            if (i % bulkSize == 0 || last) {
                highLevelClient.bulk(bulkRequest);
                if (!last) {
                    bulkRequest = new BulkRequest();
                }
            }
        }
    }

    private static Stream<Arguments> repeatCountAndBulkSize() {
        // test 10 times per batchSize
        Stream.Builder<Arguments> builder = Stream.builder();
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 50)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 100)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 1000)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10000)));
        return builder.build();
    }

    @AfterAll
    static void cleanup() {
        try {
            highLevelClient.close();
        } catch (IOException e) {
            LOG.error(e);
        }

        try {
            client.close();
        } catch (IOException e) {
            LOG.error(e);
        }
    }
}

測定結果

テスト項目 処理時間
通常のIndex API(Bulk API未使用) 6.46 s
Bulk API(バルクサイズ10) 1.08s
Bulk API(バルクサイズ50) 492ms
Bulk API(バルクサイズ100) 383ms
Bulk API(バルクサイズ1000) 266ms
Bulk API(バルクサイズ10000) 254ms

Bulk APIの利用により、バッチサイズが10のような小さな値でも、有効なことがわかる。このケースではバルクサイズ1000ドキュメント程度が効率が良かった。

バルクサイズを大きな値にするとドキュメントが貯まるまでクライアントでデータ保持して、データの発生からElasticsearchで検索可能となるまで遅延が大きくなる。このため、いくつが良いとは一概には言えないが、まとまった単位のリクエストを投げる時には値はいずれにしても、Bulk APIを使った方が良いことは上記の値から判断できる。

Elasticsearch側の最大リクエストサイズに上限としてhttp.max_content_lengthがあるが、デフォルトは100MBと非常に大きな値である。Elasticsearchへの同時並行リクエストが多い場合は、大きなリクエストを並行に受け付けることでOutOfMemoryErrorに繋がる可能性があるが、並列数が少ない場合は積極的にバッチサイズを拡大できると考える。

おまけ Javaの性能測定時はJIT影響に注意

Bulk API未使用時の10回テストした時の値を以下のグラフに示すが、JavaVMを再起動せずにmaven-surefire-pluginによって繰り返しテスト実行していくうちに、最初の測定値の2倍近く性能向上した (13秒→6秒)。デフォルトでは-XX:CompileThreshold=10000により、10000回のメソッド実行でJITコンパイル対象となり、この影響は性能に大きな差を与える。

f:id:n_agetsuma:20171113210142p:plain

Javaのアプリケーションでは、起動直後にパフォーマンスを測定せず、必ずウォームアップしてから測定する必要がある。

Kafkaコンシューマのリバランスはいつ行われるか

Kafkaのコンシューマグループに、コンシューマを追加すると、各コンシューマがどのパーティションを読み取るか再割り当てするリバランスが行われる。

Kafka0.11.0.1を対象に、以下の観点でKafkaの実装を調べてみた結果をまとめる。

  • リバランスはいつ行われるのか
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
  • リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスはいつ行われるのか

KafkaConsumer.poll()メソッドの内部で行われる。

Kafkaのコンシューマの実装はだいたい以下のようになる。コード全体はgist参照。
Kafka Comsumer Sample · GitHub

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

コンシューマを起動し、どのパーティションのサブスクライブを始めるのかの決定は、poll(long timeout)メソッドの内部で行われる。このとき、コンシューマからKafkaプロトコルJoinGroupと呼ばれるリクエス*1をブローカーに送信し、その応答メッセージにどのパーティションのサブスクライブをすべきか含まれている。

リバランスの処理はpollメソッドを呼び出したスレッドで行われ、メッセージのフェッチ要求の前に行われる。ここまでの挙動は、KafkaConsumer.pollOnceメソッドを読み進めていくとわかる。

なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか

既存コンシューマグループにコンシューマが追加された場合、既にコンシューマグループに含まれている全てのコンシューマからJoinGroupリクエストが来るまで、JoinGroupレスポンスが返されず、パーティションの割り当てが行われないため。

既存のコンシューマによっては、メッセージ受信後の処理が重い、マシンリソース枯渇などの要因により、次のpoll(long timeout)が呼ばれるまで時間がかかるケースがあるだろう。

参加したいコンシューマグループに含まれる、全てのコンシューマでpoll(long timeout)が実行されるまでは、新規参加したコンシューマにパーティションが割り当てられず、メッセージの配信は始まらない。

リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスが起こった時に、オフセットコミットのタイミングによっては、既存コンシューマと新規コンシューマに重複メッセージが配信されないかについて。

これはオフセットのコミット方式にも依存する。

autoCommitの場合

既存コンシューマでは、poll()メソッド内部のリバランスの前処理として、commitSync()によるオフセットコミットを行うため、リバランスの発生を原因としたメッセージ重複は起こらない*2。新規に追加されたコンシューマでは、リバランス直前にコミットされたオフセットを読み込んでサブスクライブを開始する。

このリバランス直前のオフセットコミット処理はConsumerCoodinator.onJoinPrepareメソッドで実装されている。

手動コミットの場合

前述のコンシューマのコード例を再掲するが、以下のようにpoll()の直前でcommitSync()を実行して正常終了している場合、poll()内部でリバランスが発生した時には常に最新のオフセットがコミットされているため、メッセージ重複は起こらない。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

上記のコードのcommmitSync()をcommitAsync()に置き換えた場合、メッセージ重複が起こらないかについては、実装から読み取れなかった。

手元で調べる限りでは、Kafkaは同一クライアントからのリクエストを逐次的に処理するように見える。commitAsync()によるオフセットのコミット要求の処理が終わるまで、poll()メソッドから送られたJoin Groupリクエストのハンドリングを行わない。しかし、ネットワーク遅延などの要因により、オフセットコミット要求とJoin Group要求の順序が逆になってKafkaに届いた場合、Join Group要求時に最新オフセットがコミットされていない可能性がある。オライリーKafka: The Definitive Guideで紹介されているように、ConsumerRebalanceListenerを実装して、リバランス開始前にコールバックされるonPartitionsRevokedメソッドでcommitSync()を実行した方が安全だろう。

まとめ

  • リバランスはいつ行われるのか
    • KafkaConsumer.poll()メソッドの内部で行われる
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
    • 既存コンシューマがpoll()を実行してJoin Groupリクエストが投げられるのをKafkaが待っているため
  • リバランスが起こった時に重複メッセージ処理は発生しないか
    • autoCommitではリバランス直前にコミットされる。手動コミットでは、poll()の直前にcommitSync()を発行していれば、重複メッセージ処理は発生しない。

*1:他にKafkaにどのようなプロトコルがあるかは KafkaApis.scalaを読むと何となくわかる

*2:autoCommitではコンシューマプロセスクラッシュ時に、前回コミット以降に処理されたメッセージが重複メッセージ配信されるため、重複メッセージ配信が許容できない要件では利用できない。

Kafkaコンシューマのrecords-lagは何の値か

Kafkaコンシューマの監視した方が良いメトリクスとして、datadog*1のドキュメントや、HortonworkによるKafkaのベストプラクティス*2ではrecords-lagが紹介されている。

このコンシューマのlagとは具体的にどのように計算された値か、以降にまとめる。以下の内容は、KafkaのJavaクライアント0.11.0.0で調べている。

records-lagについて

records-lagとは、Kafkaのコンシューマのメトリクスで、コンシューマプロセスのJMX MBeanとして公開されている。この値は、Kafkaのブローカー側の最大オフセットと、コンシューマが処理済みのオフセットの差分を示す。jconsoleで見ると、以下のようにmy-topic-0.records-lagmy-topic-0.records-lag-avgmy-topic-0.records-lag-maxのように、トピック名-パーティション番号の命名規則で公開される。例の場合はコンシューマが1つのパーティションしかサブスクライブしていないが、複数のトピックやパーティションをサブスクライブしていた場合は、その分の属性が作られる。

f:id:n_agetsuma:20171009204750p:plain


例えば、ブローカー側のあるパーティションのオフセットが10000で、コンシューマが2000まで読み込んでいた場合、records-lagは8000で8000件のメッセージは未処理であることを示す。

records-lagは0は基本的に0であることが望ましい。このラグが定常的に発生し、かつ増え続けている場合、プロデューサからのメッセージ送信の頻度に対して、コンシューマの処理性能が足りていないことを示す。コンシューマの処理の中で受信したメッセージをDBに書き込んでおり、そのDBの処理性能が遅延している。または、そもそもコンシューマの処理性能が足りず、コンシューマグループにコンシューマを足してスケールアウトの必要がある等が、ラグが増え続ける一般的な原因である。何らかの対処を行わないと、まだコンシューマに処理されていないメッセージがlog.retention.hours(デフォルト1週間)以上経過して削除されてしまう。

以前のKafkaではコンシューマ全体で過去最も大きなラグが発生したかをrecords-lag-maxで確認できたが、それがどのトピックのどのパーティションが発生したものなのか、また現在のラグはいくつなのかを把握することはできなかった。しかし、現在ではKakfa0.10.2.0の以下のissueの対応により、サブスクライブしているパーティションごとに、現在値、平均値、過去の最大値が確認可能となっている。
[KAFKA-4381] Add per partition lag metric to KafkaConsumer. - ASF JIRA

records-lagはどうやって計算されているか

records-lagがいつ、どうやって計算された値なのかを把握するためには、Kafka Javaクライアントのコンシューマの仕組みを簡単に把握する必要がある。

以下のように、よくあるコンシューマのpollループのコードを例に考える。KafkaConsumer.pollメソッドがどうやってメッセージをブローカーから取得しているのか、実装を読んでみた。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpollメソッドは何をしてるのか?
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", 
            record.partition(), record.offset(), record.key(), record.value());
    }
}

全体を図にすると以下のようになる。
f:id:n_agetsuma:20171009211504p:plain

  1. pollメソッドを呼び出すと、Fetcherはコンシューマに割り当てられている各パーティションのデータをフェッチする。この時に、パーティションごとに最大でデフォルト1MB(max.partition.fetch.bytes)のメッセージをまとめてフェッチする。このフェッチのタイミングで、フェッチしたパーティションの現在の最大オフセット値を取得。
  2. フェッチ済みのメッセージの中から、最大でデフォルト500メッセージ(max.poll.records)をpollメソッドの呼び出し元に返す。
    • 既に500メッセージ以上をフェッチ済みであった場合は、新たなフェッチを行わず、フェッチ済みのバッファからメッセージを取得。
    • pollメソッドの呼び出し時に、フェッチ時に取得したパーティションの最大オフセットと、pollメソッドで返したオフセットの差分としてrecords-lagを計算。

例えば、フェッチ時に取得したパーティションの最大オフセットが10000であり、pollメソッドで500メッセージを返すと、records-lagは9500となる。もう一回pollメソッドを回すと、バッファ済みメッセージから500メッセージを取得して、records-lagは9000に縮小する。

遅延が発生してもrecords-lagが拡大しない注意したいパターン

先ほど最大オフセットはフェッチ時に取得、records-lagの取得はpoll時に行うと太字にしたのは意味がある。この仕組みにより、コンシューマで遅延が拡大していても、records-lagが拡大しないケースがある。

  1. 10000メッセージフェッチして、今のところ1000メッセージがpoll済み。この時点でのrecords-lagは9000。
  2. 新たなメッセージをフェッチする前に、ブローカー側に10000メッセージが追加。しかし、この時点で計算されるrecords-lagは9000。
    • 何故ならば、新たなフェッチが発生しておらず、records-lagの計算に使われるパーティションの最大オフセット値が10000のままであるため。フェッチ済みのメッセージの処理が終わるまでは、records-lagの値は減り続ける。
    • 次のフェッチのタイミングでrecords-lagが一気に拡大する。

records-lagはフェッチ時に取得したパーティションの最大オフセット値をベースとしているため、フェッチ後に大量にメッセージの追加が行われても、次のフェッチまでrecords-lagの値には反映されない仕組みになっている。

Burrowの場合

若干脱線するが、LinkedInが作っているコンシューマのラグ監視計測ツールBurrowでは、再現はできていないが、仕組み上このような問題は発生しないと思われる。Burrowではコンシューマのコミット済オフセットが入った__consumer_offsetトピックの値と、データが入ったトピックの最大オフセット値の両方を取得し、その値の差分でラグを計算しており、コンシューマのJMX MBeanの値を使っていない。

まとめ

  • Kafkaコンシューマの監視にはJMX Meanのrecords-lagが有効
  • Kakfa0.10.2.0からはパーティションごとにrecords-lagの監視が可能
  • メッセージのフェッチ直後に、大量のメッセージ追加があると、records-lagの値には反映されないので注意。

tcpdumpからKafkaのリトライ可能エラーを追う

Kafkaのメッセージロストの原因の1つに、プロデューサの設定のretries(デフォルト0:リトライしない)の未設定がある。

Kafkaクライアントの送信リトライ

プロデューサからメッセージ送信時のエラーには、retires設定によりKafkaのクライアントライブラリ内で自動リトライ可能なエラーと、リトライせずにProducer.sendメソッドの呼び出し元に例外を投げるエラーがある。自動リトライ可能なエラーで発生する例外は、org.apache.kafka.common.errors.RetriableExceptionの継承クラスである。代表的なものを以下に示す。

retiresによりリトライするエラー

  • NotLeaderForPartitionException
    • リーダーパーティションではないブローカーにメッセージが送られた場合のエラー応答。ブローカー障害によるフェールオーバ中にメッセージを送信すると起こりやすい。
  • NetworkException
    • ブローカーからレスポンスがない場合。メッセージを送ってから、ackが返される間にブローカーやNWに障害が発生すると起こりやすい。
  • 一時的な障害で、リトライすれば成功の見込みがある場合

リトライしないエラー

  • SerializationException
    • key.serializerやvalue.serializerのシリアライザでエラーが発生した場合
  • RecordTooLargeException
    • ブローカー設定のreplica.fetch.max.bytes(デフォルト1048576バイト)を超えるメッセージが送信された場合
  • リトライしても成功の見込みがないエラー

Kafkaとしてはリトライ可能なエラーを返しているが、クライアントがリトライしないでメッセージロストしている場合、クライアント側のログには出力されるが、Kafkaのブローカー側には特に何も出ていないこともある。Kafkaクラスタを運用管理する立場で、クライアント側より「たまにメッセージロストしているように見えます。クライアント側のログはありません。」と申告を受けても、tcpdumpの取得によりリトライエラーが返されていることを特定できる場合がある。

WiresharkによるKafkaプロトコル解析

WiresharkにはKafkaプロトコルを解釈する機能がある。0.10系までのプロトコル解釈には対応しているが、最新の0.11系には2017/09/26時点ではまだ対応していない。

デフォルトでは9092ポートの通信をKafkaの通信として解釈している。もし9092ポート以外もKafkaプロトコルとして解釈させたい場合は、Wiresharkの[Preferences] - [Protocols] - [Kafka] の設定で変更できる。
f:id:n_agetsuma:20170926171300p:plain

この状態でKafkaの通信を含むpcapファイルを読み込ませると、以下のようにKafkaプロトコルを解釈して表示される。
f:id:n_agetsuma:20170926171024p:plain

Kafkaは一般的に高トラフィックな環境で使われることが多いため、ローテーションでpcapを定期的に捨てるようにし、バッファも広げてなるべくdropped by kernelのパケットが増えないようにする。IO影響をなるべく避けるために、pcapファイルの出力先はlog.dirsの設定先以外にする。

tcpdump -w /path/kafka.pcap -i enp0s3 -C 1024 -W 10 -z gzip -nn -B 524288

取得したpcapファイルをwiresharkに読み込ませ、フィルタ設定にkafka.error > 0を設定すると、エラー応答の一覧が表示される。エラーコード0はNoErrorを示し、異常があると1以上のエラーコードを返すため、0より大きい値でフィルタする。

f:id:n_agetsuma:20170926132913p:plain

Kafkaのレスポンスメッセージはエラーコードが含まれており、上記の例の場合ではNot Leader For Partition (6)、リーダーパーティションではないブローカーにメッセージが送信されたため、エラーコード6を返している。

エラーコードの一覧と例外クラスとのマッピングは、KafkaのErrorMapping.scalaにある。今回の例であれば、エラーコード6がNotLeaderForPartitionExceptionに結びついており、NotLeaderForPartitionExceptionはRetriableExceptionの継承クラスのため、リトライ可能エラーが発生している。

ErrorMapping.scalaには、コンシューマリクエスト時などのすべてのエラーコードが記載されているが、その中でもプロデューサからのメッセージ送信時に発生するエラーコードの一覧は、ProduceResponseクラスにコメントしてまとめられている。

クライアント側ログによるリトライ発生の確認

前述のpcapの確認結果より、少なくともブローカー側からはリトライ可能なエラーが返されていることがわかった。クライアントにretriesの設定が有効となっており、retry.backoff.ms(デフォルト100ミリ秒)の間隔でリトライされているかは、クライアント側のWARNログ出力から確認できる。

2017-09-25 21:22:24.954 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - Got error produce response with correlation id 18504 on topic-partition my-topic-1, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2017-09-25 21:23:45.469 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - Got error produce response with correlation id 87895 on topic-partition my-topic-1, retrying (9 attempts left). Error: NETWORK_EXCEPTION

my-topicのパーティション番号1でリトライが発生しており、その原因となるエラーが出力されている。

まとめ

  • Kafkaのエラーにはプロデューサで送信リトライするエラーとしないエラーがある
  • プロデューサ設定のretriesはデフォルト0であり、自動リトライ可能なエラーが発生しても送信リトライしない
  • Kafka0.10系まであればWiresharkでKafkaプロトコルを解釈できる。自動リトライ可能なエラーの発生有無をpcapから特定できる。
  • 送信リトライの発生はクライアント側でWARNログとして出力される

Elastic {ON} 2017 3/7 ノート(1日目)

今日からElastic {ON}が始まった。会場はサンフランシスコのPier48という、AT&Tパーク近くの倉庫のようなイベント会場。キーノートを行う一番大きなセッション部屋はそれなりに大きい。
f:id:n_agetsuma:20170307123905j:plain

キーノート

日本のElastic {ON} Tourでもお見かけしたことのある、CTOのShay Banonさんを中心に話は進む。
f:id:n_agetsuma:20170307130639j:plain

新しい機能の紹介に絞ると、以下のような話題がある。

  • Kibanaの新しいVisualize
  • Filebeat module (coming in 5.3)
  • Machine Learning (coming in 5.4)
  • Elastic Cloud Enterprise (Beta now)
  • Elastic SQL (coming soon)
  • Kibana canvasプラグイン (coming soon)

Kibanaの新しいVisualize

導入時期については聞き取れず。以下のダッシュボードのような、今までにないVisualizeが追加される。
f:id:n_agetsuma:20170307133131j:plain

新しいVisualizeの1つであるtime series。Inboundのトラフィックを中心線より上、Outboundを下にグラフ化し、In/Outの割合がわかりやすくなる。
f:id:n_agetsuma:20170307133709j:plain

Filebeat module (coming in 5.3)

f:id:n_agetsuma:20170307132846j:plain

詳細はGithubのissuehttps://github.com/elastic/beats/issues/3159 に書かれていたが、Filebeat moduleとは、ざっくりいうと、以下のような設定をまとめたもの。

  • Filebeatのprospectorのyml設定
  • ElasticsearchのInjest Nodeのパイプライン定義
  • Elasticsearchのフィールド定義(各フィールドの型情報など)
  • Kibanaダッシュボード

これは、キーノートの後に行われたセッションで説明されていたが、今までのログの取り込みからは可視化までは、以下のようなフローを"ログごとに"繰り返し行う必要があった。f:id:n_agetsuma:20170307170629j:plain

  1. beatの設定をして
  2. LogstashでGrok書いて (またはInjest Nodeにパイプライン定義して)
  3. Elasticsearchのtemplate書いてスキーマ定義して
  4. KibanaでVisualizeとダッシュボード作る

Filebeat moduleはこれらの"繰り返し"行う作業を一つの既成のモジュールにまとめて、以下のようにbeatの-setupオプションを実行すると、今までやっていたような設定を1コマンドでやってくれるらしい。
f:id:n_agetsuma:20170307132431j:plain

Filebeat moduleのデモとして、Nginxログのダッシュボード、SSHアクセスログ(ログイン成功/失敗数など)、sudo実行履歴の分析が紹介されていた。
f:id:n_agetsuma:20170307132513j:plain

masterブランチのFilebeat moduleのドキュメントには、Apache2、MySQL、Nginx、Systemの各モジュールがあると示されている。

Machine Learning (comping in 5.4)

f:id:n_agetsuma:20170307135756j:plain
日本のElasticのイベントではPrelertという名前で紹介されていた、機械学習プラグインがMachine Learningという名前で紹介された。*1

IT Operationデータと呼ばれていた、仮想的な業務に対するデータ傾向分析のデモ。KibanaのMachine Learningプラグインでは、Jobという単位でタスクを作成。過去のデータから自動的に傾向を抽出し、特異点を発見する。デモの範囲では、いつもの折れ線グラフを作る場合と同じぐらいの入力をしただけで、過去のデータが傾向を抽出。左からデータが増えるごとに、傾向を示す薄い色の範囲が狭くなり、画面右側のようにデータが大きく動くと、また傾向の範囲の精度が荒くなっている。
f:id:n_agetsuma:20170307135107j:plain

特異点は、heat map chartのような図で、赤や青で上下に値が振れ過ぎているのを可視化していた。
f:id:n_agetsuma:20170307135521j:plain
ここは私の想像の部分だが、以下のような用途で使えるかなぁと思った。

  • 一般消費者向けのお店(スーパーとか衣類店など)で、雨降った日に必要以上に売り上げが落ち込んでいる店を一発で可視化する
  • システムリソース解析で、色んなサービスがRESTやSOAPで絡み合っているシステムにおいて、業務量がそれほど増えてないのに、他サービスと比較してリソース消費量が多いサービスを一発で抽出する(アプリケーションに性能バグがないか抽出)

Elastic Cloud Enterprise (Beta now)

f:id:n_agetsuma:20170307141825j:plain

Elastic Cloudはフルマネージドなサービスに対して、自分の好きなpublic or privateクラウド、またはベアメタルのハード上で、Elastic Cloudのような管理機能を持つクラスタを構築するためのもの。既にBeta1が出ており、ドキュメントは以下から参照できる。
What Is Elastic Cloud Enterprise? | Elastic Cloud Enterprise Reference [1.0.0-beta1] | Elastic

自分の所属している組織で考えると、Elasticスタックを初めて使うプロジェクトに対して、お試し環境として貸し出し用に使いたいなと考えた。外に出せないデータ向けの、オンプレミスのお試し環境は欲しい。プロダクション環境用途として使われると、一人で運用管理するのは少々荷が重い。

Elasticsearch SQL (coming soon)

f:id:n_agetsuma:20170307143800j:plain

ElasticsearchクエリにSQLが使えるAPI /_sql の導入。
SELECT文だけでなく、DESCRIBEによるスキーマ情報の確認、EXPLAINによるクエリ実行計画の確認もできる。RDBMSライクに操作可能。
f:id:n_agetsuma:20170307143324j:plain
f:id:n_agetsuma:20170307143458j:plain

Kibana canvasプラグイン (coming soon)

pptスライドやポスターのようなダッシュボードが作れるプラグイン。以下のスライドのようなものに含まれる、グラフはVisualizeで作成したもので、文字の部分はpptのようにオブジェクトして埋め込んで操作していた。
f:id:n_agetsuma:20170307144537j:plain
縦書きの例も紹介。
f:id:n_agetsuma:20170307144724j:plain
動画は撮れなかったため、文字では伝わりにくいが、通常のダッシュボードと同様に毎秒更新のような設定をしておくと、グラフが毎秒最新のデータで更新される。ライブデータでプレゼンできるので、人に見せる時にはインパクトがある。
今までのダッシュボードはKibanaっていうのが一目でわかる見た目しか実現できなかったため、システムにあまり詳しくはない、ビジネス寄りの方へのレポート作成として重宝できるプラグインと思われる。

各セッションについては後日追記予定。

What's Evolving in Elasticsearch?

TODO
セッションの前半はElasticsearch5.2までの振り返り。セッション後半が今後の話。
Elasticsearch 6 and beyond

  • Doc Value 2.0 (Lucene 7)
  • Index Sorting (Lucene 7) https://github.com/elastic/elasticsearch/issues/6720
  • Sequence Number (Internal use)
  • Rolling Upgrades 6.0
    • 5.x系最新から6.x系最新へのアップデート時はクラスタ全体のリスタートが不要に
    • ユーザがアップストリームについて来やすいようになる
  • Cross Manjor Version Search
  • 異なるメジャーバージョンの並行運用が可能。Kibana -> Elasticsearch6.x -> Elasticsearch5.xの流れのように、新しいElasticsearchから古いElasticsearchにリクエストを転送することで、メジャーバージョンが共存したクラスタへのクエリを実現する。

What's Cookin' in Kibana?

TODO
こちらもセッションの前半はKibana5.2までの振り返り。tag cloudやheatmap chart、Logstash Monitoring、I18N(ロシア語などへの一次対応)、5.1.1のKibana高速化など。
セッション後半が今後の話。

f:id:n_agetsuma:20170307162027j:plain

  • New Visualization (vector map, pivot table)
  • Timeseries Builder
  • ダッシュボードのView/Editモードの導入
  • Logstash Pipeline Monitoring
  • Ingest Node Pipeline UI (Ingest Nodeのパイプライン設定作成のGUIサポート。simurateの結果がすぐに出る)
  • 実装がAngularからReactに変更

What's Brewing in Beats?

TODO

  • Filebeat moduleのデモ (MySQL)
  • Metricbeatの応用
    • PrometheusとMetricbeatを連携させて、アプリケーションレイヤ(注文数など)のメトリクス収集
    • jolokiaと組み合わせてJMXメトリクス収集
  • Heartbeatの導入
    • ping監視(レイヤ3)、TCP/TLS接続(レイヤ4)、HTTPリクエスト(レイヤ7)によるサービス死活監視
    • HTTPレスポンス時間のheatmap chartを含むダッシュボード
  • DockerへのMetricbeat
    • DockerコンテナとしてMetricbeatを起動すると、同じホスト内のそれぞれのコンテナの状態を監視できる

将来的なテーマ

  • セキュリティ関連のユースケースへの機能拡充
    • 例えば、SSHやsudoログ監視のようなもの。ポーリングベースではなく、通知ベースでBeatで情報収集できないか。
  • more module
  • De-duplicate(一意なキーを生成して、Filebeatリスタート時の重複ログ登録が防げないか)
  • Central configuration management

*1:言葉で言ってたかは聞き取れなかったが、少なくともキーノートのスライドにはどこにもPrelertって書いてなかった気がする

PayaraのJMX Monitoring Service

この記事は Payara Advent Calendar 2016の12/23分の記事です。
昨日はsuke_masaさんのPayaraのバグを報告してみようでした。
明日は@yumix_hさんです。


2016/8/1にリリースされた Payara Server 4.1.1.163 より、指定したJMX MBeanの値を定期的にロギングできるJMX Monitoring Serviceが追加されました。

多くのAPサーバではJMX経由でリクエストスレッドプールや、JDBCコネクションプールなどの枯渇すると重大なトラブルに絡むことを多いメトリクスを収集できますが、jconsoleでは最新値を見れるだけなので事後障害解析には役立ちません。障害解析には虚無僧を使ってロギングしたり、jolokiaを追加してHTTP-API経由によるLogstashやFluentdで定期的に収集した値が必要です。

Payaraは虚無僧を加えなくてもJMX MBeanの値のロギングが可能です。

JMX Monitoring Serviceの使い方

Payaraの公式ドキュメントにはJavaヒープメモリ使用量の例が記載されています。
しかし、ヒープメモリのロギングであればGCログ(-Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps)の方がより詳細な情報が取得できます。JMXに期待するのはヒープ以外のメトリクスです。

GlassFishの頃より、PayaraにはMonitoring Serviceという仕組みがあり、デフォルトではオーバヘッド削減のためメトリクス収集は無効となっていますが、モジュールごとに有効化が可能です。JMX Monitoring Serviceでロギングするためには、事前に対象モジュールのモニタリングを有効化させる必要があります。

Monitoring Serviceの設定状況は asadmin get "server.monitoring-service.module-monitoring-levels.*" で一覧が取得できます。

./asadmin get "server.monitoring-service.module-monitoring-levels.*"
server.monitoring-service.module-monitoring-levels.cloud=OFF
server.monitoring-service.module-monitoring-levels.cloud-elasticity=OFF
server.monitoring-service.module-monitoring-levels.cloud-orchestrator=OFF
server.monitoring-service.module-monitoring-levels.cloud-tenant-manager=OFF
server.monitoring-service.module-monitoring-levels.cloud-virt-assembly-service=OFF
server.monitoring-service.module-monitoring-levels.connector-connection-pool=OFF
server.monitoring-service.module-monitoring-levels.connector-service=OFF
server.monitoring-service.module-monitoring-levels.deployment=OFF
server.monitoring-service.module-monitoring-levels.ejb-container=OFF
server.monitoring-service.module-monitoring-levels.http-service=HIGH
server.monitoring-service.module-monitoring-levels.jdbc-connection-pool=OFF
server.monitoring-service.module-monitoring-levels.jersey=OFF
server.monitoring-service.module-monitoring-levels.jms-service=OFF
server.monitoring-service.module-monitoring-levels.jpa=OFF
server.monitoring-service.module-monitoring-levels.jvm=OFF
server.monitoring-service.module-monitoring-levels.orb=OFF
server.monitoring-service.module-monitoring-levels.security=OFF
server.monitoring-service.module-monitoring-levels.thread-pool=HIGH
server.monitoring-service.module-monitoring-levels.transaction-service=OFF
server.monitoring-service.module-monitoring-levels.web-container=OFF
server.monitoring-service.module-monitoring-levels.web-services-container=OFF

今回はHTTPリクエスト処理のスレッドプールを例にするため、http-serviceの監視レベルを変更します。

./asadmin set "server.monitoring-service.module-monitoring-levels.http-service=HIGH"

jconsoleで接続すると、監視が有効化されている場合、amx配下にthread-pool-monが表示されます。xxx-mon配下のMBeanはメトリクスを属性に持っています。

f:id:n_agetsuma:20161223150553p:plain:w300

8080ポートで受付中のHTTPリクエストのスレッドプールの現在の利用状況であれば、thread-pool-mon/mon/server-mon[server]/network/http-listener-1/thread-pool/currentthreadbusy.countが現在払い出されているスレッド数を示します。

f:id:n_agetsuma:20161223151111p:plain

JMX Monitoring Serviceによるロギングを有効化は、asadmin set-monitoring-configuration --addpropertyをします。

  • --addproperty 'name=(attrname) value=(ObjectName)'
    • name=は監視したい属性名でcurrentthreadbusyのようなコンポジット型の場合は、.区切りで要素currentthreadsbusy.countを指定すると、該当の要素のみロギングできます。value=はObjectNameで以下のようにjconsoleで監視したいMBeanをクリックすると確認できます。

f:id:n_agetsuma:20161223151643p:plain

  • --enable true | false
    • falseを指定すると、次回起動時よりJMX Monitoring Serviceによるロギングが無効化されます。通常はtrueを指定します。
  • --amx true
    • Payara(GlassFish)はApplication Server Management Extension (AMX)という仕組みを利用してMBeanを外部に公開しています。AMXの詳細については、AMX: Design and Useに記載されています。jconsoleでamx配下にあるMBeanを対象にロギングしたい場合は --amx true を指定します
  • dynamic=true
    • オプションで再起動せずにすぐにset-monitoring-configurationした設定によるロギングを開始したい場合に設定します。
./asadmin set-monitoring-configuration --addproperty 'name=currentthreadsbusy.count value=amx:pp=/mon/server-mon[server],type=thread-pool-mon,name=network/http-listener-1/thread-pool' --enabled true --amx true --dynamic=true

設定が有効化されると、$DOMAIN_HOME/logs/server.logには以下のようにログが15秒間隔で出力されます。Filebeatなどで収集してElasticスタックで可視化しておくと、過去の値も振り返ることができます。

[2016-12-23T14:43:50.399+0900] [Payara 4.1] [INFO] [] [fish.payara.jmx.monitoring.MonitoringFormatter] [tid: _ThreadID=47 _ThreadName=payara-monitoring-service(1)] [timeMillis: 1482471830399] [levelValue: 800] [[PAYARA-MONITORING: countcurrentthreadsbusy=0 ]]

まとめ

虚無僧がなくても気軽にJMX MBeanがロギングできるPayaraのJMX Monitoring Serviceを紹介しました。上記以外にもいくつかのオプションがあるため、合わせて公式ドキュメントもご参照ください。

おまけ: MacでPayara起動が遅い、jconsoleで接続できない場合

macOS Sierraでハマったのが、hostnameコマンドで確認できるホスト名localhost.localが/etc/hostsになく、ローカルホストのDNS名が引けない状態だと、Payaraの起動に1分以上かかったり、jconsoleでローカル接続できません。起動中のスレッドダンプを取ると、以下のように名前解決でタイムアウトしているように見えます。

"main" #1 prio=5 os_prio=31 tid=0x00007fd7e1804000 nid=0x1c03 runnable [0x0000700008414000]
   java.lang.Thread.State: RUNNABLE
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
        at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
        at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
        - locked <0x00000007b8ea7eb8> (a java.lang.Object)
        at com.sun.enterprise.util.net.NetUtils.getCanonicalHostName(NetUtils.java:313)
        at com.sun.enterprise.universal.glassfish.ASenvPropertyReader$ASenvMap.getHostname(ASenvPropertyReader.java:310)

以下のようにlocalhost.localを/etc/hostsに加えると、1分間ほどかかるasadmin start-domainが4秒になりました。

$ sudo vim /etc/hosts
127.0.0.1       localhost localhost.local
255.255.255.255 broadcasthost
::1             localhost localhost.local

Payaraは軽快に起動するサーバです。起動が遅いと思った時は上記に当たっていないか試して見ると効果的です。

Commons Lang 3.5 でJava EEにBreakerを組み込む

この記事は Java EE Advent Calendar 2016の12/13分の記事です。
昨日はsuke_masaさんの必要最小限のサンプルでThymeleafを完全マスターでした。
明日は@kazuhira_rさんです。


Payara MicroやWildFly Swarmをベースとしたマイクロサービス対応の共通仕様の規定を目指しているmicroprofile.ioでは、Circuit Breakerの仕様盛り込みがこのスレッドで議論されています。

Circuit Breaker実装というとHystrixが有名ですが、上記スレッドでCommons Langに簡易なCircuit Breaker実装が追加されたことに言及があったため紹介します。

2016年10月にリリースされたCommons Lang 3.5より、LANG-1085により、シンプルなBreaker実装が加わっています。ドキュメントとしてはJavaDocがあります。

Commons LangのBreakerの特徴

  • Half Open状態がない
    • 状態はデフォルトのcloseと、異常が閾値を超えた場合のopen状態のみ
  • HystrixのようなWebダッシュボードはない
  • APIやコマンドで手動によるブレーカ切り替え機能はない
  • 非常にシンプルでCircuitBreakerインタフェースと、2つの実装クラスThresoldCircuitBreakerEventCountCircuitBreakerより構成

ThresholdCircuitBreakerクラスは単純に一定の回数、イベントが発生したらブレーカの開閉を行います。一般的なブレーカのイメージである、1分間で5回以上の例外が発生したらブレーカを開けてデフォルト応答を返し、1分間で例外が発生しなかったらブレーカを閉じて元に戻す実装はEventCountCircuitBreakerクラスです。

以下にJAX-WSクライアントに組み込んだ例を示します。

@ApplicationScoped
public class SoapClient {
    private static final Logger LOG = LoggerFactory.getLogger(SoapClient.class);
    private static final String DEFAULT_RESPONSE = "duke hello world!";
    private HelloEndPoint helloEndPoint;

    @PostConstruct
    public void init() {
        helloEndPoint = new HelloEndPointService().getHelloEndPointPort();
        ((BindingProvider) helloEndPoint).getRequestContext().put("javax.xml.ws.client.connectionTimeout", "5000");
        ((BindingProvider) helloEndPoint).getRequestContext().put("javax.xml.ws.client.receiveTimeout", "60000");
        // 第1引数は閾値で一定期間内で内部カウンタ値がこの値を超えたらブレーカを開け、この値を下回ったら再び閉じる
        // 第2、第3引数は閾値の集計時間単位で、以下の例では1分
        breaker = new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES);
    }

    public String helloWorld(String name) {
        if (breaker.checkState()) {
            try {
                return helloEndPoint.helloWorld(name);
            } catch (Exception e) {
                breaker.incrementAndCheckState();
            }
        } else {
            // ブレーカが開いた場合のデフォルト処理
           LOG.warn("breaker open");
           return DEFAULT_RESPONSE;
        }
    }
}

breaker.checkState()はブレーカが平常時を示すclose状態であればtrueを返し、異常時のopen状態であればfalseを返します。上記の例では、trueを返された場合はSOAPリクエストを投げています。falseが返された場合はブレーカが閾値に達しているため、SOAPリクエストを投げずにデフォルトの応答値を返しています。

例外発生時は、breaker.incrementAndCheckState()を実行し、カウンタ値をインクリメントします。上記の例ではSOAPFaultExceptionが投げられてもカウンタがインクリメントされるため、SOAPFaultが投げられる仕様のAPIでは、どの例外でincrementAndCheckState()を実行するか注意が必要です。

ブレーカはタイムアウトと組み合わせることで初めて効果を発揮します。上記の例においても、WildFly(JBoss)のSOAPクライアントのタイムアウト設定を盛り込んでいます。SOAPクライアントのタイムアウトは実装により異なります。例えば、Metroの場合はcom.sun.xml.ws.connect.timeoutとcom.sun.xml.ws.request.timeoutです。

  • javax.xml.ws.client.connectionTimeout
    • TCP接続時のタイムアウト。NW障害などにより、SYNを送っても応答がなく、TCP接続できない状態に有効。
  • javax.xml.ws.client.receiveTimeout
    • 応答を待つタイムアウトTCP接続後にHTTPリクエストは送信できたが、対向がスローダウンしてHTTP応答が遅い場合に有効。

タイムアウトのみ設定してブレーカがないと、タイムアウトを60秒などの長い時間を指定していた場合、あっという間にリクエスト処理スレッドのプールが枯渇して、SOAPリクエストと全く関係のない機能まで動かなくなる障害の連鎖が起こり得ます。障害が発生しているマシンにはアクセスせずに即座にデフォルト応答を返し、問題のないサービスを守ることがブレーカの目的です。

CDI Extensionでブレーカを組み込む

Commons Lang自体には@HystrixCommandのようなアノテーションによる宣言的なブレーカ設定の仕組みはありません。しかし、CDIインターセプタとCDI Extensionによるアノテーションスキャンの仕組みを用いると、Commons Langでも以下のように宣言的な@Breakerが実現できます。

@ApplicationScoped
public class SoapClient {
    // 一部省略。完全なコードはGitHub参照
    @Breaker(fallbackMethod = "fallback")
    public String helloWorld(String name) {
        return helloEndPoint.helloWorld(name);
    }

    String fallback(String name) {
        return DEFAULT_RESPONSE;
    }
}

作成したサンプルコードの一式はGitHubbreaker-sampleに置いています。
思ったよりクラス数が増えてしまったため、ポイントのみ示します。

ブレーカ実装はインターセプタで実現します。

@Interceptor
@Priority(Interceptor.Priority.APPLICATION)
@Breaker
public class BreakerInterceptor {

    @AroundInvoke
    public Object intercept(InvocationContext ic) throws Exception {
        BreakerHolder holder = Breakers.get().breaker(ic.getMethod());
        EventCountCircuitBreaker breaker = holder.breaker();

        if (breaker.checkState()) {
            try {
                return ic.proceed();
            } catch (Exception e) {
                breaker.incrementAndCheckState();
                throw e;
            }
        }

        // exec fallbackMethod
        Method fallback = holder.fallback();
        return fallback.invoke(ic.getTarget(), ic.getParameters());
    }
}

@Breakerはインターセプタバインディング型のユーザ定義アノテーションです。

@InterceptorBinding
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface Breaker {
    @Nonbinding int openingThreshold() default 5;
    @Nonbinding long checkInterval() default 1;
    @Nonbinding TimeUnit checkUnit() default TimeUnit.MINUTES;
    @Nonbinding int closingThreshold() default 5;
    @Nonbinding  String fallbackMethod() default "";
}

interceptメソッドの中でアノテーションが持つブレーカ設定のロードと、ブレーカの初期化を実装しても良いですが、初回リクエスト時のコストが高くなってしまいます。CDI Extensionを使うと、@Breakerが付与されたCDI Beanの一覧を取得して、デプロイ時にアノテーションスキャン処理が実装できます。

以下は@WithAnnotations({Breaker.class})を設定して、@Breakerが付与されたクラス・メソッドのクラス情報を引数にコールバックされるCDI Extensionです。EventCountCircuitBreakerの初期化やブレーカ開放時に実行するfallbackMethodが存在するかのチェック、リフレクションでfallbackMethodのMethodインスタンスを取得してキャッシュなどの処理をしています。クラスの全体はBreakerExtentionクラスより参照できます。

public class BreakerExtention implements Extension {
    ....
    <T> void processAnnotatedType(@Observes @WithAnnotations({Breaker.class}) ProcessAnnotatedType<T> pat) {
        Breakers breakers = Breakers.get();
        if (!pat.getAnnotatedType().getJavaClass().equals(BreakerInterceptor.class)) {
            pat.getAnnotatedType().getMethods().stream()
                    .filter(method -> method.isAnnotationPresent(Breaker.class))
                    .map(method -> method.getJavaMember())
                    .forEach(method -> {
                        BreakerConfig config = createConfig(method);
                        Method fallbackMethod = getFallbackMethod(method.getDeclaringClass(), config.getFallbackMethod());
                        String fqcn = fqcn(method);
                        breakers.put(method, createBreakerHolder(config, fallbackMethod, fqcn));
                        LOG.info("breaker created for " +  fqcn + ", now state is close. " + config);
                    });
        }
    }
   ...

CDI Extensionを有効にするためには、META-INF/services/javax.enterprise.inject.spi.Extensionを含め、Extension実装クラス名を定義します。

sample.breaker.BreakerExtention

Commons Langのブレーカが有効なケース

statsのAPIが欲しい、Webダッシュボードが欲しいなど、ボリュームが膨らみそうな場合はCDIインターセプタ実装を膨らまして劣化 Hystrixにするよりも、素直にHystrixを使う方が適切です。

Commons Langのブレーカの強みはそのコンパクトさです。レガシーなシステムでHystrixを導入するのは周りの同意が得られにくい、Hystrixを導入するとそのメンテナンス自体にも責任を負わなければいけない状況では、軽量なCommons Langであれば新たに含めることに障害が少ない場合も多いと思います。

まとめ

Commons Langによるブレーカ実装と、インターセプタとCDI Extentionを用いて宣言的ブレーカの実装を紹介しました。microprofile.ioによりJava EEサーバにもCircuit Breakerが実装されて、自分で作り込まずに済むことを楽しみにしています。