OpenSearch sink connector (feat. MSK)

You need to download both of the files above.

Creating a custom plugin

After downloading and unzipping both archives, go to the "transforms-for-apache-kafka-connect-1.5.0/transforms-for-apache-kafka-connect-1.5.0" folder. It contains two .jar files. Copy "transforms-for-apache-kafka-connect-1.5.0.jar" into the "opensearch-connector-for-apache-kafka-3.1.1/opensearch-connector-for-apache-kafka-3.1.1" folder.
"slf4j-api-1.7.36.jar" is already present in the "opensearch-connector-for-apache-kafka-3.1.1" folder, so it does not need to be copied.
Once that is done, zip the "opensearch-connector-for-apache-kafka-3.1.1" folder; the resulting archive is our custom plugin.
zip -r opensearch-connector-for-apache-kafka-3.1.1.zip opensearch-connector-for-apache-kafka-3.1.1
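The copy-and-zip steps can also be scripted. The following is a minimal Python sketch, not part of the original instructions: the function name `build_plugin_zip` and its parameters are hypothetical, and it assumes both archives have already been downloaded and unpacked locally.

```python
import shutil
import zipfile
from pathlib import Path


def build_plugin_zip(transforms_jar, connector_dir, out_zip):
    """Copy the transforms jar into the connector folder, then zip that folder.

    All three arguments are local paths chosen by the caller (hypothetical names).
    """
    transforms_jar = Path(transforms_jar)
    connector_dir = Path(connector_dir)
    out_zip = Path(out_zip)

    # Step 1: place the transforms jar next to the connector's own jars.
    shutil.copy2(transforms_jar, connector_dir / transforms_jar.name)

    # Step 2: zip the folder, keeping the folder name as the top-level entry,
    # which is the file layout that `zip -r <name>.zip <name>` produces.
    with zipfile.ZipFile(out_zip, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(connector_dir.rglob("*")):
            if path.is_file():
                zf.write(path, path.relative_to(connector_dir.parent).as_posix())
    return out_zip
```

Write the output zip outside the connector folder, otherwise the archive would try to include itself.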
Following the GitHub instructions above, unzip the archive and place it in the plugin folder.
Place the unpacked binaries in a directory on each Connect worker node.
For example, /connectors.
In this case, place opensearch-connector-for-apache-kafka in the connectors folder:
connectors
└── opensearch-connector-for-apache-kafka
Then add /connectors to the plugin.path setting in each Connect worker's configuration:
plugin.path=/connectors
$ tree connectors/
connectors/
├── msk-datagen
│   └── msk-data-generator.jar
├── opensearch-connector
...
│   ├── transforms-for-apache-kafka-connect-1.5.0.jar   // added file
│   ├── opensearch-2.6.0.jar
│   ├── opensearch-cli-2.6.0.jar
│   ├── opensearch-common-2.6.0.jar
│   ├── opensearch-connector-for-apache-kafka-3.1.0.jar
│   ├── opensearch-core-2.6.0.jar
...
└── opensearch-connector.zip
Zip the opensearch-connector folder into opensearch-connector.zip.
If you plan to create a custom connector plugin for MSK, create an S3 bucket named "msk-custom-plugins" and upload our zip file "opensearch-connector-for-apache-kafka-3.1.1.zip" to it.
The article below is a helpful reference for the MSK connector plugin creation process.
Note that once connectors is set as the plugin path and you restart an existing (non-MSK) Kafka Connect worker, you can confirm that the plugin was registered with the following command.
curl http://localhost:8083/connector-plugins | jq .
[
  ...
  {
    "class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "type": "sink",
    "version": "3.0.0"
  },
  ...
]
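The same check can be automated instead of reading the jq output by eye. This is a rough Python sketch under the assumption that the worker's REST API is reachable at http://localhost:8083 as in the command above; `has_plugin` and `fetch_plugins` are hypothetical helper names.

```python
import json
from urllib.request import urlopen

CONNECTOR_CLASS = "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector"


def has_plugin(plugins, connector_class=CONNECTOR_CLASS):
    """Return True if a /connector-plugins listing contains the given class."""
    return any(plugin.get("class") == connector_class for plugin in plugins)


def fetch_plugins(base_url="http://localhost:8083"):
    """GET /connector-plugins from a Connect worker and decode the JSON array."""
    with urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)
```

With a running worker, `has_plugin(fetch_plugins())` should be true once the plugin path has been picked up.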

Creating the sink connector

Now that the plugin is built, it is time to create the sink connector.
First, create the topic that the sink connector will consume.
Based on the above, let's write the sink connector JSON config (configs/source.json):
{
  "name": "opensearch-sink",
  "config": {
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "2",
    "topics": "impressions",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "connection.url": "https://opensearch-node1:9200",
    "connection.username": "admin",
    "connection.password": "admin",
    "schema.ignore": true,
    "key.ignore": true,
    "type.name": "_doc",
    "behavior.on.malformed.documents": "fail",
    "behavior.on.null.values": "ignore",
    "behavior.on.version.conflict": "ignore",
    "errors.deadletterqueue.topic.name": "ad-tech-dl",
    "errors.tolerance": "all",
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.deadletterqueue.topic.replication.factor": 1,
    "transforms": "insertTS,formatTS",
    "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTS.timestamp.field": "created_at",
    "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.formatTS.field": "created_at",
    "transforms.formatTS.target.type": "string"
  }
}
Configuration notes
Once the sink connector config is written as above, create the connector through the REST API:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @configs/source.json
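For scripted deployments, the curl call above could be mirrored in Python with only the standard library. This is a sketch rather than the post's own method: `build_create_request` is a hypothetical helper, and it only builds the request so the JSON can be validated before anything is sent.

```python
import json
from urllib.request import Request


def build_create_request(config_path, base_url="http://localhost:8083"):
    """Build the POST /connectors/ request that the curl command sends."""
    with open(config_path, "rb") as f:
        body = f.read()
    json.loads(body)  # fail early if the config file is not valid JSON
    return Request(
        f"{base_url}/connectors/",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
```

To actually submit it, pass the result to `urllib.request.urlopen`, for example `urlopen(build_create_request("configs/source.json"))`.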
If you see output like the following, it worked.
$ curl http://localhost:8083/connectors/ad-tech-sink/status | json_pp
{
   "connector" : {
      "state" : "RUNNING",
      "worker_id" : "172.19.0.8:8083"
   },
   "name" : "ad-tech-sink",
   "tasks" : [
      {
         "id" : 0,
         "state" : "RUNNING",
         "worker_id" : "172.19.0.8:8083"
      },
      {
         "id" : 1,
         "state" : "RUNNING",
         "worker_id" : "172.19.0.8:8083"
      }
   ],
   "type" : "source"
}
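"Success" here means the connector and every task report RUNNING. A small Python sketch of that check follows; `is_healthy` is a hypothetical name, and the function only inspects an already-decoded status document shaped like the output above.

```python
def is_healthy(status):
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

A task in the FAILED state makes the function return False even when the connector itself is still RUNNING, which is the case that is easy to miss when skimming the JSON.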
As a side note, if you want to record a timestamp such as a created date at sink time during data ingestion, you can use SMTs. (An event timestamp attribute can be useful for temporal analysis.)
I could not find a comprehensive way to set this in the source connector, so a new field, created_at, is added using Single Message Transforms (SMTs). Specifically, two transforms are added: insertTS and formatTS. As the names suggest, the former inserts the record timestamp and the latter formats it as yyyy-MM-dd HH:mm:ss.
"transforms": "insertTS,formatTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "created_at",
"transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss",
"transforms.formatTS.field": "created_at",
"transforms.formatTS.target.type": "string"
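To make the effect of the two transforms concrete, here is an illustrative Python simulation of what happens to a record value. It is not Kafka Connect code: `insert_ts` and `format_ts` are hypothetical stand-ins, the sample field `bid` is made up, and the sketch assumes the inserted value is the Kafka record timestamp in epoch milliseconds and that the formatted string is rendered in UTC.

```python
from datetime import datetime, timezone


def insert_ts(value, record_timestamp_ms, field="created_at"):
    """Stand-in for insertTS: add the record timestamp under `field`."""
    return {**value, field: record_timestamp_ms}


def format_ts(value, field="created_at"):
    """Stand-in for formatTS: render epoch millis as 'yyyy-MM-dd HH:mm:ss' (UTC assumed)."""
    dt = datetime.fromtimestamp(value[field] / 1000, tz=timezone.utc)
    return {**value, field: dt.strftime("%Y-%m-%d %H:%M:%S")}


# The transforms run in the order listed in "transforms": insertTS, then formatTS.
doc = format_ts(insert_ts({"bid": 1}, 1700000000000))
print(doc)  # {'bid': 1, 'created_at': '2023-11-14 22:13:20'}
```

The order matters: formatTS can only convert created_at after insertTS has added it.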
Add the settings above to the JSON config, then update or recreate the sink connector to apply them.
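For the update path, Kafka Connect's REST API accepts PUT /connectors/{name}/config with the flat config map as the body. A hedged Python sketch follows; `build_update_request` is a hypothetical helper that reuses the same JSON file as the create call and unwraps its "config" object.

```python
import json
from urllib.request import Request


def build_update_request(config_path, base_url="http://localhost:8083"):
    """Build a PUT /connectors/{name}/config request from a {"name", "config"} file."""
    with open(config_path, encoding="utf-8") as f:
        doc = json.load(f)
    # The update endpoint expects only the inner config map, not the wrapper object.
    return Request(
        f"{base_url}/connectors/{doc['name']}/config",
        data=json.dumps(doc["config"]).encode("utf-8"),
        method="PUT",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
```

As with creation, the request is only sent once it is passed to `urllib.request.urlopen`.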