fluentdで、レコードの値がある文字列と一致するとき、別のレコードに特定の文字列を追加する
■ fluentd
□ やること
レコード「DETECTION_TYPE」を追加
→レコード:SIGNATURE_DOWNCASE が以下の値と一致するとき
Value:
①.*(command and control).*
②.*(candc).*
③.*(c-and-c).*
④.*(C2).*
⑤.*(c2).*
レコード:DETECTION_TYPE Field に「c_and_c」を代入する
□ プラグインインストール
https://docs.fluentd.org/v1.0/articles/plugin-management#if-using-td-agent,-use-/usr/sbin/td-agent-gem
# td-agent-gem install fluent-plugin-rewrite-tag-filter
https://github.com/sonots/fluent-plugin-record-reformer
# td-agent-gem install fluent-plugin-record-reformer
□ 実装方法
rewrite_tag_filter Output Pluginを使用して、matchで「SIGNATURE_DOWNCASE」に対して正規表現マッチをかけ、
タグにシグネチャdetection_typeに入れたい値を追加
record-reformerを使用し、match内でレコードDETECTION_TYPEにタグの末尾の値を追加する
□ タグの遷移
psql.default→psql.default.シグネチャ名→psql.default.reform.シグネチャ名
最後のmatch
match psql.default.reform.**
□ テスト用DB
fluent_test_database
□ テスト用テーブル
create table fluent_test_table (
host_addr text,
host_name text,
signature_downcase text,
detection_type text);
□/etc/td-agent/td-agent.conf
## FOR postgreSQL #################################### <source> @type tail format json path /var/log/remotelog/test_psql.log tag psql.default </source> <filter psql.default> @type record_transformer enable_ruby true remove_keys HOST_ADDR_TEST, HOST_NAME_TEST, APPLICATION_NAME_TEST, SIGNATURE_TEST, SIGNATURE_DESCRIPTION_TEST, SRC_ADDR_TEST, SRC_PORT_TEST, DEST_ADDR_TEST, DEST_PORT_TEST, protocol_TEST, PACKET_STATE_TEST, message_TEST, TEST_DATA1_TEST <record> # DATE_TIME #{Time.now.strftime('%Y-%m-%d %H:%M:%S')} HOST_ADDR ${record[""HOST_ADDR_TEST""]} HOST_NAME ${record[""HOST_NAME_TEST""]} # APPLICATION_NAME ${record[""APPLICATION_NAME_TEST""]} # SIGNATURE ${record[""SIGNATURE_TEST""]} # SIGNATURE_DESCRIPTION ${record[""SIGNATURE_DESCRIPTION_TEST""]} # SRC_ADDR ${record[""SRC_ADDR_TEST""]} # SRC_PORT ${record[""SRC_PORT_TEST""]} # DEST_ADDR ${record[""DEST_ADDR_TEST""]} # DEST_PORT ${record[""DEST_PORT_TEST""]} SIGNATURE_DOWNCASE ${record[""SIGNATURE_TEST""].downcase} # protocol ${record[""protocol_TEST""]} # PACKET_STATE ${record[""PACKET_STATE_TEST""]} # message ${record[""message_TEST""]} # TEST_DATA1 ${record[""TEST_DATA1_TEST""]} DETECTION_TYPE ${tag} </record> </filter> <match psql.default> @type rewrite_tag_filter capitalize_regex_backreference yes <rule> key SIGNATURE_DOWNCASE pattern .*(command and control).* tag psql.default.c_and_c </rule> </match> <match psql.default.c_and_c> @type record_reformer tag psql.default.reform.c_and_c <record> DETECTION_TYPE ${tag_parts[-1]} </record> </match> <match psql.default.reform.**> @type sql host 192.168.56.112 port 5432 database fluent_test_db adapter postgresql username postgres password platon131 socket /tmp/.s.PGSQL.5432 <table> table fluent_test_table column_mapping 'HOST_ADDR:host_addr,HOST_NAME:host_name,SIGNATURE_DOWNCASE:signature_downcase,DETECTION_TYPE:detection_type' </table> </match>