見習いプログラミング日記

Javaを中心に色々なことを考えてみます。目指せ本物のプログラマ。

OpenShift Origin 3.9のインストール

OpenShift Origin 3.9のセットアップ手順メモ。

前提条件は以下の通り。

  • OpenShift用DNSサーバ(dnsmasq)、OpenShift Master 1台、 Node 1台
  • VirtualBoxのブリッジネットワーク上に構成
  • ゲストマシンのOSは全てCentOS7

DNSサーバのセットアップ

・dnsmasqのインストール

# yum install -y dnsmasq

・/etc/hostsのの追記
dnsmasqは/etc/hostsを設定ファイルとして読み込み、名前解決を行う。

192.168.11.10 dns dns.openshift.example.com
192.168.11.11 master master.openshift.example.com
192.168.11.12 node01 node01.openshift.example.com

・/etc/dnsmasq.confの編集
DNSクエリ要求に応答するドメインの設定(local=)、Route用のワイルドカードDNS(address=)、DNSリクエスト待ち受けのネットワークインタフェース(interface=)、DHCPサーバ機能の無効化(no-dhcp-interface=)を設定。

# diff -u /etc/dnsmasq.conf.org /etc/dnsmasq.conf

--- /etc/dnsmasq.conf.org       2018-05-26 12:08:03.447080188 +0900
+++ /etc/dnsmasq.conf   2018-05-26 15:02:48.609040878 +0900
@@ -72,11 +72,13 @@
 # Add local-only domains here, queries in these domains are answered
 # from /etc/hosts or DHCP only.
 #local=/localnet/
+local=/openshift.example.com/

 # Add domains which you want to force to an IP address here.
 # The example below send any host in double-click.net to a local
 # web-server.
 #address=/double-click.net/127.0.0.1
+address=/cloudapps.example.com/192.168.11.11

 # --address (and --server) work with IPv6 addresses too.
 #address=/www.thekelleys.org.uk/fe80::20d:60ff:fe36:f83
@@ -103,7 +105,7 @@
 # specified interfaces (and the loopback) give the name of the
 # interface (eg eth0) here.
 # Repeat the line for more than one interface.
-#interface=
+interface=enp0s3
 # Or you can specify which interface _not_ to listen on
 #except-interface=
 # Or which to listen on by address (remember to include 127.0.0.1 if
@@ -112,7 +114,7 @@
 # If you want dnsmasq to provide only DNS service on an interface,
 # configure it as shown above, and then use the following line to
 # disable DHCP and TFTP on it.
-#no-dhcp-interface=
+no-dhcp-interface=enp0s3

 # On systems which support it, dnsmasq binds the wildcard address,
 # even when it is listening on only some interfaces. It then discards

DNSサーバの起動
ここではあくまでローカルの実験用環境のため、DNSリクエストのポート53へのアクセス許可はfirewalldを切って対処。

# systemctl stop firewalld
# systemctl disable firewalld
# systemctl enable dnsmasq
# systemctl start dnsmasq

DNSサーバのネットワーク設定
DNS1はOpenShift用に起動した内部ネットワーク用のdnsmasqサーバ。DNS2はWebアクセス用のDNSサーバ。

# vim /etc/sysconfig/network-scripts/ifcfg-enp0s3
TYPE=Ethernet
BOOTPROTO=none
NAME=enp0s3
UUID=4590c759-b0cd-4dc2-9f43-779201c6593c
DEVICE=enp0s3
ONBOOT=yes
IPADDR=192.168.11.10
PREFIX=24
GATEWAY=192.168.1.1
DNS1=127.0.0.1
DNS2=192.168.11.1

# hostnamectl set-hostname dns.openshift.example.com
# systemctl restart network

Masterのセットアップ

・Masterサーバのネットワーク設定

# vim /etc/sysconfig/network-scripts/ifcfg-enp0s3
TYPE=Ethernet
BOOTPROTO=none
NAME=enp0s3
UUID=4590c759-b0cd-4dc2-9f43-779201c6593c
DEVICE=enp0s3
ONBOOT=yes
IPADDR=192.168.11.11
PREFIX=24
GATEWAY=192.168.11.1
DNS1=192.168.11.10

# hostnamectl set-hostname master.openshift.example.com
# systemctl restart network

・必要パッケージのインストール

# yum install -y wget git net-tools bind-utils yum-utils iptables-services bridge-utils bash-completion kexec-tools sos psacct unzip
# yum update
# systemctl reboot

OpenShift公式ドキュメントのHost Preparation Installing Base Packagesの通りにインストールすると、後述のAnsibleによるインストール時に以下のエラーが発生したため、unzipのインストールを追加している。

TASK [etcd : Unarchive cert tarball] **************************************************************************************
fatal: [master.openshift.example.com]: FAILED! => {"changed": false, "msg": "Failed to find handler for \"~None/.ansible/tmp/ansible-tmp-1527315221.2-254636300880053/source\". Make sure the required command to extract the file is installed. Command \"unzip\" not found. Command \"/usr/bin/gtar\" could not handle archive."}

・Advanced Instllationに必要なパッケージのインストール

# yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
# sed -i -e "s/^enabled=1/enabled=0/" /etc/yum.repos.d/epel.repo
# yum -y --enablerepo=epel install ansible pyOpenSSL

・OpenShift3.9のダウンロード

# cd ~
# git clone https://github.com/openshift/openshift-ansible
# cd openshift-ansible
# git checkout release-3.9

・Dockerのインストール

# yum install -y docker-1.13.1
# systemctl enable docker
# systemctl start docker

Nodeのセットアップ

・Nodeサーバのネットワーク設定

# vim /etc/sysconfig/network-scripts/ifcfg-enp0s3
TYPE=Ethernet
BOOTPROTO=none
NAME=enp0s3
UUID=4590c759-b0cd-4dc2-9f43-779201c6593c
DEVICE=enp0s3
ONBOOT=yes
IPADDR=192.168.11.12
PREFIX=24
GATEWAY=192.168.11.1
DNS1=192.168.11.10

# hostnamectl set-hostname node01.openshift.example.com
# systemctl restart network

・必要パッケージのインストール
Masterと同じパッケージをインストール

# yum install -y wget git net-tools bind-utils yum-utils iptables-services bridge-utils bash-completion kexec-tools sos psacct unzip
# yum update
# systemctl reboot

・Dockerのインストール

# yum install -y docker-1.13.1
# systemctl enable docker
# systemctl start docker

OpenShift Originのインストール

Masterサーバ上で以下を実行する。

SSHログイン時の対話型パスワード入力の抑止

# ssh-keygen
# for host in master.openshift.example.com node01.openshift.example.com; do ssh-copy-id -i ~/.ssh/id_rsa.pub $host; done

・/etc/ansible/hostsの設定

[OSEv3:children]
masters
nodes
etcd

[OSEv3:vars]
ansible_ssh_user=root
openshift_deployment_type=origin
openshift_master_default_subdomain=cloudapps.example.com
openshift_disable_check=memory_availability,disk_availability

[masters]
master.openshift.example.com

[etcd]
master.openshift.example.com

[nodes]
master.openshift.example.com openshift_node_labels="{'region': 'infra', 'zone': 'default'}"
node01.openshift.example.com openshift_node_labels="{'region': 'primary', 'zone': 'default'}"

・OpenShiftインストールの実行

# ansible-playbook -i /etc/ansible/hosts ~/openshift-ansible/playbooks/prerequisites.yml
# ansible-playbook -i /etc/ansible/hosts ~/openshift-ansible/playbooks/deploy_cluster.yml

インストール後の初期セットアップ

後日追記予定

minikube startのプロキシ越え

プロキシ環境化でminikubeのセットアップにはまったのでメモ。
詳細は https://github.com/kubernetes/minikube/issues/530 を参照。

環境

プロキシ環境化でのminikube start方法

export HTTP_PROXY=http://xxx.xxx.xxx.xxx:xxxx
export HTTPS_PROXY=http://xxx.xxx.xxx.xxx:xxxx
export NO_PROXY=192.168.99.100

minikube start --vm-driver virtualbox --docker-env HTTP_PROXY=$HTTP_PROXY --docker-env HTTPS_PROXY=$HTTPS_PROXY

環境変数NO_PROXYの設定がない場合、minikube startがタイムアウトして起動しない。

Starting local Kubernetes v1.10.0 cluster...
Starting VM...
Downloading Minikube ISO
 150.53 MB / 150.53 MB [============================================] 100.00% 0s
Getting VM IP address...
Moving files into cluster...
Downloading kubeadm v1.10.0
Downloading kubelet v1.10.0
Finished Downloading kubelet v1.10.0
Finished Downloading kubeadm v1.10.0
Setting up certs...
Connecting to cluster...
Setting up kubeconfig...
Starting cluster components...
E0427 16:33:36.335447    3153 start.go:276] Error starting cluster:  timed out waiting to unmark master: getting node minikube: Get https://192.168.99.100:8443/api/v1/nodes/minikube: Service Unavailable

miniikube startして1〜2分しても応答がない場合、minikube logs -f でログを確認すると、以下のようなログ数行間隔で繰り返し出力されている。

minikube logs -f 
Apr 27 07:32:45 minikube kubelet[3541]: E0427 07:32:45.564897    3541 reflector.go:205] k8s.io/kubernetes/pkg/kubelet/kubelet.go:460: Failed to list *v1.Node: Get https://192.168.99.100:8443/api/v1/nodes?fieldSelector=metadata.name%3Dminikube&limit=500&resourceVersion=0: dial tcp 192.168.99.100:8443: getsockopt: connection refused

起動に途中で失敗した場合、環境をクリーンにした上でリトライする。

minikube delete
rm -rf ~/.minikube

Elasticsearchのインデックス開きすぎによるヒープメモリ枯渇

この記事はElastic stack Advent Calendar 2017の12/6分の記事です。

ElasticスタックによるApacheアクセスログやsar情報などのメトリクス収集を初めて導入した後の頻出トラブルとして、インデックスのオープンしすぎによるJavaヒープメモリ枯渇がある。

検索エンジン用途や、運用監視業務に組み込むような「本気の」運用では、事前にサイジングが行われる。しかし、まずはシステム状況が可視化できるかお試しで導入を始めると、とりあえず運用を始め、インデックスのクローズや削除、スナップショットの定期取得などの運用管理計画はどうしても漏れがちとなる。

では、具体的にElasticsearchはだいたい何ヶ月分のメトリクスが保存できるのが次の疑問になるが、以下のような多様な要素が作用するため、要件に合わせて実機検証が必要となる。

  • 登録するメトリクスの種類
    • Apacheアクセスログだけか、sar ALL相当も含めるか、MetricBeatなどのBeatsは使うか、Beatsを使うなら何台分のマシン情報を収集するのか など
  • Mapping設計
    • フィールド数が増えるほど、ElasticsearchのJavaヒープメモリ使用量は増える
  • Kibanaダッシュボード内容や表示期間、アクセス頻度

ここまではWeb上によくある情報だが、では実際にどの程度Elasticsearchに保持できるのか、実測してみた。

どの程度Elasticsearchに保存できるのか

以下の測定条件で実測してみた。

  • Elasticsearch 6.0.0
  • クラスタ構成。1インスタンスのみ。
  • 登録するのはApacheのCOMBINEDログのみ
  • シャード数はデフォルトの5
  • Javaヒープメモリサイズはデフォルトの1GB
    • Logstashのデフォルトに沿って、日別インデックス (logstash-%{yyyymmdd})
    • 具体的なLogstash設定は以下の通り
input { stdin {} }

filter {
  grok { match => { "message" => "%{HTTPD_COMBINEDLOG}" } }

  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    remove_field => ["timestamp"]
  }

  useragent {
    source => "agent"
    target => "useragent"
    remove_field => ["agent"]
  }

  mutate {
    remove_field => ["message"]
  }
}

output {
  elasticsearch {}
}
測定結果
  • 1インデックスあたり1ドキュメントの場合
    • 約1100インデックス(5500シャード)でCMS-GCループによる応答遅延発生
  • 1インデックスあたり10000ドキュメントの場合
    • 約500インデックス(2500シャード)でCMS-GCループによる応答遅延発生

Javaヒープメモリが枯渇し始め、GCのオーバヘッドが高くなると、以下のようなログがelasticsearch.logに大量に出力される。

[2017-12-05T18:02:17,978][WARN ][o.e.m.j.JvmGcMonitorService] [lvj6mYE] [gc][4474] overhead, spent [1.3s] collecting in the last [2.3s]

インデックスの大量作成を行い、Javaヒープメモリが枯渇した状態では、GCログは以下のような状態となる。水色の縦線がCMS-GCの発生を示しており、やがてヒープが完全に枯渇し始めると黒色の縦線のFull GCが連続して発生する。
f:id:n_agetsuma:20171205182809p:plain
1インデックスあたりの1ドキュメントで5500シャード作成した場合、ヒープダンプを取得してヒストグラムを取得すると、以下のようになる。5500インスタンスのクラスが複数存在し、Elasticsearchの内部実装として、シャード数ごとにインスタンス生成があり、シャードが増えるほどJavaヒープメモリ消費量が増えることが考察できる。
f:id:n_agetsuma:20171205184057p:plain

実測結果からの考察

ヒープ拡大やスケールアウトにより、より大量のインデックスをオープン状態にすることは可能なため、この結果からは確かなことは言えない。

しかし、1ノードでオープンできるシャード数は数十万などの大きなものではなく、多くても数千単位であることが考察できる。非クラスタ構成のElasticsearchでは、以下のような前提においては1〜2ヵ月分程度のインデックスが一度にオープンできてKibanaから閲覧できる量である。それより古いドキュメントはスナップショットを取得して削除するか、"Hot-Warmアーキテクチャ"に代表される、クラスタのdataノード構成を考える必要がある。

  • Apacheアクセスログ相当のフィールド数を持つメトリクスを10種類収集。
    • メトリクスごとにインデックス名を分ける。1日10インデックス。
    • シャード数はデフォルトの5
  • クラスタ構成のElasticsearch
  • 1日あたり、各メトリクスごとに10000ドキュメント

まとめ

Apacheアクセスログを対象に、Elasticsearchのデフォルト設定の場合、1ノードでどの程度のインデックスをオープンするとJavaヒープメモリ1GBが枯渇するか、実測してみました。

  • 1インデックス1ドキュメントの場合: 5500シャード
  • 1インデックス10000ドキュメントの場合: 11000シャード

環境や保存したいデータによっても結果は異なるため、あくまで上記は実測例です。

初めてElasticスタックによるメトリクス収集を導入する場合、Elasticsearchに蓄積したインデックスは定期的にクローズしないと、ディスクに余裕があっても、Javaヒープメモリが枯渇してElasticsearchが無応答になることに注意が必要です。

ElasticsearchのBulk API効果を実測する

Elasticsearchには、クライアントから複数のリクエストをまとめて送信するBulk API機能がある。JDBCのexecuteBatchのようなイメージで使える。

JDBCの場合、Oracleのドキュメントでは、バッチ単位のガイドライン*1として「バッチ・サイズを50から100の一般的な範囲に保つことをお薦めします」が示されている。Elasticsearchの場合、バルクサイズごとにどの程度スループットに差が出るのか実測してみた。

測定環境

  • Elasticsearch 6.0.0-rc2
  • Elasticsearch6で導入されたJava High Level REST Client
  • JUnit 5
  • Oracle JDK 1.8.0_152
  • MacBook Pro (13-inch, Late 2016)
  • テストアプリケーションからlocalhost上のElasticsearchに接続

テストの流れ

  1. @BeforeEachでテスト用のインデックスとそのマッピングを作成。自動マッピング作成によってテスト結果が揺れないようにするため。
  2. テスト実行。JIT影響を鑑みて10回実行する。10回中一番早かったレスポンスを測定対象とする。
  3. @AfterEachでテスト用インデックスを削除

テストパターン

  1. 通常のIndex API(Bulk API未使用)
  2. Bulk API - バルクサイズ 10/50/100/1000/10000 の5パターン

テストコード全体は以下の通り。
JSON処理の高速化についてはKOMIYAさんの記事を参考に、jacksonのAfterburnerModuleを使ったり、JSONをStringではなくbyte[]で扱っている。テストコードの全体像はGitHub - n-agetsu/elastic-restclientを参照。

class IndexPerformanceTest {
    private static final Logger LOG = LogManager.getLogger(IndexPerformanceTest.class);

    private static final HttpHost ES_HOST = new HttpHost("localhost", 9200, "http");
    private static final int SEND_DOC_NUM = 10000;
    private static final String INDEX = "posts";
    private static final String TYPE = "doc";

    private static ObjectMapper BURN_MAPPER;
    private static RestHighLevelClient highLevelClient;
    private static RestClient client;

    @BeforeAll
    static void setup() throws JsonProcessingException {
        highLevelClient = new RestHighLevelClient(RestClient.builder(ES_HOST));
        client = RestClient.builder(ES_HOST).build();
        BURN_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
    }

    @BeforeEach
    void beforeEach() throws IOException {
        // create mapping
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support Mapping API.
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("settings")
                     .field("number_of_shards", 1)
                   .endObject()
                    .startObject("mappings")
                      .startObject(TYPE)
                        .startObject("properties")
                          .startObject("user")
                            .field("type", "text")
                          .endObject()
                          .startObject("postDate")
                            .field("type", "date")
                          .endObject()
                          .startObject("message")
                            .field("type", "text")
                          .endObject()
                        .endObject()
                      .endObject()
                    .endObject();
        }
        builder.endObject();
        HttpEntity entity = new NStringEntity(builder.string(), ContentType.APPLICATION_JSON);
        Response response = client.performRequest("PUT", "/" + INDEX, Collections.emptyMap(), entity);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("create mapping posts failed, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @AfterEach
    void afterEach() throws IOException {
        // delete index for cleanup
        // Use low level REST client,
        // because Elasticsearch6.0-rc2 high level REST Client doesn't support DELETE INDEX API.
        Response response = client.performRequest("DELETE", "/" + INDEX);
        boolean acknowledged = BURN_MAPPER.readTree(response.getEntity().getContent()).get("acknowledged").asBoolean();
        if (!acknowledged) {
            LOG.error("delete index posts, cause {}" + EntityUtils.toString(response.getEntity()));
            System.exit(1);
        }
    }

    @DisplayName("Index Performance Test - Non Bulk API")
    @RepeatedTest(value = 10, name = RepeatedTest.LONG_DISPLAY_NAME)
    void testIndexRequest() throws IOException {
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            highLevelClient.index(request);
        }
    }

    @DisplayName("Index Performance Test - Bulk API")
    @ParameterizedTest(name = "run {0} with bulkSize {1}")
    @MethodSource("repeatCountAndBulkSize")
    void testIndexBulkRequest(int repeatCount, int bulkSize) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (int i = 0; i < SEND_DOC_NUM; i++) {
            Post post = new Post("test user" + i, new Date(), "trying out Elasticsearch");
            IndexRequest request = new IndexRequest()
                    .index(INDEX).type(TYPE).id(String.valueOf(i))
                    .source(BURN_MAPPER.writeValueAsBytes(post), XContentType.JSON);
            bulkRequest.add(request);
            boolean last = i == SEND_DOC_NUM - 1;
            if (i % bulkSize == 0 || last) {
                highLevelClient.bulk(bulkRequest);
                if (!last) {
                    bulkRequest = new BulkRequest();
                }
            }
        }
    }

    private static Stream<Arguments> repeatCountAndBulkSize() {
        // test 10 times per batchSize
        Stream.Builder<Arguments> builder = Stream.builder();
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 50)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 100)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 1000)));
        IntStream.range(0, 10).forEach(i -> builder.add(Arguments.of(i, 10000)));
        return builder.build();
    }

    @AfterAll
    static void cleanup() {
        try {
            highLevelClient.close();
        } catch (IOException e) {
            LOG.error(e);
        }

        try {
            client.close();
        } catch (IOException e) {
            LOG.error(e);
        }
    }
}

測定結果

テスト項目 処理時間
通常のIndex API(Bulk API未使用) 6.46 s
Bulk API(バルクサイズ10) 1.08s
Bulk API(バルクサイズ50) 492ms
Bulk API(バルクサイズ100) 383ms
Bulk API(バルクサイズ1000) 266ms
Bulk API(バルクサイズ10000) 254ms

Bulk APIの利用により、バッチサイズが10のような小さな値でも、有効なことがわかる。このケースではバルクサイズ1000ドキュメント程度が効率が良かった。

バルクサイズを大きな値にするとドキュメントが貯まるまでクライアントでデータ保持して、データの発生からElasticsearchで検索可能となるまで遅延が大きくなる。このため、いくつが良いとは一概には言えないが、まとまった単位のリクエストを投げる時には値はいずれにしても、Bulk APIを使った方が良いことは上記の値から判断できる。

Elasticsearch側の最大リクエストサイズに上限としてhttp.max_content_lengthがあるが、デフォルトは100MBと非常に大きな値である。Elasticsearchへの同時並行リクエストが多い場合は、大きなリクエストを並行に受け付けることでOutOfMemoryErrorに繋がる可能性があるが、並列数が少ない場合は積極的にバッチサイズを拡大できると考える。

おまけ Javaの性能測定時はJIT影響に注意

Bulk API未使用時の10回テストした時の値を以下のグラフに示すが、JavaVMを再起動せずにmaven-surefire-pluginによって繰り返しテスト実行していくうちに、最初の測定値の2倍近く性能向上した (13秒→6秒)。デフォルトでは-XX:CompileThreshold=10000により、10000回のメソッド実行でJITコンパイル対象となり、この影響は性能に大きな差を与える。

f:id:n_agetsuma:20171113210142p:plain

Javaのアプリケーションでは、起動直後にパフォーマンスを測定せず、必ずウォームアップしてから測定する必要がある。

Kafkaコンシューマのリバランスはいつ行われるか

Kafkaのコンシューマグループに、コンシューマを追加すると、各コンシューマがどのパーティションを読み取るか再割り当てするリバランスが行われる。

Kafka0.11.0.1を対象に、以下の観点でKafkaの実装を調べてみた結果をまとめる。

  • リバランスはいつ行われるのか
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
  • リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスはいつ行われるのか

KafkaConsumer.poll()メソッドの内部で行われる。

Kafkaのコンシューマの実装はだいたい以下のようになる。コード全体はgist参照。
Kafka Comsumer Sample · GitHub

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

コンシューマを起動し、どのパーティションのサブスクライブを始めるのかの決定は、poll(long timeout)メソッドの内部で行われる。このとき、コンシューマからKafkaプロトコルJoinGroupと呼ばれるリクエス*1をブローカーに送信し、その応答メッセージにどのパーティションのサブスクライブをすべきか含まれている。

リバランスの処理はpollメソッドを呼び出したスレッドで行われ、メッセージのフェッチ要求の前に行われる。ここまでの挙動は、KafkaConsumer.pollOnceメソッドを読み進めていくとわかる。

なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか

既存コンシューマグループにコンシューマが追加された場合、既にコンシューマグループに含まれている全てのコンシューマからJoinGroupリクエストが来るまで、JoinGroupレスポンスが返されず、パーティションの割り当てが行われないため。

既存のコンシューマによっては、メッセージ受信後の処理が重い、マシンリソース枯渇などの要因により、次のpoll(long timeout)が呼ばれるまで時間がかかるケースがあるだろう。

参加したいコンシューマグループに含まれる、全てのコンシューマでpoll(long timeout)が実行されるまでは、新規参加したコンシューマにパーティションが割り当てられず、メッセージの配信は始まらない。

リバランスが起こった時に重複メッセージ処理は発生しないか

リバランスが起こった時に、オフセットコミットのタイミングによっては、既存コンシューマと新規コンシューマに重複メッセージが配信されないかについて。

これはオフセットのコミット方式にも依存する。

autoCommitの場合

既存コンシューマでは、poll()メソッド内部のリバランスの前処理として、commitSync()によるオフセットコミットを行うため、リバランスの発生を原因としたメッセージ重複は起こらない*2。新規に追加されたコンシューマでは、リバランス直前にコミットされたオフセットを読み込んでサブスクライブを開始する。

このリバランス直前のオフセットコミット処理はConsumerCoodinator.onJoinPrepareメソッドで実装されている。

手動コミットの場合

前述のコンシューマのコード例を再掲するが、以下のようにpoll()の直前でcommitSync()を実行して正常終了している場合、poll()内部でリバランスが発生した時には常に最新のオフセットがコミットされているため、メッセージ重複は起こらない。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties());
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpoll()の実行タイミングで、コンシューマグループに初めてコンシューマがアサインされた場合、
    // または、既に実行されているコンシューマにおいて、自身が所属するコンシューマグループに
    // 新規コンシューマからジョイン要求が投げられていた場合にリバランス処理が行われる
    ConsumerRecords<String, String> records = consumer.poll(1000);
    records.forEach(record -> {
        LOG.info("partition = {}, offset = {}, key = {}, value = {}",
                record.partition(), record.offset(), record.key(), record.value());
    });
    consumer.commitSync();
}

上記のコードのcommmitSync()をcommitAsync()に置き換えた場合、メッセージ重複が起こらないかについては、実装から読み取れなかった。

手元で調べる限りでは、Kafkaは同一クライアントからのリクエストを逐次的に処理するように見える。commitAsync()によるオフセットのコミット要求の処理が終わるまで、poll()メソッドから送られたJoin Groupリクエストのハンドリングを行わない。しかし、ネットワーク遅延などの要因により、オフセットコミット要求とJoin Group要求の順序が逆になってKafkaに届いた場合、Join Group要求時に最新オフセットがコミットされていない可能性がある。オライリーKafka: The Definitive Guideで紹介されているように、ConsumerRebalanceListenerを実装して、リバランス開始前にコールバックされるonPartitionsRevokedメソッドでcommitSync()を実行した方が安全だろう。

まとめ

  • リバランスはいつ行われるのか
    • KafkaConsumer.poll()メソッドの内部で行われる
  • なぜ新規コンシューマ追加しても、すぐにメッセージ配信が始まらないのか
    • 既存コンシューマがpoll()を実行してJoin Groupリクエストが投げられるのをKafkaが待っているため
  • リバランスが起こった時に重複メッセージ処理は発生しないか
    • autoCommitではリバランス直前にコミットされる。手動コミットでは、poll()の直前にcommitSync()を発行していれば、重複メッセージ処理は発生しない。

*1:他にKafkaにどのようなプロトコルがあるかは KafkaApis.scalaを読むと何となくわかる

*2:autoCommitではコンシューマプロセスクラッシュ時に、前回コミット以降に処理されたメッセージが重複メッセージ配信されるため、重複メッセージ配信が許容できない要件では利用できない。

Kafkaコンシューマのrecords-lagは何の値か

Kafkaコンシューマの監視した方が良いメトリクスとして、datadog*1のドキュメントや、HortonworkによるKafkaのベストプラクティス*2ではrecords-lagが紹介されている。

このコンシューマのlagとは具体的にどのように計算された値か、以降にまとめる。以下の内容は、KafkaのJavaクライアント0.11.0.0で調べている。

records-lagについて

records-lagとは、Kafkaのコンシューマのメトリクスで、コンシューマプロセスのJMX MBeanとして公開されている。この値は、Kafkaのブローカー側の最大オフセットと、コンシューマが処理済みのオフセットの差分を示す。jconsoleで見ると、以下のようにmy-topic-0.records-lagmy-topic-0.records-lag-avgmy-topic-0.records-lag-maxのように、トピック名-パーティション番号の命名規則で公開される。例の場合はコンシューマが1つのパーティションしかサブスクライブしていないが、複数のトピックやパーティションをサブスクライブしていた場合は、その分の属性が作られる。

f:id:n_agetsuma:20171009204750p:plain


例えば、ブローカー側のあるパーティションのオフセットが10000で、コンシューマが2000まで読み込んでいた場合、records-lagは8000で8000件のメッセージは未処理であることを示す。

records-lagは0は基本的に0であることが望ましい。このラグが定常的に発生し、かつ増え続けている場合、プロデューサからのメッセージ送信の頻度に対して、コンシューマの処理性能が足りていないことを示す。コンシューマの処理の中で受信したメッセージをDBに書き込んでおり、そのDBの処理性能が遅延している。または、そもそもコンシューマの処理性能が足りず、コンシューマグループにコンシューマを足してスケールアウトの必要がある等が、ラグが増え続ける一般的な原因である。何らかの対処を行わないと、まだコンシューマに処理されていないメッセージがlog.retention.hours(デフォルト1週間)以上経過して削除されてしまう。

以前のKafkaではコンシューマ全体で過去最も大きなラグが発生したかをrecords-lag-maxで確認できたが、それがどのトピックのどのパーティションが発生したものなのか、また現在のラグはいくつなのかを把握することはできなかった。しかし、現在ではKakfa0.10.2.0の以下のissueの対応により、サブスクライブしているパーティションごとに、現在値、平均値、過去の最大値が確認可能となっている。
[KAFKA-4381] Add per partition lag metric to KafkaConsumer. - ASF JIRA

records-lagはどうやって計算されているか

records-lagがいつ、どうやって計算された値なのかを把握するためには、Kafka Javaクライアントのコンシューマの仕組みを簡単に把握する必要がある。

以下のように、よくあるコンシューマのpollループのコードを例に考える。KafkaConsumer.pollメソッドがどうやってメッセージをブローカーから取得しているのか、実装を読んでみた。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    // 以下のpollメソッドは何をしてるのか?
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", 
            record.partition(), record.offset(), record.key(), record.value());
    }
}

全体を図にすると以下のようになる。
f:id:n_agetsuma:20171009211504p:plain

  1. pollメソッドを呼び出すと、Fetcherはコンシューマに割り当てられている各パーティションのデータをフェッチする。この時に、パーティションごとに最大でデフォルト1MB(max.partition.fetch.bytes)のメッセージをまとめてフェッチする。このフェッチのタイミングで、フェッチしたパーティションの現在の最大オフセット値を取得。
  2. フェッチ済みのメッセージの中から、最大でデフォルト500メッセージ(max.poll.records)をpollメソッドの呼び出し元に返す。
    • 既に500メッセージ以上をフェッチ済みであった場合は、新たなフェッチを行わず、フェッチ済みのバッファからメッセージを取得。
    • pollメソッドの呼び出し時に、フェッチ時に取得したパーティションの最大オフセットと、pollメソッドで返したオフセットの差分としてrecords-lagを計算。

例えば、フェッチ時に取得したパーティションの最大オフセットが10000であり、pollメソッドで500メッセージを返すと、records-lagは9500となる。もう一回pollメソッドを回すと、バッファ済みメッセージから500メッセージを取得して、records-lagは9000に縮小する。

遅延が発生してもrecords-lagが拡大しない注意したいパターン

先ほど最大オフセットはフェッチ時に取得、records-lagの取得はpoll時に行うと太字にしたのは意味がある。この仕組みにより、コンシューマで遅延が拡大していても、records-lagが拡大しないケースがある。

  1. 10000メッセージフェッチして、今のところ1000メッセージがpoll済み。この時点でのrecords-lagは9000。
  2. 新たなメッセージをフェッチする前に、ブローカー側に10000メッセージが追加。しかし、この時点で計算されるrecords-lagは9000。
    • 何故ならば、新たなフェッチが発生しておらず、records-lagの計算に使われるパーティションの最大オフセット値が10000のままであるため。フェッチ済みのメッセージの処理が終わるまでは、records-lagの値は減り続ける。
    • 次のフェッチのタイミングでrecords-lagが一気に拡大する。

records-lagはフェッチ時に取得したパーティションの最大オフセット値をベースとしているため、フェッチ後に大量にメッセージの追加が行われても、次のフェッチまでrecords-lagの値には反映されない仕組みになっている。

Burrowの場合

若干脱線するが、LinkedInが作っているコンシューマのラグ監視計測ツールBurrowでは、再現はできていないが、仕組み上このような問題は発生しないと思われる。Burrowではコンシューマのコミット済オフセットが入った__consumer_offsetトピックの値と、データが入ったトピックの最大オフセット値の両方を取得し、その値の差分でラグを計算しており、コンシューマのJMX MBeanの値を使っていない。

まとめ

  • Kafkaコンシューマの監視にはJMX Meanのrecords-lagが有効
  • Kakfa0.10.2.0からはパーティションごとにrecords-lagの監視が可能
  • メッセージのフェッチ直後に、大量のメッセージ追加があると、records-lagの値には反映されないので注意。

tcpdumpからKafkaのリトライ可能エラーを追う

Kafkaのメッセージロストの原因の1つに、プロデューサの設定のretries(デフォルト0:リトライしない)の未設定がある。

Kafkaクライアントの送信リトライ

プロデューサからメッセージ送信時のエラーには、retires設定によりKafkaのクライアントライブラリ内で自動リトライ可能なエラーと、リトライせずにProducer.sendメソッドの呼び出し元に例外を投げるエラーがある。自動リトライ可能なエラーで発生する例外は、org.apache.kafka.common.errors.RetriableExceptionの継承クラスである。代表的なものを以下に示す。

retiresによりリトライするエラー

  • NotLeaderForPartitionException
    • リーダーパーティションではないブローカーにメッセージが送られた場合のエラー応答。ブローカー障害によるフェールオーバ中にメッセージを送信すると起こりやすい。
  • NetworkException
    • ブローカーからレスポンスがない場合。メッセージを送ってから、ackが返される間にブローカーやNWに障害が発生すると起こりやすい。
  • 一時的な障害で、リトライすれば成功の見込みがある場合

リトライしないエラー

  • SerializationException
    • key.serializerやvalue.serializerのシリアライザでエラーが発生した場合
  • RecordTooLargeException
    • ブローカー設定のreplica.fetch.max.bytes(デフォルト1048576バイト)を超えるメッセージが送信された場合
  • リトライしても成功の見込みがないエラー

Kafkaとしてはリトライ可能なエラーを返しているが、クライアントがリトライしないでメッセージロストしている場合、クライアント側のログには出力されるが、Kafkaのブローカー側には特に何も出ていないこともある。Kafkaクラスタを運用管理する立場で、クライアント側より「たまにメッセージロストしているように見えます。クライアント側のログはありません。」と申告を受けても、tcpdumpの取得によりリトライエラーが返されていることを特定できる場合がある。

WiresharkによるKafkaプロトコル解析

WiresharkにはKafkaプロトコルを解釈する機能がある。0.10系までのプロトコル解釈には対応しているが、最新の0.11系には2017/09/26時点ではまだ対応していない。

デフォルトでは9092ポートの通信をKafkaの通信として解釈している。もし9092ポート以外もKafkaプロトコルとして解釈させたい場合は、Wiresharkの[Preferences] - [Protocols] - [Kafka] の設定で変更できる。
f:id:n_agetsuma:20170926171300p:plain

この状態でKafkaの通信を含むpcapファイルを読み込ませると、以下のようにKafkaプロトコルを解釈して表示される。
f:id:n_agetsuma:20170926171024p:plain

Kafkaは一般的に高トラフィックな環境で使われることが多いため、ローテーションでpcapを定期的に捨てるようにし、バッファも広げてなるべくdropped by kernelのパケットが増えないようにする。IO影響をなるべく避けるために、pcapファイルの出力先はlog.dirsの設定先以外にする。

tcpdump -w /path/kafka.pcap -i enp0s3 -C 1024 -W 10 -z gzip -nn -B 524288

取得したpcapファイルをwiresharkに読み込ませ、フィルタ設定にkafka.error > 0を設定すると、エラー応答の一覧が表示される。エラーコード0はNoErrorを示し、異常があると1以上のエラーコードを返すため、0より大きい値でフィルタする。

f:id:n_agetsuma:20170926132913p:plain

Kafkaのレスポンスメッセージはエラーコードが含まれており、上記の例の場合ではNot Leader For Partition (6)、リーダーパーティションではないブローカーにメッセージが送信されたため、エラーコード6を返している。

エラーコードの一覧と例外クラスとのマッピングは、KafkaのErrorMapping.scalaにある。今回の例であれば、エラーコード6がNotLeaderForPartitionExceptionに結びついており、NotLeaderForPartitionExceptionはRetriableExceptionの継承クラスのため、リトライ可能エラーが発生している。

ErrorMapping.scalaには、コンシューマリクエスト時などのすべてのエラーコードが記載されているが、その中でもプロデューサからのメッセージ送信時に発生するエラーコードの一覧は、ProduceResponseクラスにコメントしてまとめられている。

クライアント側ログによるリトライ発生の確認

前述のpcapの確認結果より、少なくともブローカー側からはリトライ可能なエラーが返されていることがわかった。クライアントにretriesの設定が有効となっており、retry.backoff.ms(デフォルト100ミリ秒)の間隔でリトライされているかは、クライアント側のWARNログ出力から確認できる。

2017-09-25 21:22:24.954 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - Got error produce response with correlation id 18504 on topic-partition my-topic-1, retrying (9 attempts left). Error: NOT_LEADER_FOR_PARTITION
2017-09-25 21:23:45.469 [kafka-producer-network-thread | producer-1] WARN  o.a.k.c.producer.internals.Sender - Got error produce response with correlation id 87895 on topic-partition my-topic-1, retrying (9 attempts left). Error: NETWORK_EXCEPTION

my-topicのパーティション番号1でリトライが発生しており、その原因となるエラーが出力されている。

まとめ

  • Kafkaのエラーにはプロデューサで送信リトライするエラーとしないエラーがある
  • プロデューサ設定のretriesはデフォルト0であり、自動リトライ可能なエラーが発生しても送信リトライしない
  • Kafka0.10系まであればWiresharkでKafkaプロトコルを解釈できる。自動リトライ可能なエラーの発生有無をpcapから特定できる。
  • 送信リトライの発生はクライアント側でWARNログとして出力される