minikube startのプロキシ越え

プロキシ環境化でminikubeのセットアップにはまったのでメモ。
詳細は https://github.com/kubernetes/minikube/issues/530 を参照。

環境

プロキシ環境化でのminikube start方法

export HTTP_PROXY=http://xxx.xxx.xxx.xxx:xxxx
export HTTPS_PROXY=http://xxx.xxx.xxx.xxx:xxxx
export NO_PROXY=192.168.99.100

minikube start --vm-driver virtualbox --docker-env HTTP_PROXY=$HTTP_PROXY --docker-env HTTPS_PROXY=$HTTPS_PROXY

環境変数NO_PROXYの設定がない場合、minikube startがタイムアウトして起動しない。

Starting local Kubernetes v1.10.0 cluster...
Starting VM...
Downloading Minikube ISO
 150.53 MB / 150.53 MB [============================================] 100.00% 0s
Getting VM IP address...
Moving files into cluster...
Downloading kubeadm v1.10.0
Downloading kubelet v1.10.0
Finished Downloading kubelet v1.10.0
Finished Downloading kubeadm v1.10.0
Setting up certs...
Connecting to cluster...
Setting up kubeconfig...
Starting cluster components...
E0427 16:33:36.335447    3153 start.go:276] Error starting cluster:  timed out waiting to unmark master: getting node minikube: Get https://192.168.99.100:8443/api/v1/nodes/minikube: Service Unavailable

miniikube startして1〜2分しても応答がない場合、minikube logs -f でログを確認すると、以下のようなログ数行間隔で繰り返し出力されている。

minikube logs -f 
Apr 27 07:32:45 minikube kubelet[3541]: E0427 07:32:45.564897    3541 reflector.go:205] k8s.io/kubernetes/pkg/kubelet/kubelet.go:460: Failed to list *v1.Node: Get https://192.168.99.100:8443/api/v1/nodes?fieldSelector=metadata.name%3Dminikube&limit=500&resourceVersion=0: dial tcp 192.168.99.100:8443: getsockopt: connection refused

起動に途中で失敗した場合、環境をクリーンにした上でリトライする。

minikube delete
rm -rf ~/.minikube

Elasticsearchのインデックス開きすぎによるヒープメモリ枯渇

この記事はElastic stack Advent Calendar 2017の12/6分の記事です。

ElasticスタックによるApacheアクセスログやsar情報などのメトリクス収集を初めて導入した後の頻出トラブルとして、インデックスのオープンしすぎによるJavaヒープメモリ枯渇がある。

検索エンジン用途や、運用監視業務に組み込むような「本気の」運用では、事前にサイジングが行われる。しかし、まずはシステム状況が可視化できるかお試しで導入を始めると、とりあえず運用を始め、インデックスのクローズや削除、スナップショットの定期取得などの運用管理計画はどうしても漏れがちとなる。

では、具体的にElasticsearchはだいたい何ヶ月分のメトリクスが保存できるのが次の疑問になるが、以下のような多様な要素が作用するため、要件に合わせて実機検証が必要となる。

  • 登録するメトリクスの種類
    • Apacheアクセスログだけか、sar ALL相当も含めるか、MetricBeatなどのBeatsは使うか、Beatsを使うなら何台分のマシン情報を収集するのか など
  • Mapping設計
    • フィールド数が増えるほど、ElasticsearchのJavaヒープメモリ使用量は増える
  • Kibanaダッシュボード内容や表示期間、アクセス頻度

ここまではWeb上によくある情報だが、では実際にどの程度Elasticsearchに保持できるのか、実測してみた。

どの程度Elasticsearchに保存できるのか

以下の測定条件で実測してみた。

  • Elasticsearch 6.0.0
  • クラスタ構成。1インスタンスのみ。
  • 登録するのはApacheのCOMBINEDログのみ
  • シャード数はデフォルトの5
  • Javaヒープメモリサイズはデフォルトの1GB
    • Logstashのデフォルトに沿って、日別インデックス (logstash-%{yyyymmdd})
    • 具体的なLogstash設定は以下の通り
input { stdin {} }

filter {
  grok { match => { "message" => "%{HTTPD_COMBINEDLOG}" } }

  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    remove_field => ["timestamp"]
  }

  useragent {
    source => "agent"
    target => "useragent"
    remove_field => ["agent"]
  }

  mutate {
    remove_field => ["message"]
  }
}

output {
  elasticsearch {}
}
測定結果
  • 1インデックスあたり1ドキュメントの場合
    • 約1100インデックス(5500シャード)でCMS-GCループによる応答遅延発生
  • 1インデックスあたり10000ドキュメントの場合
    • 約500インデックス(2500シャード)でCMS-GCループによる応答遅延発生

Javaヒープメモリが枯渇し始め、GCのオーバヘッドが高くなると、以下のようなログがelasticsearch.logに大量に出力される。

[2017-12-05T18:02:17,978][WARN ][o.e.m.j.JvmGcMonitorService] [lvj6mYE] [gc][4474] overhead, spent [1.3s] collecting in the last [2.3s]

インデックスの大量作成を行い、Javaヒープメモリが枯渇した状態では、GCログは以下のような状態となる。水色の縦線がCMS-GCの発生を示しており、やがてヒープが完全に枯渇し始めると黒色の縦線のFull GCが連続して発生する。
f:id:n_agetsuma:20171205182809p:plain
1インデックスあたりの1ドキュメントで5500シャード作成した場合、ヒープダンプを取得してヒストグラムを取得すると、以下のようになる。5500インスタンスのクラスが複数存在し、Elasticsearchの内部実装として、シャード数ごとにインスタンス生成があり、シャードが増えるほどJavaヒープメモリ消費量が増えることが考察できる。
f:id:n_agetsuma:20171205184057p:plain

実測結果からの考察

ヒープ拡大やスケールアウトにより、より大量のインデックスをオープン状態にすることは可能なため、この結果からは確かなことは言えない。

しかし、1ノードでオープンできるシャード数は数十万などの大きなものではなく、多くても数千単位であることが考察できる。非クラスタ構成のElasticsearchでは、以下のような前提においては1〜2ヵ月分程度のインデックスが一度にオープンできてKibanaから閲覧できる量である。それより古いドキュメントはスナップショットを取得して削除するか、"Hot-Warmアーキテクチャ"に代表される、クラスタのdataノード構成を考える必要がある。

  • Apacheアクセスログ相当のフィールド数を持つメトリクスを10種類収集。
    • メトリクスごとにインデックス名を分ける。1日10インデックス。
    • シャード数はデフォルトの5
  • クラスタ構成のElasticsearch
  • 1日あたり、各メトリクスごとに10000ドキュメント

まとめ

Apacheアクセスログを対象に、Elasticsearchのデフォルト設定の場合、1ノードでどの程度のインデックスをオープンするとJavaヒープメモリ1GBが枯渇するか、実測してみました。

  • 1インデックス1ドキュメントの場合: 5500シャード
  • 1インデックス10000ドキュメントの場合: 11000シャード

環境や保存したいデータによっても結果は異なるため、あくまで上記は実測例です。

初めてElasticスタックによるメトリクス収集を導入する場合、Elasticsearchに蓄積したインデックスは定期的にクローズしないと、ディスクに余裕があっても、Javaヒープメモリが枯渇してElasticsearchが無応答になることに注意が必要です。

ElasticsearchのBulk API効果を実測する

Elasticsearchには、クライアントから複数のリクエストをまとめて送信するBulk API機能がある。JDBCのexecuteBatchのようなイメージで使える。

JDBCの場合、Oracleのドキュメントでは、バッチ単位のガイドライン*1として「バッチ・サイズを50から100の一般的な範囲に保つことをお薦めします」が示されている。Elasticsearchの場合、バルクサイズごとにどの程度スループットに差が出るのか実測してみた。

測定環境

  • Elasticsearch 6.0.0-rc2
  • Elasticsearch6で導入されたJava High Level REST Client
  • JUnit 5
  • Oracle JDK 1.8.0_152
  • MacBook Pro (13-inch, Late 2016)
  • テストアプリケーションからlocalhost上のElasticsearchに接続

テストの流れ

  1. @BeforeEachでテスト用のインデックスとそのマッピングを作成。自動マッピング作成によってテスト結果が揺れないようにするため。
  2. テスト実行。JIT影響を鑑みて10回実行する。10回中一番早かったレスポンスを測定対象とする。
  3. @AfterEachでテスト用インデックスを削除

テストパターン

  1. 通常のIndex API(Bulk API未使用)
  2. Bulk API - バルクサイズ 10/50/100/1000/10000 の5パターン

テストコード全体は以下の通り。
JSON処理の高速化についてはKOMIYAさんの記事を参考に、jacksonのAfterburnerModuleを使ったり、JSONをStringではなくbyte[]で扱っている。テストコードの全体像はGitHub - n-agetsu/elastic-restclientを参照。

class IndexPerformanceTest {
    private static final Logger LOG = LogManager.getLogger(IndexPerformanceTest.class);

    private static final HttpHost ES_HOST = new HttpHost("localhost", 9200, "http");
    private static final int SEND_DOC_NUM = 10000;
    private static final String INDEX = "posts";
    private static final String TYPE = "doc";

    private static ObjectMapper BURN_MAPPER;
    private static RestHighLevelClient highLevelClient;
    private static RestClient client;

    @BeforeAll
    static void setup() throws JsonProcessingException {
        highLevelClient = new RestHighLevelClient(RestClient.builder(ES_HOST));
        client = RestClient.builder(ES_HOST).build();
        BURN_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
    }

    @BeforeEach
    void beforeEach() throws IOException {
        // create mapping
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support Mapping API.
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("settings")
                     .field("number_of_shards", 1)
                   .endObject()
                    .startObject("mappings")
                      .startObject(TYPE)
                        .startObject("properties")
                          .startObject("user")
                            .field("type", "text")
                          .endObject()
                          .startObject("postDate")
                            .field("type", "date")
                          .endObject()
                          .startObject("message")
                            .field("type", "text")
                          .endObject()
                        .endObject()
                      .endObject()
                    .endObject();
        }
        builder.endObject();
        HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON);
        Response response = client.performRequest("PUT", "/" + INDEX, Collections.emptyMap(), entity);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("create mapping posts failed, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @AfterEach
    void afterEach() throws IOException {
        // delete index for cleanup
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support DELETE INDEX API.
        Response response = client.performRequest("DELETE", "/" + INDEX);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("delete index posts, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @DisplayName("Index Performance Test - Non Bulk API")
    @RepeatedTest(value = 10, name = RepeatedTest.LONG_DISPLAY_NAME)
    void testIndexRequest() throws IOException {
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            highLevelClient.index(request);
        }
    }

    @DisplayName("Index Performance Test - Bulk API")
    @ParameterizedTest(name = "run {0} with bulkSize {1}")
    @MethodSource("repeatCountAndBulkSize")
    void testIndexBulkRequest(int repeatCount, int bulkSize) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            bulkRequest.add(request);
            boolean last = i == SEND_DOC_NUM - 1;
            if (i % bulkSize == 0 || last) {
                highLevelClient.bulk(bulkRequest);
                if (!last) {
                    bulkRequest = new BulkRequest();
                }
            }
        }
    }

    private static Stream<Arguments> repeatCountAndBulkSize() {
        // test 10 times per batchSize
        Stream.Builder<Arguments> builder = Stream.builder();
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 50)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 100)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 1000)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10000)));
        return builder.build();
    }

    @AfterAll
    static void cleanup() {
        try {
            highLevelClient.close();
        } catch (IOException e) {
            LOG.error(e);
        }

        try {
            client.close();
        } catch (IOException e) {
            LOG.error(e);
        }
    }
}

測定結果

テスト項目 処理時間
通常のIndex API(Bulk API未使用) 6.46 s
Bulk API(バルクサイズ10) 1.08s
Bulk API(バルクサイズ50) 492ms
Bulk API(バルクサイズ100) 383ms
Bulk API(バルクサイズ1000) 266ms
Bulk API(バルクサイズ10000) 254ms

Bulk APIの利用により、バッチサイズが10のような小さな値でも、有効なことがわかる。このケースではバルクサイズ1000ドキュメント程度が効率が良かった。

バルクサイズを大きな値にするとドキュメントが貯まるまでクライアントでデータ保持して、データの発生からElasticsearchで検索可能となるまで遅延が大きくなる。このため、いくつが良いとは一概には言えないが、まとまった単位のリクエストを投げる時には値はいずれにしても、Bulk APIを使った方が良いことは上記の値から判断できる。

Elasticsearch側の最大リクエストサイズに上限としてhttp.max_content_lengthがあるが、デフォルトは100MBと非常に大きな値である。Elasticsearchへの同時並行リクエストが多い場合は、大きなリクエストを並行に受け付けることでOutOfMemoryErrorに繋がる可能性があるが、並列数が少ない場合は積極的にバッチサイズを拡大できると考える。

おまけ Javaの性能測定時はJIT影響に注意

Bulk API未使用時の10回テストした時の値を以下のグラフに示すが、JavaVMを再起動せずにmaven-surefire-pluginによって繰り返しテスト実行していくうちに、最初の測定値の2倍近く性能向上した (13秒→6秒)。デフォルトでは-XX:CompileThreshold=10000により、10000回のメソッド実行でJITコンパイル対象となり、この影響は性能に大きな差を与える。

f:id:n_agetsuma:20171113210142p:plain

Javaのアプリケーションでは、起動直後にパフォーマンスを測定せず、必ずウォームアップしてから測定する必要がある。

Kafkaコンシューマのリバランスはいつ行われるか

Kafkaのコンシューマグループに、コンシューマを追加すると、各コンシューマがどのパーティションを読み取るか再割り当てするリバランスが行われる。

Kafka0.11.0.1を対象に、以下の観点でKafkaの実装を調べてみた結果をまとめる。

  • リバランスはいつ行われるのか
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
  • リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスはいつ行われるのか

KafkaConsumer.poll()メソッドの内部で行われる。

Kafkaのコンシューマの実装はだいたい以下のようになる。コード全体はgist参照。
Kafka Comsumer Sample · GitHub

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

コンシューマを起動し、どのパーティションのサブスクライブを始めるのかの決定は、poll(long timeout)メソッドの内部で行われる。このとき、コンシューマからKafkaプロトコルJoinGroupと呼ばれるリクエス*1をブローカーに送信し、その応答メッセージにどのパーティションのサブスクライブをすべきか含まれている。

リバランスの処理はpollメソッドを呼び出したスレッドで行われ、メッセージのフェッチ要求の前に行われる。ここまでの挙動は、KafkaConsumer.pollOnceメソッドを読み進めていくとわかる。

なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか

既存コンシューマグループにコンシューマが追加された場合、既にコンシューマグループに含まれている全てのコンシューマからJoinGroupリクエストが来るまで、JoinGroupレスポンスが返されず、パーティションの割り当てが行われないため。

既存のコンシューマによっては、メッセージ受信後の処理が重い、マシンリソース枯渇などの要因により、次のpoll(long timeout)が呼ばれるまで時間がかかるケースがあるだろう。

参加したいコンシューマグループに含まれる、全てのコンシューマでpoll(long timeout)が実行されるまでは、新規参加したコンシューマにパーティションが割り当てられず、メッセージの配信は始まらない。

リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスが起こった時に、オフセットコミットのタイミングによっては、既存コンシューマと新規コンシューマに重複メッセージが配信されないかについて。

これはオフセットのコミット方式にも依存する。

autoCommitの場合

既存コンシューマでは、poll()メソッド内部のリバランスの前処理として、commitSync()によるオフセットコミットを行うため、リバランスの発生を原因としたメッセージ重複は起こらない*2。新規に追加されたコンシューマでは、リバランス直前にコミットされたオフセットを読み込んでサブスクライブを開始する。

このリバランス直前のオフセットコミット処理はConsumerCoodinator.onJoinPrepareメソッドで実装されている。

手動コミットの場合

前述のコンシューマのコード例を再掲するが、以下のようにpoll()の直前でcommitSync()を実行して正常終了している場合、poll()内部でリバランスが発生した時には常に最新のオフセットがコミットされているため、メッセージ重複は起こらない。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

上記のコードのcommmitSync()をcommitAsync()に置き換えた場合、メッセージ重複が起こらないかについては、実装から読み取れなかった。

手元で調べる限りでは、Kafkaは同一クライアントからのリクエストを逐次的に処理するように見える。commitAsync()によるオフセットのコミット要求の処理が終わるまで、poll()メソッドから送られたJoin Groupリクエストのハンドリングを行わない。しかし、ネットワーク遅延などの要因により、オフセットコミット要求とJoin Group要求の順序が逆になってKafkaに届いた場合、Join Group要求時に最新オフセットがコミットされていない可能性がある。オライリーKafka: The Definitive Guideで紹介されているように、ConsumerRebalanceListenerを実装して、リバランス開始前にコールバックされるonPartitionsRevokedメソッドでcommitSync()を実行した方が安全だろう。

まとめ

  • リバランスはいつ行われるのか
    • KafkaConsumer.poll()メソッドの内部で行われる
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
    • 既存コンシューマがpoll()を実行してJoin Groupリクエストが投げられるのをKafkaが待っているため
  • リバランスが起こった時に重複メッセージ処理は発生しないか
    • autoCommitではリバランス直前にコミットされる。手動コミットでは、poll()の直前にcommitSync()を発行していれば、重複メッセージ処理は発生しない。

*1:他にKafkaにどのようなプロトコルがあるかは KafkaApis.scalaを読むと何となくわかる

*2:autoCommitではコンシューマプロセスクラッシュ時に、前回コミット以降に処理されたメッセージが重複メッセージ配信されるため、重複メッセージ配信が許容できない要件では利用できない。

Kafkaコンシューマのrecords-lagは何の値か

Kafkaコンシューマの監視した方が良いメトリクスとして、datadog*1のドキュメントや、HortonworkによるKafkaのベストプラクティス*2ではrecords-lagが紹介されている。

このコンシューマのlagとは具体的にどのように計算された値か、以降にまとめる。以下の内容は、KafkaのJavaクライアント0.11.0.0で調べている。

records-lagについて

records-lagとは、Kafkaのコンシューマのメトリクスで、コンシューマプロセスのJMX MBeanとして公開されている。この値は、Kafkaのブローカー側の最大オフセットと、コンシューマが処理済みのオフセットの差分を示す。jconsoleで見ると、以下のようにmy-topic-0.records-lagmy-topic-0.records-lag-avgmy-topic-0.records-lag-maxのように、トピック名-パーティション番号の命名規則で公開される。例の場合はコンシューマが1つのパーティションしかサブスクライブしていないが、複数のトピックやパーティションをサブスクライブしていた場合は、その分の属性が作られる。

f:id:n_agetsuma:20171009204750p:plain


例えば、ブローカー側のあるパーティションのオフセットが10000で、コンシューマが2000まで読み込んでいた場合、records-lagは8000で8000件のメッセージは未処理であることを示す。

records-lagは0は基本的に0であることが望ましい。このラグが定常的に発生し、かつ増え続けている場合、プロデューサからのメッセージ送信の頻度に対して、コンシューマの処理性能が足りていないことを示す。コンシューマの処理の中で受信したメッセージをDBに書き込んでおり、そのDBの処理性能が遅延している。または、そもそもコンシューマの処理性能が足りず、コンシューマグループにコンシューマを足してスケールアウトの必要がある等が、ラグが増え続ける一般的な原因である。何らかの対処を行わないと、まだコンシューマに処理されていないメッセージがlog.retention.hours(デフォルト1週間)以上経過して削除されてしまう。

以前のKafkaではコンシューマ全体で過去最も大きなラグが発生したかをrecords-lag-maxで確認できたが、それがどのトピックのどのパーティションが発生したものなのか、また現在のラグはいくつなのかを把握することはできなかった。しかし、現在ではKakfa0.10.2.0の以下のissueの対応により、サブスクライブしているパーティションごとに、現在値、平均値、過去の最大値が確認可能となっている。
[KAFKA-4381] Add per partition lag metric to KafkaConsumer. - ASF JIRA

records-lagはどうやって計算されているか

records-lagがいつ、どうやって計算された値なのかを把握するためには、Kafka Javaクライアントのコンシューマの仕組みを簡単に把握する必要がある。

以下のように、よくあるコンシューマのpollループのコードを例に考える。KafkaConsumer.pollメソッドがどうやってメッセージをブローカーから取得しているのか、実装を読んでみた。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpollメソッドは何をしてるのか?
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", 
            record.partition(), record.offset(), record.key(), record.value());
    }
}

全体を図にすると以下のようになる。
f:id:n_agetsuma:20171009211504p:plain

  1. pollメソッドを呼び出すと、Fetcherはコンシューマに割り当てられている各パーティションのデータをフェッチする。この時に、パーティションごとに最大でデフォルト1MB(max.partition.fetch.bytes)のメッセージをまとめてフェッチする。このフェッチのタイミングで、フェッチしたパーティションの現在の最大オフセット値を取得。
  2. フェッチ済みのメッセージの中から、最大でデフォルト500メッセージ(max.poll.records)をpollメソッドの呼び出し元に返す。
    • 既に500メッセージ以上をフェッチ済みであった場合は、新たなフェッチを行わず、フェッチ済みのバッファからメッセージを取得。
    • pollメソッドの呼び出し時に、フェッチ時に取得したパーティションの最大オフセットと、pollメソッドで返したオフセットの差分としてrecords-lagを計算。

例えば、フェッチ時に取得したパーティションの最大オフセットが10000であり、pollメソッドで500メッセージを返すと、records-lagは9500となる。もう一回pollメソッドを回すと、バッファ済みメッセージから500メッセージを取得して、records-lagは9000に縮小する。

遅延が発生してもrecords-lagが拡大しない注意したいパターン

先ほど最大オフセットはフェッチ時に取得、records-lagの取得はpoll時に行うと太字にしたのは意味がある。この仕組みにより、コンシューマで遅延が拡大していても、records-lagが拡大しないケースがある。

  1. 10000メッセージフェッチして、今のところ1000メッセージがpoll済み。この時点でのrecords-lagは9000。
  2. 新たなメッセージをフェッチする前に、ブローカー側に10000メッセージが追加。しかし、この時点で計算されるrecords-lagは9000。
    • 何故ならば、新たなフェッチが発生しておらず、records-lagの計算に使われるパーティションの最大オフセット値が10000のままであるため。フェッチ済みのメッセージの処理が終わるまでは、records-lagの値は減り続ける。
    • 次のフェッチのタイミングでrecords-lagが一気に拡大する。

records-lagはフェッチ時に取得したパーティションの最大オフセット値をベースとしているため、フェッチ後に大量にメッセージの追加が行われても、次のフェッチまでrecords-lagの値には反映されない仕組みになっている。

Burrowの場合

若干脱線するが、LinkedInが作っているコンシューマのラグ監視計測ツールBurrowでは、再現はできていないが、仕組み上このような問題は発生しないと思われる。Burrowではコンシューマのコミット済オフセットが入った__consumer_offsetトピックの値と、データが入ったトピックの最大オフセット値の両方を取得し、その値の差分でラグを計算しており、コンシューマのJMX MBeanの値を使っていない。

まとめ

  • Kafkaコンシューマの監視にはJMX Meanのrecords-lagが有効
  • Kakfa0.10.2.0からはパーティションごとにrecords-lagの監視が可能
  • メッセージのフェッチ直後に、大量のメッセージ追加があると、records-lagの値には反映されないので注意。

tcpdumpからKafkaのリトライ可能エラーを追う

Kafkaのメッセージロストの原因の1つに、プロデューサの設定のretries(デフォルト0:リトライしない)の未設定がある。

Kafkaクライアントの送信リトライ

プロデューサからメッセージ送信時のエラーには、retires設定によりKafkaのクライアントライブラリ内で自動リトライ可能なエラーと、リトライせずにProducer.sendメソッドの呼び出し元に例外を投げるエラーがある。自動リトライ可能なエラーで発生する例外は、org.apache.kafka.common.errors.RetriableExceptionの継承クラスである。代表的なものを以下に示す。

retiresによりリトライするエラー

  • NotLeaderForPartitionException
    • リーダーパーティションではないブローカーにメッセージが送られた場合のエラー応答。ブローカー障害によるフェールオーバ中にメッセージを送信すると起こりやすい。
  • NetworkException
    • ブローカーからレスポンスがない場合。メッセージを送ってから、ackが返される間にブローカーやNWに障害が発生すると起こりやすい。
  • 一時的な障害で、リトライすれば成功の見込みがある場合

リトライしないエラー

  • SerializationException
    • key.serializerやvalue.serializerのシリアライザでエラーが発生した場合
  • RecordTooLargeException
    • ブローカー設定のreplica.fetch.max.bytes(デフォルト1048576バイト)を超えるメッセージが送信された場合
  • リトライしても成功の見込みがないエラー

Kafkaとしてはリトライ可能なエラーを返しているが、クライアントがリトライしないでメッセージロストしている場合、クライアント側のログには出力されるが、Kafkaのブローカー側には特に何も出ていないこともある。Kafkaクラスタを運用管理する立場で、クライアント側より「たまにメッセージロストしているように見えます。クライアント側のログはありません。」と申告を受けても、tcpdumpの取得によりリトライエラーが返されていることを特定できる場合がある。

WiresharkによるKafkaプロトコル解析

WiresharkにはKafkaプロトコルを解釈する機能がある。0.10系までのプロトコル解釈には対応しているが、最新の0.11系には2017/09/26時点ではまだ対応していない。

デフォルトでは9092ポートの通信をKafkaの通信として解釈している。もし9092ポート以外もKafkaプロトコルとして解釈させたい場合は、Wiresharkの[Preferences] - [Protocols] - [Kafka] の設定で変更できる。
f:id:n_agetsuma:20170926171300p:plain

この状態でKafkaの通信を含むpcapファイルを読み込ませると、以下のようにKafkaプロトコルを解釈して表示される。
f:id:n_agetsuma:20170926171024p:plain

Kafkaは一般的に高トラフィックな環境で使われることが多いため、ローテーションでpcapを定期的に捨てるようにし、バッファも広げてなるべくdropped by kernelのパケットが増えないようにする。IO影響をなるべく避けるために、pcapファイルの出力先はlog.dirsの設定先以外にする。

tcpdump -w /path/kafka.pcap -i enp0s3 -C 1024 -W 10 -z gzip -nn -B 524288

取得したpcapファイルをwiresharkに読み込ませ、フィルタ設定にkafka.error > 0を設定すると、エラー応答の一覧が表示される。エラーコード0はNoErrorを示し、異常があると1以上のエラーコードを返すため、0より大きい値でフィルタする。

f:id:n_agetsuma:20170926132913p:plain

Kafkaのレスポンスメッセージはエラーコードが含まれており、上記の例の場合ではNot Leader For Partition (6)、リーダーパーティションではないブローカーにメッセージが送信されたため、エラーコード6を返している。

エラーコードの一覧と例外クラスとのマッピングは、KafkaのErrorMapping.scalaにある。今回の例であれば、エラーコード6がNotLeaderForPartitionExceptionに結びついており、NotLeaderForPartitionExceptionはRetriableExceptionの継承クラスのため、リトライ可能エラーが発生している。

ErrorMapping.scalaには、コンシューマリクエスト時などのすべてのエラーコードが記載されているが、その中でもプロデューサからのメッセージ送信時に発生するエラーコードの一覧は、ProduceResponseクラスにコメントしてまとめられている。

クライアント側ログによるリトライ発生の確認

前述のpcapの確認結果より、少なくともブローカー側からはリトライ可能なエラーが返されていることがわかった。クライアントにretriesの設定が有効となっており、retry.backoff.ms(デフォルト100ミリ秒)の間隔でリトライされているかは、クライアント側のWARNログ出力から確認できる。

2017-09-25 21:22:24.954 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - Got error produce response with correlation id 18504 on topic-partition my-topic-1, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2017-09-25 21:23:45.469 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - Got error produce response with correlation id 87895 on topic-partition my-topic-1, retrying (9 attempts left). Error: NETWORK_EXCEPTION

my-topicのパーティション番号1でリトライが発生しており、その原因となるエラーが出力されている。

まとめ

  • Kafkaのエラーにはプロデューサで送信リトライするエラーとしないエラーがある
  • プロデューサ設定のretriesはデフォルト0であり、自動リトライ可能なエラーが発生しても送信リトライしない
  • Kafka0.10系まであればWiresharkでKafkaプロトコルを解釈できる。自動リトライ可能なエラーの発生有無をpcapから特定できる。
  • 送信リトライの発生はクライアント側でWARNログとして出力される

Elastic {ON} 2017 3/7 ノート(1日目)

今日からElastic {ON}が始まった。会場はサンフランシスコのPier48という、AT&Tパーク近くの倉庫のようなイベント会場。キーノートを行う一番大きなセッション部屋はそれなりに大きい。
f:id:n_agetsuma:20170307123905j:plain

キーノート

日本のElastic {ON} Tourでもお見かけしたことのある、CTOのShay Banonさんを中心に話は進む。
f:id:n_agetsuma:20170307130639j:plain

新しい機能の紹介に絞ると、以下のような話題がある。

  • Kibanaの新しいVisualize
  • Filebeat module (coming in 5.3)
  • Machine Learning (coming in 5.4)
  • Elastic Cloud Enterprise (Beta now)
  • Elastic SQL (coming soon)
  • Kibana canvasプラグイン (coming soon)

Kibanaの新しいVisualize

導入時期については聞き取れず。以下のダッシュボードのような、今までにないVisualizeが追加される。
f:id:n_agetsuma:20170307133131j:plain

新しいVisualizeの1つであるtime series。Inboundのトラフィックを中心線より上、Outboundを下にグラフ化し、In/Outの割合がわかりやすくなる。
f:id:n_agetsuma:20170307133709j:plain

Filebeat module (coming in 5.3)

f:id:n_agetsuma:20170307132846j:plain

詳細はGithubのissuehttps://github.com/elastic/beats/issues/3159 に書かれていたが、Filebeat moduleとは、ざっくりいうと、以下のような設定をまとめたもの。

  • Filebeatのprospectorのyml設定
  • ElasticsearchのInjest Nodeのパイプライン定義
  • Elasticsearchのフィールド定義(各フィールドの型情報など)
  • Kibanaダッシュボード

これは、キーノートの後に行われたセッションで説明されていたが、今までのログの取り込みからは可視化までは、以下のようなフローを"ログごとに"繰り返し行う必要があった。f:id:n_agetsuma:20170307170629j:plain

  1. beatの設定をして
  2. LogstashでGrok書いて (またはInjest Nodeにパイプライン定義して)
  3. Elasticsearchのtemplate書いてスキーマ定義して
  4. KibanaでVisualizeとダッシュボード作る

Filebeat moduleはこれらの"繰り返し"行う作業を一つの既成のモジュールにまとめて、以下のようにbeatの-setupオプションを実行すると、今までやっていたような設定を1コマンドでやってくれるらしい。
f:id:n_agetsuma:20170307132431j:plain

Filebeat moduleのデモとして、Nginxログのダッシュボード、SSHアクセスログ(ログイン成功/失敗数など)、sudo実行履歴の分析が紹介されていた。
f:id:n_agetsuma:20170307132513j:plain

masterブランチのFilebeat moduleのドキュメントには、Apache2、MySQL、Nginx、Systemの各モジュールがあると示されている。

Machine Learning (comping in 5.4)

f:id:n_agetsuma:20170307135756j:plain
日本のElasticのイベントではPrelertという名前で紹介されていた、機械学習プラグインがMachine Learningという名前で紹介された。*1

IT Operationデータと呼ばれていた、仮想的な業務に対するデータ傾向分析のデモ。KibanaのMachine Learningプラグインでは、Jobという単位でタスクを作成。過去のデータから自動的に傾向を抽出し、特異点を発見する。デモの範囲では、いつもの折れ線グラフを作る場合と同じぐらいの入力をしただけで、過去のデータが傾向を抽出。左からデータが増えるごとに、傾向を示す薄い色の範囲が狭くなり、画面右側のようにデータが大きく動くと、また傾向の範囲の精度が荒くなっている。
f:id:n_agetsuma:20170307135107j:plain

特異点は、heat map chartのような図で、赤や青で上下に値が振れ過ぎているのを可視化していた。
f:id:n_agetsuma:20170307135521j:plain
ここは私の想像の部分だが、以下のような用途で使えるかなぁと思った。

  • 一般消費者向けのお店(スーパーとか衣類店など)で、雨降った日に必要以上に売り上げが落ち込んでいる店を一発で可視化する
  • システムリソース解析で、色んなサービスがRESTやSOAPで絡み合っているシステムにおいて、業務量がそれほど増えてないのに、他サービスと比較してリソース消費量が多いサービスを一発で抽出する(アプリケーションに性能バグがないか抽出)

Elastic Cloud Enterprise (Beta now)

f:id:n_agetsuma:20170307141825j:plain

Elastic Cloudはフルマネージドなサービスに対して、自分の好きなpublic or privateクラウド、またはベアメタルのハード上で、Elastic Cloudのような管理機能を持つクラスタを構築するためのもの。既にBeta1が出ており、ドキュメントは以下から参照できる。
What Is Elastic Cloud Enterprise? | Elastic Cloud Enterprise Reference [1.0.0-beta1] | Elastic

自分の所属している組織で考えると、Elasticスタックを初めて使うプロジェクトに対して、お試し環境として貸し出し用に使いたいなと考えた。外に出せないデータ向けの、オンプレミスのお試し環境は欲しい。プロダクション環境用途として使われると、一人で運用管理するのは少々荷が重い。

Elasticsearch SQL (coming soon)

f:id:n_agetsuma:20170307143800j:plain

ElasticsearchクエリにSQLが使えるAPI /_sql の導入。
SELECT文だけでなく、DESCRIBEによるスキーマ情報の確認、EXPLAINによるクエリ実行計画の確認もできる。RDBMSライクに操作可能。
f:id:n_agetsuma:20170307143324j:plain
f:id:n_agetsuma:20170307143458j:plain

Kibana canvasプラグイン (coming soon)

pptスライドやポスターのようなダッシュボードが作れるプラグイン。以下のスライドのようなものに含まれる、グラフはVisualizeで作成したもので、文字の部分はpptのようにオブジェクトして埋め込んで操作していた。
f:id:n_agetsuma:20170307144537j:plain
縦書きの例も紹介。
f:id:n_agetsuma:20170307144724j:plain
動画は撮れなかったため、文字では伝わりにくいが、通常のダッシュボードと同様に毎秒更新のような設定をしておくと、グラフが毎秒最新のデータで更新される。ライブデータでプレゼンできるので、人に見せる時にはインパクトがある。
今までのダッシュボードはKibanaっていうのが一目でわかる見た目しか実現できなかったため、システムにあまり詳しくはない、ビジネス寄りの方へのレポート作成として重宝できるプラグインと思われる。

各セッションについては後日追記予定。

What's Evolving in Elasticsearch?

TODO
セッションの前半はElasticsearch5.2までの振り返り。セッション後半が今後の話。
Elasticsearch 6 and beyond

  • Doc Value 2.0 (Lucene 7)
  • Index Sorting (Lucene 7) https://github.com/elastic/elasticsearch/issues/6720
  • Sequence Number (Internal use)
  • Rolling Upgrades 6.0
    • 5.x系最新から6.x系最新へのアップデート時はクラスタ全体のリスタートが不要に
    • ユーザがアップストリームについて来やすいようになる
  • Cross Manjor Version Search
  • 異なるメジャーバージョンの並行運用が可能。Kibana -> Elasticsearch6.x -> Elasticsearch5.xの流れのように、新しいElasticsearchから古いElasticsearchにリクエストを転送することで、メジャーバージョンが共存したクラスタへのクエリを実現する。

What's Cookin' in Kibana?

TODO
こちらもセッションの前半はKibana5.2までの振り返り。tag cloudやheatmap chart、Logstash Monitoring、I18N(ロシア語などへの一次対応)、5.1.1のKibana高速化など。
セッション後半が今後の話。

f:id:n_agetsuma:20170307162027j:plain

  • New Visualization (vector map, pivot table)
  • Timeseries Builder
  • ダッシュボードのView/Editモードの導入
  • Logstash Pipeline Monitoring
  • Ingest Node Pipeline UI (Ingest Nodeのパイプライン設定作成のGUIサポート。simurateの結果がすぐに出る)
  • 実装がAngularからReactに変更

What's Brewing in Beats?

TODO

  • Filebeat moduleのデモ (MySQL)
  • Metricbeatの応用
    • PrometheusとMetricbeatを連携させて、アプリケーションレイヤ(注文数など)のメトリクス収集
    • jolokiaと組み合わせてJMXメトリクス収集
  • Heartbeatの導入
    • ping監視(レイヤ3)、TCP/TLS接続(レイヤ4)、HTTPリクエスト(レイヤ7)によるサービス死活監視
    • HTTPレスポンス時間のheatmap chartを含むダッシュボード
  • DockerへのMetricbeat
    • DockerコンテナとしてMetricbeatを起動すると、同じホスト内のそれぞれのコンテナの状態を監視できる

将来的なテーマ

  • セキュリティ関連のユースケースへの機能拡充
    • 例えば、SSHやsudoログ監視のようなもの。ポーリングベースではなく、通知ベースでBeatで情報収集できないか。
  • more module
  • De-duplicate(一意なキーを生成して、Filebeatリスタート時の重複ログ登録が防げないか)
  • Central configuration management

*1:言葉で言ってたかは聞き取れなかったが、少なくともキーノートのスライドにはどこにもPrelertって書いてなかった気がする