LogstashからIngest Nodeへの移行
Elasticsearch 5.xからはIngest Nodeを使うと、Logstashを使わなくてもElasticsearchでログの変換ができるとElastic社のスライドにまとまっていたので、LogstashからIngest Nodeへの移行についてメモ。
LogstashからIngest Nodeへの移行
今までFilebeatで集めてきたログをLogstashに送ってjson変換していたところ、Elasticsearchで直接json変換できるようになるため、Logstashを使わなくてもログの収集と可視化が可能となる。
- Filebeat(収集) -> Logstash(変換) -> Elasticsearch(蓄積)
- Filebeat(収集) -> Elasticsearch(変換/蓄積)
Logstashのfilterプラグインの多くはIngest Nodeの機能にProcessorとして移植されている。Processor一覧はElasticsearch5.xのドキュメントにあるが、大まかな対応は以下の通り。Logstashのfilter名とIngest NodeのProcessor名には同じ名前が付いていることが多いが、LogstashのmutateがオプションごとにProcessorに分割されていたり、日付ごとのインデックス切り替えをProcessorで定義できたり、一部差分もある。
Logstash | Ingest NodeのProcessor |
---|---|
grok | Grok Processor |
date | Data Processor |
mutate | Convert Processor, Gsub Processor, Join Processor, Lowercase Processor ... |
ruby | Script Processor (言語はElasticsearch固有のPainless Scripting Language) |
useragent | Ingest user agent processor plugin (非バンドルプラグイン) |
日付ごとのインデックス切り替え (elasticsearch {index => "logstash-%{+YYYY.MM.dd}"}) | Date Index Name Processor |
if文使える | 少なくともAlpha5の段階では使えない |
また、Ingest Nodeはあくまで変換機能のみを担うため、Logstashのinputプラグインにあるような、Kafkaからの入力や、HTTP Pollerによる定期的なREST API経由のメトリクス収集、JDBCによるRDBMSからのデータ吸い上げのような機能のサポートはない。また、Kibana5にCSVファイルアップロード機能が追加されているからか、csvフィルタ相当のProcessorも今のところない。
まとめると、Elasticsearch5.0.0 alpha5の段階では、以下のようなケースでは引き続きLogstashの方が便利。
- Kafka、HTTP Poller、JDBCなどを入力ソースとしたい場合
- Logstashのif文相当を実行したい場合
Ingest Nodeのpipeline設定
以下のような障害解析でよく可視化するログを対象に、Logstashの場合とIngest Nodeの場合の設定例をそれぞれまとめる。
Apache (combined形式)のアクセスログ
apache-loggenを使って生成したダミーのcombined形式ログを対象とする。
168.66.82.175 - - [12/Sep/2016:18:26:03 +0900] "GET /item/books/3538 HTTP/1.1" 200 82 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
Logstashの場合
input { stdin {} } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] remove_field => "timestamp" } mutate { convert => { "bytes" => "integer" } } } output { elasticsearch { index => "logstash-accesslog-%{+YYYY.MM.dd}" } }
Ingest Nodeのpipelineの場合
curl -XPUT localhost:9200/_ingest/pipeline/httpd-accesslog -d ' { "description": "Parse HTTPD accesslog", "processors": [ { "grok": { "field": "message", "patterns": ["%{COMBINEDAPACHELOG}"] } }, { "convert": { "field": "bytes", "type": "integer" } }, { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } }, { "date_index_name": { "field": "@timestamp", "index_name_prefix": "filebeat-accesslog-", "date_rounding": "d" } }, { "remove": { "field": "timestamp" } } ] } '
インデックス名をfilebeat-*始まる名前にしている理由は、後述するログ送信時にFilebeatを利用しており、Filebeatのインデックステンプレートを効かせるため。
sar (CPU使用率)
バイナリファイルのsarはsadfで変換するとパースしやすい。
sadf -t -d -P ALL <sarファイル名>
以下のようなログをパース対象とする。
centos7;-1;2016-09-14 21:02:39;LINUX-RESTART # hostname;interval;timestamp;CPU;%user;%nice;%system;%iowait;%steal;%idle centos7;599;2016-09-14 21:20:01;-1;0.20;0.00;0.12;0.01;0.00;99.67 centos7;599;2016-09-14 21:20:01;0;0.26;0.00;0.11;0.01;0.00;99.61 centos7;599;2016-09-14 21:20:01;1;0.14;0.00;0.13;0.01;0.00;99.73 centos7;-1;2016-09-14 22:05:04;LINUX-RESTART
Logstashの場合
input { stdin {} } filter { # drop restart line. ex centos7;-1;2016-09-16 00:12:40;LINUX-RESTART if [message] =~ /LINUX-RESTART$/ { drop {} } # drop header line if [message] =~ /^# hostname/ { drop {} } csv { separator => ";" columns => ["hostname","interval","timestamp","cpu","user","nice","system","iowait","steal","idle"] convert => { "interval" => "integer" "user" => "float" "nice" => "float" "system" => "float" "iowait" => "float" "steal" => "float" "idle" => "float" } } date { match => [ "timestamp", "yyyy-MM-dd HH:mm:ss"] remove_field => "timestamp" } } output { elasticsearch { index => "logstash-sar-cpu-%{+YYYY.MM.dd}" } }
Ingest Nodeのpipelineの場合
curl -H "Expect:" -XPUT localhost:9200/_ingest/pipeline/sadf-cpu -d '{ "description": "sadf -t -d -P ALL", "processors": [ { "grok": { "field": "message", "patterns": ["%{DATA:hostname};%{NUMBER:interval};%{TIMESTAMP_ISO8601:timestamp};%{DATA:cpu};%{NUMBER:user};%{NUMBER:nice};%{NUMBER:system};%{NUMBER:iowait};%{NUMBER:steal};%{NUMBER:idle}"] } }, { "date": { "field": "timestamp", "formats": ["yyyy-MM-dd HH:mm:ss"] } }, { "date_index_name": { "field": "@timestamp", "index_name_prefix": "filebeat-sar-cpu-", "date_rounding": "d" } }, { "remove": { "field": "timestamp" } }, { "convert": { "field": "user", "type": "float" } }, { "convert": { "field": "nice", "type": "float" } }, { "convert": { "field": "system", "type": "float"} }, { "convert": { "field": "iowait", "type": "float"} }, { "convert": { "field": "steal", "type": "float"} }, { "convert": { "field": "idle", "type": "float"} } ] }'
Expectヘッダを明示的に付与しているのは、1024バイト以上のリクエスト送るとElasticsearch側でエラーになる問題を回避するため。v5.0.0-beta1 で修正予定なっているため、GAになる頃にはこの回避策は不要になるだろう。
- https://github.com/elastic/elasticsearch/issues/19893
- https://github.com/elastic/elasticsearch/issues/19834
また、drop相当の機能はAlpha5の段階では見当たらないため、『centos7;-1;2016-09-14 21:02:39;LINUX-RESTART』のようにgrokでパースできない行にあたると、Elasticsearch側でエラーログが出力され、該当行のpipelineの処理はスキップされる。エラーログが気になる場合は、Filebeatのinclude-lineで送信したい行のフォーマットの正規表現が指定でき、Elasticsearchに送信するログをFilebeat側で絞り込むことが可能。
pidstat (プロセス別CPU使用率)
不安定なシステムにおいて定常的にcronで収集する。
JavaやPostgreSQLが同一のマシンのサービスとして稼働しているが、どれが重くなっているのかがわからないときに必要なデータ。
pidstat -U -h -l -p ALL
- -U ユーザIDではなくユーザ名で表示
- -h タイムスタンプをロケール依存ではなくエポックで出力するために使用
- -l コマンドラインオプションも収集。特にjavaはオプションまで見ないと、APサーバのプロセスなのかバッチタスクか識別できない
- -p ALL 全ユーザのプロセスのデータを収集
以下のようなログをパース対象とする。
$ pidstat -U -h -l -p ALL Linux 3.10.0-327.28.3.el7.x86_64 (centos7) 2016年09月16日 _x86_64_ (2 CPU) # Time USER PID %usr %system %guest %CPU CPU Command 1473953274 root 1 0.00 0.00 0.00 0.00 0 /usr/lib/systemd/systemd --switched-root --system --deserialize 21 1473953274 root 2 0.00 0.00 0.00 0.00 1 kthreadd
Logstashの場合
input { stdin {} } filter { if [message] !~ /^\s\d{10}/ { drop {} } grok { match => { "message" => "%{WORD:timestamp}%{SPACE}%{WORD:user}%{SPACE}%{WORD:pid}%{SPACE}%{NUMBER:usr}%{SPACE}%{NUMBER:system}%{SPACE}%{NUMBER:guest}%{SPACE}%{NUMBER:cpu_percent}%{SPACE}%{WORD:cpu}%{SPACE}%{GREEDYDATA:command}" } } date { match => [ "timestamp", "UNIX"] remove_field => "timestamp" } mutate { convert => { "usr" => "float" "system" => "float" "guest" => "float" "cpu_percent" => "float" } } } output { elasticsearch { index => "logstash-pidstat-%{+YYYY.MM.dd}" } }
Ingest Nodeのpipelineの場合
curl -H "Expect:" -XPUT localhost:9200/_ingest/pipeline/pidstat -d '{ "description": "pidstat -U -h -l -p ALL", "processors": [ { "grok": { "field": "message", "patterns": ["%{WORD:timestamp}%{SPACE}%{WORD:user}%{SPACE}%{WORD:pid}%{SPACE}%{NUMBER:usr}%{SPACE}%{NUMBER:system}%{SPACE}%{NUMBER:guest}%{SPACE}%{NUMBER:cpu_percent}%{SPACE}%{WORD:cpu}%{SPACE}%{GREEDYDATA:command}"] } }, { "date": { "field": "timestamp", "formats": ["UNIX"] } }, { "date_index_name": { "field": "@timestamp", "index_name_prefix": "filebeat-pidstat", "date_rounding": "d" } }, { "remove": { "field": "timestamp" } }, { "convert": { "field": "usr", "type": "float" } }, { "convert": { "field": "system", "type": "float"} }, { "convert": { "field": "guest", "type": "float"} }, { "convert": { "field": "cpu_percent", "type": "float"} } ] }'
FilebeatによるIngest Nodeへのログ送信
前述までの設定はあくまでpipelineの定義であるため、何らかの方法でこれらpipelineを宛て先にログを送信する必要がある。
ログを1行ずつ読み取ってcurlで送ることもできなくはないだろうが、ここではFilebeatを使う。一般的には低リソースで継続的にログを収集するエージェントとしてFilebeatが使われることが多いと思うが、Filebeatは標準入力にも対応しているため、ログを一式受領して問題解析のために解析する用途にも使える。
標準入力向けのFilebeatの設定ファイルを以下のように作成する。parameters.pipeline
で宛て先のパイプラインを設定する。以下の例では、前述で定義したパイプラインpidstatを宛て先としている。console用出力はログが流れているかの確認用のため、大量データを取り込む際にはコメントアウトした方がロードが早い。
filebeat.prospectors: - input_type: stdin output.elasticsearch: hosts: ["localhost:9200"] parameters.pipeline: "pidstat" output.console: # Boolean flag to enable or disable the output module. enabled: true # Pretty print json event pretty: true
標準入力経由でFilebeatに流し込む。
cat pidstat.log | ./filebeat -c filebeat.pidstat.yml
Filebeat 5.0.0 alpha5の時点では、すべてのログをロードしてもFilebeatは終了せず、そのままさらなる標準入力を待ち続ける。以下のissueがベータ1リリース時に入ってくるとこの問題も解決すると思う。
Allow filebeat to only run once #2456
まとめ
- Logstashのfilterは、Ingest NodeのProcessorで同じようなことが実現できる
- Ingest Nodeに定義したパイプラインへのログ送信はFilebeatの
parameters.pipeline
で簡単にできる
今回の例のように手動によるバルク取り込みでは、Logstash + Elasticsearchの組合せがFilebeat + Elasticsearchの組合せに変わるだけなのでメリットがないように感じられるが、elasticsearch-ingest-node-vs-logstash-performance-2など記事ではIngest Nodeの方が軽量とレポートされているため、実機で試してIngest Nodeの方が軽い場合は積極的に使うとより早くログが可視化できると思う。