fluentdで、レコードの値がある文字列と一致するとき、別のレコードに特定の文字列を追加する

■ fluentd
□ やること
レコード「DETECTION_TYPE」を追加
→レコード:SIGNATURE_DOWNCASE が以下の値と一致するとき
Value
①.*(command and control).*
②.*(candc).*
③.*(c-and-c).*
④.*(C2).*
⑤.*(c2).*
レコード:DETECTION_TYPE Field に「c_and_c」を代入する

プラグインインストール
https://docs.fluentd.org/v1.0/articles/plugin-management#if-using-td-agent,-use-/usr/sbin/td-agent-gem

# td-agent-gem install fluent-plugin-rewrite-tag-filter

https://github.com/sonots/fluent-plugin-record-reformer

# td-agent-gem install fluent-plugin-record-reformer

□ 実装方法
rewrite_tag_filter Output Pluginを使用して、matchで「SIGNATURE_DOWNCASE」に対して正規表現マッチをかけ、
タグにシグネチャdetection_typeに入れたい値を追加
record-reformerを使用し、match内でレコードDETECTION_TYPEにタグの末尾の値を追加する

□ タグの遷移
psql.default→psql.default.シグネチャ名→psql.default.reform.シグネチャ
最後のmatch
match psql.default.reform.**

□ テスト用DB
fluent_test_database

□ テスト用テーブル
create table fluent_test_table (
host_addr text,
host_name text,
signature_downcase text,
detection_type text);

□/etc/td-agent/td-agent.conf

## FOR postgreSQL ####################################
<source>
  @type tail
  format json
  path /var/log/remotelog/test_psql.log
  tag psql.default
</source>
<filter psql.default>
  @type record_transformer
  enable_ruby true
  remove_keys HOST_ADDR_TEST, HOST_NAME_TEST, APPLICATION_NAME_TEST, SIGNATURE_TEST, SIGNATURE_DESCRIPTION_TEST, SRC_ADDR_TEST, SRC_PORT_TEST, DEST_ADDR_TEST, DEST_PORT_TEST, protocol_TEST, PACKET_STATE_TEST, message_TEST, TEST_DATA1_TEST
  <record>
#    DATE_TIME #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}
    HOST_ADDR ${record[""HOST_ADDR_TEST""]}
    HOST_NAME ${record[""HOST_NAME_TEST""]}
#    APPLICATION_NAME ${record[""APPLICATION_NAME_TEST""]}
#    SIGNATURE ${record[""SIGNATURE_TEST""]}
#    SIGNATURE_DESCRIPTION ${record[""SIGNATURE_DESCRIPTION_TEST""]}
#    SRC_ADDR ${record[""SRC_ADDR_TEST""]}
#    SRC_PORT ${record[""SRC_PORT_TEST""]}
#    DEST_ADDR ${record[""DEST_ADDR_TEST""]}
#    DEST_PORT ${record[""DEST_PORT_TEST""]}
    SIGNATURE_DOWNCASE ${record[""SIGNATURE_TEST""].downcase}
#    protocol ${record[""protocol_TEST""]}
#    PACKET_STATE ${record[""PACKET_STATE_TEST""]}
#    message ${record[""message_TEST""]}
#    TEST_DATA1 ${record[""TEST_DATA1_TEST""]}
    DETECTION_TYPE ${tag}
  </record>
</filter>
<match psql.default>
  @type rewrite_tag_filter
  capitalize_regex_backreference yes
  <rule>
    key SIGNATURE_DOWNCASE
    pattern .*(command and control).*
    tag psql.default.c_and_c
  </rule>
</match>
<match psql.default.c_and_c>
  @type record_reformer
  tag psql.default.reform.c_and_c
  <record>
    DETECTION_TYPE ${tag_parts[-1]}
  </record>
</match>
<match psql.default.reform.**>
  @type sql
  host 192.168.56.112
  port 5432
  database fluent_test_db
  adapter postgresql
  username postgres
  password platon131
  socket /tmp/.s.PGSQL.5432

  <table>
    table fluent_test_table
    column_mapping 'HOST_ADDR:host_addr,HOST_NAME:host_name,SIGNATURE_DOWNCASE:signature_downcase,DETECTION_TYPE:detection_type'
  </table>
</match>