FluentdでPostfixのログをつなぎ合わせる

前からやろうやろうと思ってなかなか手をつけられずにいたのですが、やっと実装しました。
(エラーハンドリングとかまだだけど)

Postfix に限らず SMTP サーバーのログは from と to などが別の行として出力されます。
そして、各行ごとに出力される項目が異なります。from の行には size と nrcpt など。to には relay 先、delay, delays, status など。接続元や message-id はまた別の行です。

そこで IOS ビッグデータ技術ブログ: Postfixのログをfluentdを使ってTreasureDataに送る を見つけてとりあえず fluent-plugin-multi-format-parser を使って elasticsearch に送って配信エラー監視したり kibana で確認したりしてましたが、to の行で bounced になってるのを見て from は何かなと思ったら queue id から from を検索して確認するのがかなり面倒なのでやっぱりこれは1行にまとめなければと。

大雑把には to 以外のデータを redis なり memcached に登録しておいて to でそれを引っ張りだして返せば良いわけだけども、どうやって実装するのかなと。

最初は Filter プラグインで実装しようかと考えていましたが redis なり memcached なりを使うとするとそこでエラーになる可能性があるため、エラーの場合にリトライできるように BufferedOutput で実装を始めました。が、処理した結果をまた fluentd に戻す方法がわからなくて、困ってしまいました。どうしようかなって fluentd のリポジトリを眺めているうちに out_exec_filter というものがあることに気づいてこれだ!!ということで実装しました。

エラーハンドリングがないけど

#!/opt/td-agent/embedded/bin/ruby

require 'redis'
require 'json'

redis = Redis.new

while line = STDIN.gets
  record = JSON.load(line)
  if record.has_key?('from')
    # time は to のもので上書きされてしまうので from の時の値を別名で保存しておく
    record['received_at'] = Time.at(record['time'].to_i).strftime('%Y-%m-%dT%H:%M:%S%:z')
  end

  if record.has_key?('qid')
    key = record['host'] + ':' + record['qid']

    stored = redis.hgetall(key) || {}

    if record.has_key?('to')
      print JSON.generate(stored.merge(record)) + "\n"
    elsif record['message'] == 'removed'
      redis.del(key)
    else
      redis.mapped_hmset(key, stored.merge(record))
      # postfix の bounce_queue_lifetime の値に合わせて expire を設定する
      redis.expire(key, 86400)
    end
  end
end

Postfix のログの parse を fluent-plugin-multi-format-parser ではなく Parser プラグインを書きました。gem 化してないので td-agent では /etc/td-agent/plugin/ ディレクトリに置いて使います。

module Fluent
  class TextParser
    class PostfixLogParser < Parser
      Plugin.register_parser('postfix_log_parser', self)

      config_param :time_format, :string, :default => nil

      def configure(conf)
        super
        @time_parser = TimeParser.new(@time_format)
        @addr_keys = ['to', 'from', 'orig_to']
      end

      def parse_message(message)
        log = {}

        if m = message.match(/^(?<qid>[A-F0-9]+):\s+/)
          log['qid'] = m['qid']
          message.gsub!(/^[A-F0-9]+:\s+/, '')
        else
          log['message'] = message
          return log
        end

        if m = message.match(/, status=(?<status>\S+)\s+(?<message>.*)$/)
          log['status'] = m['status']
          log['message'] = m['message']
          message.gsub!(/, status=.*$/, '')
        end

        if ! message.match(/^[a-z0-9\-]+=/)
          log['message'] = message
          return log
        end

        message.split(/, /).each do |kv|
          (key, value) = kv.split('=')
          if @addr_keys.include?(key)
            log[key] = value.gsub!(/^\<(.*)\>$/, '\1')
          else
            log[key] = value
          end
        end

        log
      end

      # to や from のドメインで集計したいこともあるのでドメイン抽出
      def domain(addr)
        local, domain = addr.split('@', 2)
        parts = domain.split('.').reverse
        # 属性型JPドメインでは後ろから3つ分 (ISPとかのサブドメインを無視する)
        if parts[0] == 'jp' and parts[1].length == 2
          return [parts[2], parts[1], parts[0]].join('.')
        elsif parts.length >= 2
          return [parts[1], parts[0]].join('.')
        else
          return domain
        end
      end

      def parse(text)
        m = text.match(/^(?<time>\S+\s+\S+\s+\S+)\s+(?<host>\S+)\s+(?<process>[^\[]+)\[(?<pid>\d+)\]:\s+(?<message>.*)/)
        time = @time_parser.parse(m['time'])
        record = parse_message(m['message'])
        record['host']    = m['host']
        record['process'] = m['process']
        record['pid']     = m['pid']

        # nrcpt は後ろに "(queue active)" とかついてるけど削って数値として扱えるようにしておく
        if record.has_key?('nrcpt')
          record['nrcpt'].gsub!(/\s.*$/, '')
        end
        if record.has_key?('to')
          record['to_domain'] = domain(record['to'].downcase)
        end
        if record.has_key?('from') && record['from'] != ''
          record['from_domain'] = domain(record['from'].downcase)
        end

        yield time, record
      end
    end
  end
end

td-agent.conf はこんな感じ(必要に応じて Buffer まわりの調整を)

<source>
  type tail
  format postfix_log_parser
  time_format %b %e %T
  path /var/log/maillog
  pos_file /var/lib/td-agent/mail.pos
  tag mail.syslog
</source>

<match mail.syslog>
  type exec_filter
  command /etc/td-agent/exec_filter/postfix_log_binder.rb
  in_format json
  out_format json
  tag mail.filtered
  time_key time
</match>

<match mail.filtered>
  type elasticsearch
  hosts kibana2:9200,kibana3:9200
  type_name postfix
  logstash_format true
  logstash_prefix postfix
  flush_interval 5s
</match>

/var/log/maillog はそのままでは td-agent ユーザーでは読めないので rsyslog の設定を変更します。

$DirCreateMode 0750
$FileCreateMode 0640
$DirGroup sys
$FileGroup sys

td-agent ユーザーを sys グループに所属させます。

sudo usermod -a -G sys td-agent

プラグインの書き方は公式ドキュメントを参照しました http://docs.fluentd.org/articles/plugin-development

fluent-plugin-fortigate-log-parser ってのを書いたけど Parser なのに Output プラグインとして実装してしまったので書き直したいな。