Kafkaコンシューマのrecords-lagは何の値か
Kafkaコンシューマの監視した方が良いメトリクスとして、datadog*1のドキュメントや、HortonworkによるKafkaのベストプラクティス*2ではrecords-lag
が紹介されている。
このコンシューマのlagとは具体的にどのように計算された値か、以降にまとめる。以下の内容は、KafkaのJavaクライアント0.11.0.0で調べている。
records-lagについて
records-lagとは、Kafkaのコンシューマのメトリクスで、コンシューマプロセスのJMX MBeanとして公開されている。この値は、Kafkaのブローカー側の最大オフセットと、コンシューマが処理済みのオフセットの差分を示す。jconsoleで見ると、以下のようにmy-topic-0.records-lag
、my-topic-0.records-lag-avg
、my-topic-0.records-lag-max
のように、トピック名-パーティション番号の命名規則で公開される。例の場合はコンシューマが1つのパーティションしかサブスクライブしていないが、複数のトピックやパーティションをサブスクライブしていた場合は、その分の属性が作られる。
例えば、ブローカー側のあるパーティションのオフセットが10000で、コンシューマが2000まで読み込んでいた場合、records-lagは8000で8000件のメッセージは未処理であることを示す。
records-lagは0は基本的に0であることが望ましい。このラグが定常的に発生し、かつ増え続けている場合、プロデューサからのメッセージ送信の頻度に対して、コンシューマの処理性能が足りていないことを示す。コンシューマの処理の中で受信したメッセージをDBに書き込んでおり、そのDBの処理性能が遅延している。または、そもそもコンシューマの処理性能が足りず、コンシューマグループにコンシューマを足してスケールアウトの必要がある等が、ラグが増え続ける一般的な原因である。何らかの対処を行わないと、まだコンシューマに処理されていないメッセージがlog.retention.hours
(デフォルト1週間)以上経過して削除されてしまう。
以前のKafkaではコンシューマ全体で過去最も大きなラグが発生したかをrecords-lag-max
で確認できたが、それがどのトピックのどのパーティションが発生したものなのか、また現在のラグはいくつなのかを把握することはできなかった。しかし、現在ではKakfa0.10.2.0の以下のissueの対応により、サブスクライブしているパーティションごとに、現在値、平均値、過去の最大値が確認可能となっている。
[KAFKA-4381] Add per partition lag metric to KafkaConsumer. - ASF JIRA
records-lagはどうやって計算されているか
records-lagがいつ、どうやって計算された値なのかを把握するためには、Kafka Javaクライアントのコンシューマの仕組みを簡単に把握する必要がある。
以下のように、よくあるコンシューマのpollループのコードを例に考える。KafkaConsumer.poll
メソッドがどうやってメッセージをブローカーから取得しているのか、実装を読んでみた。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { // 以下のpollメソッドは何をしてるのか? ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } }
全体を図にすると以下のようになる。
- pollメソッドを呼び出すと、Fetcherはコンシューマに割り当てられている各パーティションのデータをフェッチする。この時に、パーティションごとに最大でデフォルト1MB(max.partition.fetch.bytes)のメッセージをまとめてフェッチする。このフェッチのタイミングで、フェッチしたパーティションの現在の最大オフセット値を取得。
- フェッチ済みのメッセージの中から、最大でデフォルト500メッセージ(max.poll.records)をpollメソッドの呼び出し元に返す。
- 既に500メッセージ以上をフェッチ済みであった場合は、新たなフェッチを行わず、フェッチ済みのバッファからメッセージを取得。
- pollメソッドの呼び出し時に、フェッチ時に取得したパーティションの最大オフセットと、pollメソッドで返したオフセットの差分としてrecords-lagを計算。
例えば、フェッチ時に取得したパーティションの最大オフセットが10000であり、pollメソッドで500メッセージを返すと、records-lagは9500となる。もう一回pollメソッドを回すと、バッファ済みメッセージから500メッセージを取得して、records-lagは9000に縮小する。
遅延が発生してもrecords-lagが拡大しない注意したいパターン
先ほど最大オフセットはフェッチ時に取得、records-lagの取得はpoll時に行うと太字にしたのは意味がある。この仕組みにより、コンシューマで遅延が拡大していても、records-lagが拡大しないケースがある。
- 10000メッセージフェッチして、今のところ1000メッセージがpoll済み。この時点でのrecords-lagは9000。
- 新たなメッセージをフェッチする前に、ブローカー側に10000メッセージが追加。しかし、この時点で計算されるrecords-lagは9000。
- 何故ならば、新たなフェッチが発生しておらず、records-lagの計算に使われるパーティションの最大オフセット値が10000のままであるため。フェッチ済みのメッセージの処理が終わるまでは、records-lagの値は減り続ける。
- 次のフェッチのタイミングでrecords-lagが一気に拡大する。
records-lagはフェッチ時に取得したパーティションの最大オフセット値をベースとしているため、フェッチ後に大量にメッセージの追加が行われても、次のフェッチまでrecords-lagの値には反映されない仕組みになっている。