postgreSQLで、あるデータがあるとき、他のデータもまとめて取ってきたい

■目的
タイトルだけでは解らないので具体例を挙げていきます。
テーブル(洋菓子、和菓子、人名)のデータがあるとき、
人名が「多田」の「洋菓子、和菓子(例:クレープ、えびせんべい)」のセットがあれば、
他の、人名が異なる「洋菓子、和菓子(クレープ、えびせんべい)」のセットも取ってきたい、
でも、「洋菓子、和菓子(例:クッキー、麩菓子)」のセットが「多田」しか無いときは取ってきたくない。
という場合の実装方法メモです。

■参考
重複している列Aを含む全列取得
https://qiita.com/necoyama3/items/4c24defd6f504366aebe
IN句で複数条件指定
http://blog.fusic.co.jp/archives/1765/
結合で迷ったら(例充実)
http://sak.cool.coocan.jp/w_sak3/doc/sysbrd/psql_k08.htm
重複レコードを削除
https://kanamelogic.com/20180407/redundant-data2/

■実装

CREATE TABLE okashi (yougashi text, wagashi text, name text);

データ投入

INSERT INTO okashi VALUES ('クッキー', '温泉饅頭', '沼田');
INSERT INTO okashi VALUES ('クッキー', 'ゆべし', '沼田');
INSERT INTO okashi VALUES ('ショートケーキ', '温泉饅頭', '葛城');
INSERT INTO okashi VALUES ('マカロン', 'みたらし団子', '大津');
INSERT INTO okashi VALUES ('マカロン', 'ゆべし', '葛城');
INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '石田');
INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '大津');
INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '安藤');
INSERT INTO okashi VALUES ('クレープ', 'えびせんべい', '多田');
INSERT INTO okashi VALUES ('クレープ', 'えびせんべい', '多田');
INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '多田');

期待する結果
人名が「多田」の時、
('チョコケーキ', '桜餅', '石田');
('チョコケーキ', '桜餅', '大津');
('チョコケーキ', '桜餅', '安藤');
('チョコケーキ', '桜餅', '多田');
('クレープ', 'えびせんべい', '多田');
('クレープ', 'えびせんべい', '多田');

準備

CREATE TABLE okashi_only (yougashi text, wagashi text, name text);
CREATE TABLE okashi_next (yougashi text, wagashi text, name text);

①人名「多田」を含むレコードを抽出し、テーブル「okashi_only」に挿入する

INSERT INTO okashi_only (yougashi, wagashi, name)
SELECT * FROM okashi
WHERE name LIKE '%多田%'
AND (yougashi, wagashi) IN (
  SELECT yougashi, wagashi FROM okashi
  GROUP BY yougashi, wagashi
  HAVING COUNT(yougashi) >= 2
  AND COUNT(wagashi) >= 2
);

select * FROM okashi_only;
 クレープ     | えびせんべい | 多田
 クレープ     | えびせんべい | 多田
 チョコケーキ | 桜餅         | 多田

テーブル「okashi_only」に挿入したいので、INSERT INTOから始まります。
今回は都合でname LIKEを使っていますがname = '多田'で充分です。
多田の他に、多田と同じ洋菓子、和菓子の組み合わせを持つレコードがあるもののみ抽出したいので、(yougashi, wagashi)についてIN句を用います。
IN句の中について、yougashi, wagashiの2つについてGROUP BYし、HAVING句でCOUNTし、2以上の物を取ってきています。

②洋菓子と和菓子の組み合わせを一意に集約します。
これは、このままだと「クレープ、えびせんべい」について、③を実行したとき、この組み合わせを持つログを2回取ってきてしまうためです。

DELETE FROM okashi_only oo1
WHERE EXISTS (
  SELECT * FROM okashi_only oo2
  WHERE oo2.yougashi = oo1.yougashi
  AND oo2.wagashi = oo1.wagashi
  AND oo2.ctid > oo1.ctid
);

select * FROM okashi_only;
 クレープ     | えびせんべい | 多田
 チョコケーキ | 桜餅         | 多田

レコードを削除するのでDELETEから始まります。
イメージとしては、okashi_onlyテーブルと全く同じテーブルをイメージして、比較して、yougashiが同じかつwagashiが同じものが存在したら削除する、という感じでしょうか。
ctidは、テーブル内における、行バージョンの物理的位置を表します(updateやvacuumで変化するので注意)。

③okashi_onlyテーブルのyougashiとwagashiの組み合わせが一致するログをokashiテーブルから抽出し、データをokashi_nextテーブルに挿入する

INSERT INTO okashi_next (yougashi, wagashi, name)
SELECT okashi.yougashi, okashi.wagashi, okashi.name
FROM okashi_only, okashi
WHERE okashi.yougashi = okashi_only.yougashi
AND okashi.wagashi = okashi_only.wagashi
;

select * FROM okashi_next;
 クレープ     | えびせんべい | 多田
 クレープ     | えびせんべい | 多田
 チョコケーキ | 桜餅         | 多田
 チョコケーキ | 桜餅         | 石田
 チョコケーキ | 桜餅         | 大津
 チョコケーキ | 桜餅         | 安藤

以上で期待する結果が得られました。

おまけ
手順②で重複排除せず③を行った場合の結果

=# SELECT * FROM okashi_next ;
 クレープ     | えびせんべい | 多田
 クレープ     | えびせんべい | 多田
 クレープ     | えびせんべい | 多田
 クレープ     | えびせんべい | 多田
 チョコケーキ | 桜餅         | 多田
 チョコケーキ | 桜餅         | 石田
 チョコケーキ | 桜餅         | 大津
 チョコケーキ | 桜餅         | 安藤

fluentdで、レコードの値がある文字列と一致するとき、別のレコードに特定の文字列を追加する

■ fluentd
□ やること
レコード「DETECTION_TYPE」を追加
→レコード:SIGNATURE_DOWNCASE が以下の値と一致するとき
Value
①.*(command and control).*
②.*(candc).*
③.*(c-and-c).*
④.*(C2).*
⑤.*(c2).*
レコード:DETECTION_TYPE Field に「c_and_c」を代入する

プラグインインストール
https://docs.fluentd.org/v1.0/articles/plugin-management#if-using-td-agent,-use-/usr/sbin/td-agent-gem

# td-agent-gem install fluent-plugin-rewrite-tag-filter

https://github.com/sonots/fluent-plugin-record-reformer

# td-agent-gem install fluent-plugin-record-reformer

□ 実装方法
rewrite_tag_filter Output Pluginを使用して、matchで「SIGNATURE_DOWNCASE」に対して正規表現マッチをかけ、
タグにシグネチャdetection_typeに入れたい値を追加
record-reformerを使用し、match内でレコードDETECTION_TYPEにタグの末尾の値を追加する

□ タグの遷移
psql.default→psql.default.シグネチャ名→psql.default.reform.シグネチャ
最後のmatch
match psql.default.reform.**

□ テスト用DB
fluent_test_database

□ テスト用テーブル
create table fluent_test_table (
host_addr text,
host_name text,
signature_downcase text,
detection_type text);

□/etc/td-agent/td-agent.conf

## FOR postgreSQL ####################################
<source>
  @type tail
  format json
  path /var/log/remotelog/test_psql.log
  tag psql.default
</source>
<filter psql.default>
  @type record_transformer
  enable_ruby true
  remove_keys HOST_ADDR_TEST, HOST_NAME_TEST, APPLICATION_NAME_TEST, SIGNATURE_TEST, SIGNATURE_DESCRIPTION_TEST, SRC_ADDR_TEST, SRC_PORT_TEST, DEST_ADDR_TEST, DEST_PORT_TEST, protocol_TEST, PACKET_STATE_TEST, message_TEST, TEST_DATA1_TEST
  <record>
#    DATE_TIME #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}
    HOST_ADDR ${record[""HOST_ADDR_TEST""]}
    HOST_NAME ${record[""HOST_NAME_TEST""]}
#    APPLICATION_NAME ${record[""APPLICATION_NAME_TEST""]}
#    SIGNATURE ${record[""SIGNATURE_TEST""]}
#    SIGNATURE_DESCRIPTION ${record[""SIGNATURE_DESCRIPTION_TEST""]}
#    SRC_ADDR ${record[""SRC_ADDR_TEST""]}
#    SRC_PORT ${record[""SRC_PORT_TEST""]}
#    DEST_ADDR ${record[""DEST_ADDR_TEST""]}
#    DEST_PORT ${record[""DEST_PORT_TEST""]}
    SIGNATURE_DOWNCASE ${record[""SIGNATURE_TEST""].downcase}
#    protocol ${record[""protocol_TEST""]}
#    PACKET_STATE ${record[""PACKET_STATE_TEST""]}
#    message ${record[""message_TEST""]}
#    TEST_DATA1 ${record[""TEST_DATA1_TEST""]}
    DETECTION_TYPE ${tag}
  </record>
</filter>
<match psql.default>
  @type rewrite_tag_filter
  capitalize_regex_backreference yes
  <rule>
    key SIGNATURE_DOWNCASE
    pattern .*(command and control).*
    tag psql.default.c_and_c
  </rule>
</match>
<match psql.default.c_and_c>
  @type record_reformer
  tag psql.default.reform.c_and_c
  <record>
    DETECTION_TYPE ${tag_parts[-1]}
  </record>
</match>
<match psql.default.reform.**>
  @type sql
  host 192.168.56.112
  port 5432
  database fluent_test_db
  adapter postgresql
  username postgres
  password platon131
  socket /tmp/.s.PGSQL.5432

  <table>
    table fluent_test_table
    column_mapping 'HOST_ADDR:host_addr,HOST_NAME:host_name,SIGNATURE_DOWNCASE:signature_downcase,DETECTION_TYPE:detection_type'
  </table>
</match>

fluentdからpostgreSQLへログを流し込む(元データはjson形式)

■ fluentdからpostgreSQLへログを流し込む
□ fluentdプラグイン
https://github.com/fluent/fluent-plugin-sql/blob/master/README.md
□ テスト用データ
# echo '{ "HOST_ADDR": "192.168.56.108", "HOST_NAME": "myhostname" }' >> /var/log/remotelog/test_psql.log
DB名:fluent_test_db
テーブル名:fluent_test_table
テスト用カラム:host_addr,host_name

postgreSQL側の準備
□ ユーザpostgresのパスワード設定
# passwd postgres
postgreSQLログイン
# su - postgres
# psql
□ テスト用データベース作成
=# create database fluent_test_db;
=# \c fluent_test_db;
□ テスト用テーブル作成
=# CREATE TABLE fluent_test_table (host_addr TEXT, host_name TEXT);
=# \dt;
□ DBポート確認
# ss -atn
→ 5432が開いていることを確認
□ 他ホストからpostgreSQLに接続できるようにする
# vi /var/lib/pgsql/11/data/postgresql.conf
コメントアウトされている「listen_addresses」を以下のように書き換え(以下を追記)
listen_addresses = '*'
# vi /var/lib/pgsql/11/data/pg_hba.conf
以下のように編集する(一部追記)

						# IPv4 local connections:
host    all             all             127.0.0.1/32            ident
host    all             all             192.168.56.108/32       trust
host    all             all             192.168.56.102/32       trust

postgreSQL再起動
# systemctl restart postgresql-11
□ ポート確認
# netstat -tln
□ (他ホストから)telnet
# telnet 192.168.56.112 5432
tcpdumpでパケットが来てることを見たい場合
# tcpdump -ni enp0s8 port 5432
※ enp0s8は、#ip a コマンドで見た時のネットワークインタフェース名

■ fluentd側
postgreSQL用のプラグインインストール
# yum install ruby-deve
# yum install gcc make automake autogen
# td-agent-gem install pg -v 1.1.4 --no-document
# td-agent-gem install pg -v 0.21.0 --no-document
# td-agent-gem install fluent-plugin-sql --no-document

□ fluentd設定
# vi /etc/td-agent/td-agent.conf

					例	
<source>
	@type tail
	format json
	path /var/log/remotelog/test_psql.log
	tag psql
</source>

<match psql.**>
	@type sql
	host 192.168.56.112
	port 5432
	database fluent_test_db
	adapter postgresql
	username postgres
	password yourpassword
	socket /tmp/.s.PGSQL.5432

	<table>
		table fluent_test_table
		column_mapping 'HOST_ADDR:host_addr,HOST_NAME:host_name'
	</table>
</match>

□ fluentd再起動
# systemctl restart td-agent

flink+centOS

■ flink 日本語マニュアル
http://mogile.web.fc2.com/flink/flink-docs-release-1.3/dev/connectors/kafka.html

■ flink ダウンロードページ
https://flink.apache.org/downloads.html

■ flink install(centOS)
yum をupdateしておく
# yum update

wget をインストールする
# yum install wget

java をインストールする
Javaインストール
# yum install java-1.8.0-openjdk-devel
javahome設定
# vi /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-2.el7_6.x86_64
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar

□ flink最新版をダウンロード
# wget https://www-eu.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz
# tar xzf flink-1.8.0-bin-scala_2.12.tgz
# cd flink-1.8.0

selinux無効化★必ず無効化すること
https://qiita.com/hanaita0102/items/5d3675e4dc1530b255ba
iptables★無視してOK

□ firewalld無効化★必ず無効化すること
https://www.server-memo.net/centos-settings/centos7/firewalld-stop.html

□ flinkスタート
# ./bin/start-cluster.sh
http://192.168.56.111:8081/

□ flink自動起動設定
# vi /etc/rc.local
以下を追記
sh /test01/flink-1.8.0/bin/start-cluster.sh
# chmod u+x /etc/rc.d/rc.local

□ quick startを続ける
# yum install nc
# cd /test01/flink-1.8.0
# nc -l 9000
# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
※ 上手くいかないときは、思い切ってreboot

zabbix appliance のパスワード

zabbix 4.0 mysql パスワード
6. Zabbix appliance [Zabbix Documentation 4.0]
インストール時にランダムに生成される

mysqlのrootユーザのパスワード
/root/.my.cnf
に記載されている

mysqlのzabbixユーザのパスワード
/etc/zabbix/zabbix_server.conf

/etc/zabbix/web/zabbix.conf.php
に記載されている

zabbixユーザのユーザ名やパスワードを変更した際には上記2つのファイルも書き換える必要がある。