Kafkaコンシューマのrecords-lagは何の値か

Kafkaコンシューマの監視した方が良いメトリクスとして、datadog*1のドキュメントや、HortonworkによるKafkaのベストプラクティス*2ではrecords-lagが紹介されている。

このコンシューマのlagとは具体的にどのように計算された値か、以降にまとめる。以下の内容は、KafkaのJavaクライアント0.11.0.0で調べている。

records-lagについて

records-lagとは、Kafkaのコンシューマのメトリクスで、コンシューマプロセスのJMX MBeanとして公開されている。この値は、Kafkaのブローカー側の最大オフセットと、コンシューマが処理済みのオフセットの差分を示す。jconsoleで見ると、以下のようにmy-topic-0.records-lagmy-topic-0.records-lag-avgmy-topic-0.records-lag-maxのように、トピック名-パーティション番号の命名規則で公開される。例の場合はコンシューマが1つのパーティションしかサブスクライブしていないが、複数のトピックやパーティションをサブスクライブしていた場合は、その分の属性が作られる。

f:id:n_agetsuma:20171009204750p:plain


例えば、ブローカー側のあるパーティションのオフセットが10000で、コンシューマが2000まで読み込んでいた場合、records-lagは8000で8000件のメッセージは未処理であることを示す。

records-lagは0は基本的に0であることが望ましい。このラグが定常的に発生し、かつ増え続けている場合、プロデューサからのメッセージ送信の頻度に対して、コンシューマの処理性能が足りていないことを示す。コンシューマの処理の中で受信したメッセージをDBに書き込んでおり、そのDBの処理性能が遅延している。または、そもそもコンシューマの処理性能が足りず、コンシューマグループにコンシューマを足してスケールアウトの必要がある等が、ラグが増え続ける一般的な原因である。何らかの対処を行わないと、まだコンシューマに処理されていないメッセージがlog.retention.hours(デフォルト1週間)以上経過して削除されてしまう。

以前のKafkaではコンシューマ全体で過去最も大きなラグが発生したかをrecords-lag-maxで確認できたが、それがどのトピックのどのパーティションが発生したものなのか、また現在のラグはいくつなのかを把握することはできなかった。しかし、現在ではKakfa0.10.2.0の以下のissueの対応により、サブスクライブしているパーティションごとに、現在値、平均値、過去の最大値が確認可能となっている。
[KAFKA-4381] Add per partition lag metric to KafkaConsumer. - ASF JIRA

records-lagはどうやって計算されているか

records-lagがいつ、どうやって計算された値なのかを把握するためには、Kafka Javaクライアントのコンシューマの仕組みを簡単に把握する必要がある。

以下のように、よくあるコンシューマのpollループのコードを例に考える。KafkaConsumer.pollメソッドがどうやってメッセージをブローカーから取得しているのか、実装を読んでみた。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpollメソッドは何をしてるのか?
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", 
            record.partition(), record.offset(), record.key(), record.value());
    }
}

全体を図にすると以下のようになる。
f:id:n_agetsuma:20171009211504p:plain

  1. pollメソッドを呼び出すと、Fetcherはコンシューマに割り当てられている各パーティションのデータをフェッチする。この時に、パーティションごとに最大でデフォルト1MB(max.partition.fetch.bytes)のメッセージをまとめてフェッチする。このフェッチのタイミングで、フェッチしたパーティションの現在の最大オフセット値を取得。
  2. フェッチ済みのメッセージの中から、最大でデフォルト500メッセージ(max.poll.records)をpollメソッドの呼び出し元に返す。
    • 既に500メッセージ以上をフェッチ済みであった場合は、新たなフェッチを行わず、フェッチ済みのバッファからメッセージを取得。
    • pollメソッドの呼び出し時に、フェッチ時に取得したパーティションの最大オフセットと、pollメソッドで返したオフセットの差分としてrecords-lagを計算。

例えば、フェッチ時に取得したパーティションの最大オフセットが10000であり、pollメソッドで500メッセージを返すと、records-lagは9500となる。もう一回pollメソッドを回すと、バッファ済みメッセージから500メッセージを取得して、records-lagは9000に縮小する。

遅延が発生してもrecords-lagが拡大しない注意したいパターン

先ほど最大オフセットはフェッチ時に取得、records-lagの取得はpoll時に行うと太字にしたのは意味がある。この仕組みにより、コンシューマで遅延が拡大していても、records-lagが拡大しないケースがある。

  1. 10000メッセージフェッチして、今のところ1000メッセージがpoll済み。この時点でのrecords-lagは9000。
  2. 新たなメッセージをフェッチする前に、ブローカー側に10000メッセージが追加。しかし、この時点で計算されるrecords-lagは9000。
    • 何故ならば、新たなフェッチが発生しておらず、records-lagの計算に使われるパーティションの最大オフセット値が10000のままであるため。フェッチ済みのメッセージの処理が終わるまでは、records-lagの値は減り続ける。
    • 次のフェッチのタイミングでrecords-lagが一気に拡大する。

records-lagはフェッチ時に取得したパーティションの最大オフセット値をベースとしているため、フェッチ後に大量にメッセージの追加が行われても、次のフェッチまでrecords-lagの値には反映されない仕組みになっている。

Burrowの場合

若干脱線するが、LinkedInが作っているコンシューマのラグ監視計測ツールBurrowでは、再現はできていないが、仕組み上このような問題は発生しないと思われる。Burrowではコンシューマのコミット済オフセットが入った__consumer_offsetトピックの値と、データが入ったトピックの最大オフセット値の両方を取得し、その値の差分でラグを計算しており、コンシューマのJMX MBeanの値を使っていない。

まとめ

  • Kafkaコンシューマの監視にはJMX Meanのrecords-lagが有効
  • Kakfa0.10.2.0からはパーティションごとにrecords-lagの監視が可能
  • メッセージのフェッチ直後に、大量のメッセージ追加があると、records-lagの値には反映されないので注意。