postgreSQLで、あるデータがあるとき、他のデータもまとめて取ってきたい
■目的
タイトルだけでは解らないので具体例を挙げていきます。
テーブル(洋菓子、和菓子、人名)のデータがあるとき、
人名が「多田」の「洋菓子、和菓子(例:クレープ、えびせんべい)」のセットがあれば、
他の、人名が異なる「洋菓子、和菓子(クレープ、えびせんべい)」のセットも取ってきたい、
でも、「洋菓子、和菓子(例:クッキー、麩菓子)」のセットが「多田」しか無いときは取ってきたくない。
という場合の実装方法メモです。
■参考
重複している列Aを含む全列取得
https://qiita.com/necoyama3/items/4c24defd6f504366aebe
IN句で複数条件指定
http://blog.fusic.co.jp/archives/1765/
結合で迷ったら(例充実)
http://sak.cool.coocan.jp/w_sak3/doc/sysbrd/psql_k08.htm
重複レコードを削除
https://kanamelogic.com/20180407/redundant-data2/
■実装
CREATE TABLE okashi (yougashi text, wagashi text, name text);
データ投入
INSERT INTO okashi VALUES ('クッキー', '温泉饅頭', '沼田'); INSERT INTO okashi VALUES ('クッキー', 'ゆべし', '沼田'); INSERT INTO okashi VALUES ('ショートケーキ', '温泉饅頭', '葛城'); INSERT INTO okashi VALUES ('マカロン', 'みたらし団子', '大津'); INSERT INTO okashi VALUES ('マカロン', 'ゆべし', '葛城'); INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '石田'); INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '大津'); INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '安藤'); INSERT INTO okashi VALUES ('クレープ', 'えびせんべい', '多田'); INSERT INTO okashi VALUES ('クレープ', 'えびせんべい', '多田'); INSERT INTO okashi VALUES ('チョコケーキ', '桜餅', '多田');
期待する結果
人名が「多田」の時、
('チョコケーキ', '桜餅', '石田');
('チョコケーキ', '桜餅', '大津');
('チョコケーキ', '桜餅', '安藤');
('チョコケーキ', '桜餅', '多田');
('クレープ', 'えびせんべい', '多田');
('クレープ', 'えびせんべい', '多田');
準備
CREATE TABLE okashi_only (yougashi text, wagashi text, name text); CREATE TABLE okashi_next (yougashi text, wagashi text, name text);
①人名「多田」を含むレコードを抽出し、テーブル「okashi_only」に挿入する
INSERT INTO okashi_only (yougashi, wagashi, name) SELECT * FROM okashi WHERE name LIKE '%多田%' AND (yougashi, wagashi) IN ( SELECT yougashi, wagashi FROM okashi GROUP BY yougashi, wagashi HAVING COUNT(yougashi) >= 2 AND COUNT(wagashi) >= 2 ); select * FROM okashi_only; クレープ | えびせんべい | 多田 クレープ | えびせんべい | 多田 チョコケーキ | 桜餅 | 多田
テーブル「okashi_only」に挿入したいので、INSERT INTOから始まります。
今回は都合でname LIKEを使っていますがname = '多田'で充分です。
多田の他に、多田と同じ洋菓子、和菓子の組み合わせを持つレコードがあるもののみ抽出したいので、(yougashi, wagashi)についてIN句を用います。
IN句の中について、yougashi, wagashiの2つについてGROUP BYし、HAVING句でCOUNTし、2以上の物を取ってきています。
②洋菓子と和菓子の組み合わせを一意に集約します。
これは、このままだと「クレープ、えびせんべい」について、③を実行したとき、この組み合わせを持つログを2回取ってきてしまうためです。
DELETE FROM okashi_only oo1 WHERE EXISTS ( SELECT * FROM okashi_only oo2 WHERE oo2.yougashi = oo1.yougashi AND oo2.wagashi = oo1.wagashi AND oo2.ctid > oo1.ctid ); select * FROM okashi_only; クレープ | えびせんべい | 多田 チョコケーキ | 桜餅 | 多田
レコードを削除するのでDELETEから始まります。
イメージとしては、okashi_onlyテーブルと全く同じテーブルをイメージして、比較して、yougashiが同じかつwagashiが同じものが存在したら削除する、という感じでしょうか。
ctidは、テーブル内における、行バージョンの物理的位置を表します(updateやvacuumで変化するので注意)。
③okashi_onlyテーブルのyougashiとwagashiの組み合わせが一致するログをokashiテーブルから抽出し、データをokashi_nextテーブルに挿入する
INSERT INTO okashi_next (yougashi, wagashi, name) SELECT okashi.yougashi, okashi.wagashi, okashi.name FROM okashi_only, okashi WHERE okashi.yougashi = okashi_only.yougashi AND okashi.wagashi = okashi_only.wagashi ; select * FROM okashi_next; クレープ | えびせんべい | 多田 クレープ | えびせんべい | 多田 チョコケーキ | 桜餅 | 多田 チョコケーキ | 桜餅 | 石田 チョコケーキ | 桜餅 | 大津 チョコケーキ | 桜餅 | 安藤
以上で期待する結果が得られました。
おまけ
手順②で重複排除せず③を行った場合の結果
=# SELECT * FROM okashi_next ; クレープ | えびせんべい | 多田 クレープ | えびせんべい | 多田 クレープ | えびせんべい | 多田 クレープ | えびせんべい | 多田 チョコケーキ | 桜餅 | 多田 チョコケーキ | 桜餅 | 石田 チョコケーキ | 桜餅 | 大津 チョコケーキ | 桜餅 | 安藤
fluentdで、レコードの値がある文字列と一致するとき、別のレコードに特定の文字列を追加する
■ fluentd
□ やること
レコード「DETECTION_TYPE」を追加
→レコード:SIGNATURE_DOWNCASE が以下の値と一致するとき
Value:
①.*(command and control).*
②.*(candc).*
③.*(c-and-c).*
④.*(C2).*
⑤.*(c2).*
レコード:DETECTION_TYPE Field に「c_and_c」を代入する
□ プラグインインストール
https://docs.fluentd.org/v1.0/articles/plugin-management#if-using-td-agent,-use-/usr/sbin/td-agent-gem
# td-agent-gem install fluent-plugin-rewrite-tag-filter
https://github.com/sonots/fluent-plugin-record-reformer
# td-agent-gem install fluent-plugin-record-reformer
□ 実装方法
rewrite_tag_filter Output Pluginを使用して、matchで「SIGNATURE_DOWNCASE」に対して正規表現マッチをかけ、
タグにシグネチャdetection_typeに入れたい値を追加
record-reformerを使用し、match内でレコードDETECTION_TYPEにタグの末尾の値を追加する
□ タグの遷移
psql.default→psql.default.シグネチャ名→psql.default.reform.シグネチャ名
最後のmatch
match psql.default.reform.**
□ テスト用DB
fluent_test_database
□ テスト用テーブル
create table fluent_test_table (
host_addr text,
host_name text,
signature_downcase text,
detection_type text);
□/etc/td-agent/td-agent.conf
## FOR postgreSQL #################################### <source> @type tail format json path /var/log/remotelog/test_psql.log tag psql.default </source> <filter psql.default> @type record_transformer enable_ruby true remove_keys HOST_ADDR_TEST, HOST_NAME_TEST, APPLICATION_NAME_TEST, SIGNATURE_TEST, SIGNATURE_DESCRIPTION_TEST, SRC_ADDR_TEST, SRC_PORT_TEST, DEST_ADDR_TEST, DEST_PORT_TEST, protocol_TEST, PACKET_STATE_TEST, message_TEST, TEST_DATA1_TEST <record> # DATE_TIME #{Time.now.strftime('%Y-%m-%d %H:%M:%S')} HOST_ADDR ${record[""HOST_ADDR_TEST""]} HOST_NAME ${record[""HOST_NAME_TEST""]} # APPLICATION_NAME ${record[""APPLICATION_NAME_TEST""]} # SIGNATURE ${record[""SIGNATURE_TEST""]} # SIGNATURE_DESCRIPTION ${record[""SIGNATURE_DESCRIPTION_TEST""]} # SRC_ADDR ${record[""SRC_ADDR_TEST""]} # SRC_PORT ${record[""SRC_PORT_TEST""]} # DEST_ADDR ${record[""DEST_ADDR_TEST""]} # DEST_PORT ${record[""DEST_PORT_TEST""]} SIGNATURE_DOWNCASE ${record[""SIGNATURE_TEST""].downcase} # protocol ${record[""protocol_TEST""]} # PACKET_STATE ${record[""PACKET_STATE_TEST""]} # message ${record[""message_TEST""]} # TEST_DATA1 ${record[""TEST_DATA1_TEST""]} DETECTION_TYPE ${tag} </record> </filter> <match psql.default> @type rewrite_tag_filter capitalize_regex_backreference yes <rule> key SIGNATURE_DOWNCASE pattern .*(command and control).* tag psql.default.c_and_c </rule> </match> <match psql.default.c_and_c> @type record_reformer tag psql.default.reform.c_and_c <record> DETECTION_TYPE ${tag_parts[-1]} </record> </match> <match psql.default.reform.**> @type sql host 192.168.56.112 port 5432 database fluent_test_db adapter postgresql username postgres password platon131 socket /tmp/.s.PGSQL.5432 <table> table fluent_test_table column_mapping 'HOST_ADDR:host_addr,HOST_NAME:host_name,SIGNATURE_DOWNCASE:signature_downcase,DETECTION_TYPE:detection_type' </table> </match>
fluentdからpostgreSQLへログを流し込む(元データはjson形式)
■ fluentdからpostgreSQLへログを流し込む
□ fluentdプラグイン
https://github.com/fluent/fluent-plugin-sql/blob/master/README.md
□ テスト用データ
# echo '{ "HOST_ADDR": "192.168.56.108", "HOST_NAME": "myhostname" }' >> /var/log/remotelog/test_psql.log
DB名:fluent_test_db
テーブル名:fluent_test_table
テスト用カラム:host_addr,host_name
■ postgreSQL側の準備
□ ユーザpostgresのパスワード設定
# passwd postgres
□ postgreSQLログイン
# su - postgres
# psql
□ テスト用データベース作成
=# create database fluent_test_db;
=# \c fluent_test_db;
□ テスト用テーブル作成
=# CREATE TABLE fluent_test_table (host_addr TEXT, host_name TEXT);
=# \dt;
□ DBポート確認
# ss -atn
→ 5432が開いていることを確認
□ 他ホストからpostgreSQLに接続できるようにする
# vi /var/lib/pgsql/11/data/postgresql.conf
コメントアウトされている「listen_addresses」を以下のように書き換え(以下を追記)
listen_addresses = '*'
# vi /var/lib/pgsql/11/data/pg_hba.conf
以下のように編集する(一部追記)
# IPv4 local connections: host all all 127.0.0.1/32 ident host all all 192.168.56.108/32 trust host all all 192.168.56.102/32 trust
□ postgreSQL再起動
# systemctl restart postgresql-11
□ ポート確認
# netstat -tln
□ (他ホストから)telnet
# telnet 192.168.56.112 5432
□ tcpdumpでパケットが来てることを見たい場合
# tcpdump -ni enp0s8 port 5432
※ enp0s8は、#ip a コマンドで見た時のネットワークインタフェース名
■ fluentd側
□ postgreSQL用のプラグインインストール
# yum install ruby-deve
# yum install gcc make automake autogen
# td-agent-gem install pg -v 1.1.4 --no-document
# td-agent-gem install pg -v 0.21.0 --no-document
# td-agent-gem install fluent-plugin-sql --no-document
□ fluentd設定
# vi /etc/td-agent/td-agent.conf
例 <source> @type tail format json path /var/log/remotelog/test_psql.log tag psql </source> <match psql.**> @type sql host 192.168.56.112 port 5432 database fluent_test_db adapter postgresql username postgres password yourpassword socket /tmp/.s.PGSQL.5432 <table> table fluent_test_table column_mapping 'HOST_ADDR:host_addr,HOST_NAME:host_name' </table> </match>
□ fluentd再起動
# systemctl restart td-agent
postgreSQL+pgAdmin4
qiita.com
こちらの記事のとおりやることで上手く行きました。
記事ではpostgreSQL9.6ですが、11に置き換えても上手く行きました。
flink+centOS
■ flink 日本語マニュアル
http://mogile.web.fc2.com/flink/flink-docs-release-1.3/dev/connectors/kafka.html
■ flink ダウンロードページ
https://flink.apache.org/downloads.html
■ flink install(centOS)
□ yum をupdateしておく
# yum update
□ wget をインストールする
# yum install wget
□ java をインストールする
Javaインストール
# yum install java-1.8.0-openjdk-devel
javahome設定
# vi /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-2.el7_6.x86_64
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
□ flink最新版をダウンロード
# wget https://www-eu.apache.org/dist/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz
# tar xzf flink-1.8.0-bin-scala_2.12.tgz
# cd flink-1.8.0
□ selinux無効化★必ず無効化すること
https://qiita.com/hanaita0102/items/5d3675e4dc1530b255ba
□ iptables★無視してOK
□ firewalld無効化★必ず無効化すること
https://www.server-memo.net/centos-settings/centos7/firewalld-stop.html
□ flinkスタート
# ./bin/start-cluster.sh
http://192.168.56.111:8081/
□ flink自動起動設定
# vi /etc/rc.local
以下を追記
sh /test01/flink-1.8.0/bin/start-cluster.sh
# chmod u+x /etc/rc.d/rc.local
□ quick startを続ける
# yum install nc
# cd /test01/flink-1.8.0
# nc -l 9000
# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
※ 上手くいかないときは、思い切ってreboot
zabbix appliance のパスワード
zabbix 4.0 mysql パスワード
6. Zabbix appliance [Zabbix Documentation 4.0]
インストール時にランダムに生成される
mysqlのrootユーザのパスワード
/root/.my.cnf
に記載されている
mysqlのzabbixユーザのパスワード
/etc/zabbix/zabbix_server.conf
と
/etc/zabbix/web/zabbix.conf.php
に記載されている
zabbixユーザのユーザ名やパスワードを変更した際には上記2つのファイルも書き換える必要がある。
fluentdで現在時刻をレコードに出す方法
DATE_TIME #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}
→ 以下のようなログを取得できる
DATE_TIME
2019-03-27 23:11:49