LogstashからIngest Nodeへの移行

Elasticsearch 5.xからはIngest Nodeを使うと、Logstashを使わなくてもElasticsearchでログの変換ができるとElastic社のスライドにまとまっていたので、LogstashからIngest Nodeへの移行についてメモ。

LogstashからIngest Nodeへの移行

今までFilebeatで集めてきたログをLogstashに送ってjson変換していたところ、Elasticsearchで直接json変換できるようになるため、Logstashを使わなくてもログの収集と可視化が可能となる。

  • Filebeat(収集) -> Logstash(変換) -> Elasticsearch(蓄積)
  • Filebeat(収集) -> Elasticsearch(変換/蓄積)

Logstashのfilterプラグインの多くはIngest Nodeの機能にProcessorとして移植されている。Processor一覧はElasticsearch5.xのドキュメントにあるが、大まかな対応は以下の通り。Logstashのfilter名とIngest NodeのProcessor名には同じ名前が付いていることが多いが、LogstashのmutateがオプションごとにProcessorに分割されていたり、日付ごとのインデックス切り替えをProcessorで定義できたり、一部差分もある。

Logstash Ingest NodeのProcessor
grok Grok Processor
date Data Processor
mutate Convert Processor, Gsub Processor, Join Processor, Lowercase Processor ...
ruby Script Processor (言語はElasticsearch固有のPainless Scripting Language)
useragent Ingest user agent processor plugin (非バンドルプラグイン)
日付ごとのインデックス切り替え (elasticsearch {index => "logstash-%{+YYYY.MM.dd}"}) Date Index Name Processor
if文使える 少なくともAlpha5の段階では使えない

また、Ingest Nodeはあくまで変換機能のみを担うため、Logstashのinputプラグインにあるような、Kafkaからの入力や、HTTP Pollerによる定期的なREST API経由のメトリクス収集、JDBCによるRDBMSからのデータ吸い上げのような機能のサポートはない。また、Kibana5にCSVファイルアップロード機能が追加されているからか、csvフィルタ相当のProcessorも今のところない。

まとめると、Elasticsearch5.0.0 alpha5の段階では、以下のようなケースでは引き続きLogstashの方が便利。

  • Kafka、HTTP Poller、JDBCなどを入力ソースとしたい場合
  • Logstashのif文相当を実行したい場合

Ingest Nodeのpipeline設定

以下のような障害解析でよく可視化するログを対象に、Logstashの場合とIngest Nodeの場合の設定例をそれぞれまとめる。

Apache (combined形式)のアクセスログ

apache-loggenを使って生成したダミーのcombined形式ログを対象とする。

168.66.82.175 - - [12/Sep/2016:18:26:03 +0900] "GET /item/books/3538 HTTP/1.1" 200 82 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

Logstashの場合

input {
  stdin {}
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    remove_field => "timestamp"
  }

  mutate {
    convert => {
      "bytes" => "integer"
    }
  }
}

output {
  elasticsearch {
    index => "logstash-accesslog-%{+YYYY.MM.dd}"
  }
}

Ingest Nodeのpipelineの場合

curl -XPUT localhost:9200/_ingest/pipeline/httpd-accesslog -d '
{
  "description": "Parse HTTPD accesslog",
  "processors": [
    { "grok": { "field": "message", "patterns": ["%{COMBINEDAPACHELOG}"] } },
    { "convert": { "field": "bytes", "type": "integer" } },
    { "date": { "field": "timestamp", "formats": ["dd/MMM/yyyy:HH:mm:ss Z"] } },
    {
      "date_index_name": {
        "field": "@timestamp",
        "index_name_prefix": "filebeat-accesslog-",
        "date_rounding": "d"
      }
    },
    { "remove": { "field": "timestamp" } }
  ]
}
'

インデックス名をfilebeat-*始まる名前にしている理由は、後述するログ送信時にFilebeatを利用しており、Filebeatのインデックステンプレートを効かせるため。

sar (CPU使用率)

バイナリファイルのsarはsadfで変換するとパースしやすい。

sadf -t -d -P ALL <sarファイル名>
  • -t 時刻をUTCではなくローカルのタイムゾーン時間を表示
  • -d ; 区切りで表示
  • -P ALL CPUコアごとにCPU使用率を出力。-1がALL相当。0はcpu0、1はcpu1。

以下のようなログをパース対象とする。

centos7;-1;2016-09-14 21:02:39;LINUX-RESTART
# hostname;interval;timestamp;CPU;%user;%nice;%system;%iowait;%steal;%idle
centos7;599;2016-09-14 21:20:01;-1;0.20;0.00;0.12;0.01;0.00;99.67
centos7;599;2016-09-14 21:20:01;0;0.26;0.00;0.11;0.01;0.00;99.61
centos7;599;2016-09-14 21:20:01;1;0.14;0.00;0.13;0.01;0.00;99.73
centos7;-1;2016-09-14 22:05:04;LINUX-RESTART

Logstashの場合

input {
  stdin {}
}

filter {
  # drop restart line. ex centos7;-1;2016-09-16 00:12:40;LINUX-RESTART
  if [message] =~ /LINUX-RESTART$/ {
    drop {}
  }

  # drop header line
  if [message] =~ /^# hostname/ {
    drop {}
  }

  csv {
    separator => ";"
    columns => ["hostname","interval","timestamp","cpu","user","nice","system","iowait","steal","idle"]
    convert => {
      "interval" => "integer"
      "user" => "float"
      "nice" => "float"
      "system" => "float"
      "iowait" => "float"
      "steal" => "float"
      "idle" => "float"
    }
  }

  date {
    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss"]
    remove_field => "timestamp"
  }
}

output {
  elasticsearch {
    index => "logstash-sar-cpu-%{+YYYY.MM.dd}"
  }
}

Ingest Nodeのpipelineの場合

curl -H "Expect:" -XPUT localhost:9200/_ingest/pipeline/sadf-cpu -d '{
    "description": "sadf -t -d -P ALL",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{DATA:hostname};%{NUMBER:interval};%{TIMESTAMP_ISO8601:timestamp};%{DATA:cpu};%{NUMBER:user};%{NUMBER:nice};%{NUMBER:system};%{NUMBER:iowait};%{NUMBER:steal};%{NUMBER:idle}"]
        }
      },
      { "date": { "field": "timestamp", "formats": ["yyyy-MM-dd HH:mm:ss"] } },
      {
        "date_index_name": {
          "field": "@timestamp",
          "index_name_prefix": "filebeat-sar-cpu-",
          "date_rounding": "d"
        }
      },
      { "remove": { "field": "timestamp" } },
      { "convert": { "field": "user", "type": "float" } },
      { "convert": { "field": "nice", "type": "float" } },
      { "convert": { "field": "system", "type": "float"} },
      { "convert": { "field": "iowait", "type": "float"} },
      { "convert": { "field": "steal", "type": "float"} },
      { "convert": { "field": "idle", "type": "float"} }
    ]
}'

Expectヘッダを明示的に付与しているのは、1024バイト以上のリクエスト送るとElasticsearch側でエラーになる問題を回避するため。v5.0.0-beta1 で修正予定なっているため、GAになる頃にはこの回避策は不要になるだろう。

また、drop相当の機能はAlpha5の段階では見当たらないため、『centos7;-1;2016-09-14 21:02:39;LINUX-RESTART』のようにgrokでパースできない行にあたると、Elasticsearch側でエラーログが出力され、該当行のpipelineの処理はスキップされる。エラーログが気になる場合は、Filebeatのinclude-lineで送信したい行のフォーマットの正規表現が指定でき、Elasticsearchに送信するログをFilebeat側で絞り込むことが可能。

pidstat (プロセス別CPU使用率)

不安定なシステムにおいて定常的にcronで収集する。
JavaPostgreSQLが同一のマシンのサービスとして稼働しているが、どれが重くなっているのかがわからないときに必要なデータ。

pidstat -U -h -l -p ALL
  • -U ユーザIDではなくユーザ名で表示
  • -h タイムスタンプをロケール依存ではなくエポックで出力するために使用
  • -l コマンドラインオプションも収集。特にjavaはオプションまで見ないと、APサーバのプロセスなのかバッチタスクか識別できない
  • -p ALL 全ユーザのプロセスのデータを収集

以下のようなログをパース対象とする。

$ pidstat -U -h -l -p ALL
Linux 3.10.0-327.28.3.el7.x86_64 (centos7)     	20160916日 	_x86_64_       	(2 CPU)

#      Time     USER       PID    %usr %system  %guest    %CPU   CPU  Command
 1473953274     root         1    0.00    0.00    0.00    0.00     0  /usr/lib/systemd/systemd --switched-root --system --deserialize 21
 1473953274     root         2    0.00    0.00    0.00    0.00     1  kthreadd

Logstashの場合

input {
  stdin {}
}

filter {
  if [message] !~ /^\s\d{10}/ {
    drop {}
  }

  grok {
    match => { "message" => "%{WORD:timestamp}%{SPACE}%{WORD:user}%{SPACE}%{WORD:pid}%{SPACE}%{NUMBER:usr}%{SPACE}%{NUMBER:system}%{SPACE}%{NUMBER:guest}%{SPACE}%{NUMBER:cpu_percent}%{SPACE}%{WORD:cpu}%{SPACE}%{GREEDYDATA:command}" }
  }

  date {
    match => [ "timestamp", "UNIX"]
    remove_field => "timestamp"
  }

  mutate {
    convert => {
      "usr" => "float"
      "system" => "float"
      "guest" => "float"
      "cpu_percent" => "float"
    }
  }
}

output {
  elasticsearch {
    index => "logstash-pidstat-%{+YYYY.MM.dd}"
  }
}

Ingest Nodeのpipelineの場合

curl -H "Expect:" -XPUT localhost:9200/_ingest/pipeline/pidstat -d '{
    "description": "pidstat -U -h -l -p ALL",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{WORD:timestamp}%{SPACE}%{WORD:user}%{SPACE}%{WORD:pid}%{SPACE}%{NUMBER:usr}%{SPACE}%{NUMBER:system}%{SPACE}%{NUMBER:guest}%{SPACE}%{NUMBER:cpu_percent}%{SPACE}%{WORD:cpu}%{SPACE}%{GREEDYDATA:command}"]
        }
      },
      { "date": { "field": "timestamp", "formats": ["UNIX"] } },
      {
        "date_index_name": {
          "field": "@timestamp",
          "index_name_prefix": "filebeat-pidstat",
          "date_rounding": "d"
        }
      },
      { "remove": { "field": "timestamp" } },
      { "convert": { "field": "usr", "type": "float" } },
      { "convert": { "field": "system", "type": "float"} },
      { "convert": { "field": "guest", "type": "float"} },
      { "convert": { "field": "cpu_percent", "type": "float"} }
    ]
}'

FilebeatによるIngest Nodeへのログ送信

前述までの設定はあくまでpipelineの定義であるため、何らかの方法でこれらpipelineを宛て先にログを送信する必要がある。

ログを1行ずつ読み取ってcurlで送ることもできなくはないだろうが、ここではFilebeatを使う。一般的には低リソースで継続的にログを収集するエージェントとしてFilebeatが使われることが多いと思うが、Filebeatは標準入力にも対応しているため、ログを一式受領して問題解析のために解析する用途にも使える。

標準入力向けのFilebeatの設定ファイルを以下のように作成する。parameters.pipelineで宛て先のパイプラインを設定する。以下の例では、前述で定義したパイプラインpidstatを宛て先としている。console用出力はログが流れているかの確認用のため、大量データを取り込む際にはコメントアウトした方がロードが早い。

filebeat.prospectors:
- input_type: stdin

output.elasticsearch:
  hosts: ["localhost:9200"]
  parameters.pipeline: "pidstat"

output.console:
  # Boolean flag to enable or disable the output module.
  enabled: true

  # Pretty print json event
  pretty: true

標準入力経由でFilebeatに流し込む。

cat pidstat.log | ./filebeat -c filebeat.pidstat.yml

Filebeat 5.0.0 alpha5の時点では、すべてのログをロードしてもFilebeatは終了せず、そのままさらなる標準入力を待ち続ける。以下のissueがベータ1リリース時に入ってくるとこの問題も解決すると思う。
Allow filebeat to only run once #2456

まとめ

  • Logstashのfilterは、Ingest NodeのProcessorで同じようなことが実現できる
  • Ingest Nodeに定義したパイプラインへのログ送信はFilebeatのparameters.pipelineで簡単にできる

今回の例のように手動によるバルク取り込みでは、Logstash + Elasticsearchの組合せがFilebeat + Elasticsearchの組合せに変わるだけなのでメリットがないように感じられるが、elasticsearch-ingest-node-vs-logstash-performance-2など記事ではIngest Nodeの方が軽量とレポートされているため、実機で試してIngest Nodeの方が軽い場合は積極的に使うとより早くログが可視化できると思う。