Kafkaコンシューマのリバランスはいつ行われるか

Kafkaのコンシューマグループに、コンシューマを追加すると、各コンシューマがどのパーティションを読み取るか再割り当てするリバランスが行われる。

Kafka0.11.0.1を対象に、以下の観点でKafkaの実装を調べてみた結果をまとめる。

  • リバランスはいつ行われるのか
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
  • リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスはいつ行われるのか

KafkaConsumer.poll()メソッドの内部で行われる。

Kafkaのコンシューマの実装はだいたい以下のようになる。コード全体はgist参照。
Kafka Comsumer Sample · GitHub

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

コンシューマを起動し、どのパーティションのサブスクライブを始めるのかの決定は、poll(long timeout)メソッドの内部で行われる。このとき、コンシューマからKafkaプロトコルJoinGroupと呼ばれるリクエス*1をブローカーに送信し、その応答メッセージにどのパーティションのサブスクライブをすべきか含まれている。

リバランスの処理はpollメソッドを呼び出したスレッドで行われ、メッセージのフェッチ要求の前に行われる。ここまでの挙動は、KafkaConsumer.pollOnceメソッドを読み進めていくとわかる。

なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか

既存コンシューマグループにコンシューマが追加された場合、既にコンシューマグループに含まれている全てのコンシューマからJoinGroupリクエストが来るまで、JoinGroupレスポンスが返されず、パーティションの割り当てが行われないため。

既存のコンシューマによっては、メッセージ受信後の処理が重い、マシンリソース枯渇などの要因により、次のpoll(long timeout)が呼ばれるまで時間がかかるケースがあるだろう。

参加したいコンシューマグループに含まれる、全てのコンシューマでpoll(long timeout)が実行されるまでは、新規参加したコンシューマにパーティションが割り当てられず、メッセージの配信は始まらない。

リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスが起こった時に、オフセットコミットのタイミングによっては、既存コンシューマと新規コンシューマに重複メッセージが配信されないかについて。

これはオフセットのコミット方式にも依存する。

autoCommitの場合

既存コンシューマでは、poll()メソッド内部のリバランスの前処理として、commitSync()によるオフセットコミットを行うため、リバランスの発生を原因としたメッセージ重複は起こらない*2。新規に追加されたコンシューマでは、リバランス直前にコミットされたオフセットを読み込んでサブスクライブを開始する。

このリバランス直前のオフセットコミット処理はConsumerCoodinator.onJoinPrepareメソッドで実装されている。

手動コミットの場合

前述のコンシューマのコード例を再掲するが、以下のようにpoll()の直前でcommitSync()を実行して正常終了している場合、poll()内部でリバランスが発生した時には常に最新のオフセットがコミットされているため、メッセージ重複は起こらない。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

上記のコードのcommmitSync()をcommitAsync()に置き換えた場合、メッセージ重複が起こらないかについては、実装から読み取れなかった。

手元で調べる限りでは、Kafkaは同一クライアントからのリクエストを逐次的に処理するように見える。commitAsync()によるオフセットのコミット要求の処理が終わるまで、poll()メソッドから送られたJoin Groupリクエストのハンドリングを行わない。しかし、ネットワーク遅延などの要因により、オフセットコミット要求とJoin Group要求の順序が逆になってKafkaに届いた場合、Join Group要求時に最新オフセットがコミットされていない可能性がある。オライリーKafka: The Definitive Guideで紹介されているように、ConsumerRebalanceListenerを実装して、リバランス開始前にコールバックされるonPartitionsRevokedメソッドでcommitSync()を実行した方が安全だろう。

まとめ

  • リバランスはいつ行われるのか
    • KafkaConsumer.poll()メソッドの内部で行われる
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
    • 既存コンシューマがpoll()を実行してJoin Groupリクエストが投げられるのをKafkaが待っているため
  • リバランスが起こった時に重複メッセージ処理は発生しないか
    • autoCommitではリバランス直前にコミットされる。手動コミットでは、poll()の直前にcommitSync()を発行していれば、重複メッセージ処理は発生しない。

*1:他にKafkaにどのようなプロトコルがあるかは KafkaApis.scalaを読むと何となくわかる

*2:autoCommitではコンシューマプロセスクラッシュ時に、前回コミット以降に処理されたメッセージが重複メッセージ配信されるため、重複メッセージ配信が許容できない要件では利用できない。