์ ๋ ๊ฐ์ ํ์ผ์ ๋ชจ๋ ๋ค์ด ๋ฐ์์ผ ํฉ๋๋ค
์ฌ์ฉ์ ์ ์ ํ๋ฌ๊ทธ์ธ ์์ฑ
๋ ์ปค๋ฅํฐ๋ฅผ ๋ค์ด๋ก๋ํ๊ณ ์์ถ์ ํ๋ฉด โtransforms-for-apache-kafka-connect-1.5.0/transforms-for-apache-kafka-connect-1.5.0โ ํด๋๋ก ์ด๋ํ์ญ์์ค. ์ฌ๊ธฐ์๋ ๋ ๊ฐ์ .jar ํ์ผ์ด ์์ต๋๋ค. โtransforms-for-apache-kafka-connect-1.5.0.jarโ์ โopensearch-connector-for-apache-kafka-3.1.1/opensearch-connector-for-apache-kafka-3.1.1โ ํด๋๋ก ๋ณต์ฌํ์ญ์์ค.
โslf4j-api-1.7.36.jarโ์ ์ด๋ฏธ โopensearch-connector-for-apache-kafka-3.1.1โ ํด๋์ ์์ผ๋ฏ๋ก ๋ณต์ฌํ ํ์๊ฐ ์์ต๋๋ค.
์ด ์์
์ด ์๋ฃ๋๋ฉด โopensearch-connector-for-apache-kafka-3.1.1โ ํด๋๋ฅผ ์์ถํ๊ณ , ์ฐ๋ฆฌ์ ์ฌ์ฉ์ ์ ์ ํ๋ฌ๊ทธ์ธ์ด ์์ฑ๋ฉ๋๋ค.
zip -r opensearch-connector-for-apache-kafka-3.1.1.zip opensearch-connector-for-apache-kafka-3.1.1
Shell
๋ณต์ฌ
์์ ๊นํ๋ธ ๋ด์ฉ๋๋ก ์์ถ์ ํ์ด์, plugin ํด๋ ์์ ๋ฐฐ์นํฉ๋๋ค.
์์ถ ํด์ ๋ ๋ฐ์ด๋๋ฆฌ๋ฅผ ๊ฐ Connect ์์
๋
ธ๋์ ๋๋ ํ ๋ฆฌ์ ๋ฐฐ์นํฉ๋๋ค.
์๋ฅผ ๋ค์ด, /connectors
์ด ๊ฒฝ์ฐ, opensearch-connector-for-kafka๋ฅผ connectors ํด๋์ ๋ฐฐ์นํฉ๋๋ค
connectors
โโโ opensearch-connector-for-apache-kafka
Plain Text
๋ณต์ฌ
๊ทธ๋ฐ ๋ค์, ๊ฐ ์ฐ๊ฒฐ ์์
์ ๊ตฌ์ฑ์์ /connectors์ plugin.path ๊ตฌ์ฑ์ ์ถ๊ฐํ์ญ์์ค:
plugin.path=/connectors
$ tree connectors/
connectors/
โโโ msk-datagen
โ โโโ msk-data-generator.jar
โโโ opensearch-connector
...
โ โโโ transforms-for-apache-kafka-connect-1.5.0.jar // ์ถ๊ฐํ ํ์ผ
โ โโโ opensearch-2.6.0.jar
โ โโโ opensearch-cli-2.6.0.jar
โ โโโ opensearch-common-2.6.0.jar
โ โโโ opensearch-connector-for-apache-kafka-3.1.0.jar
โ โโโ opensearch-core-2.6.0.jar
...
โโโ opensearch-connector.zip
Shell
๋ณต์ฌ
opensearch-connector ํด๋๋ฅผ ์์ถํด์ opensearch-connector.zip์ผ๋ก ๋ง๋ ํ,
MSK์ฉ ์ปค์คํ
์ปค๋ฅํฐ ํ๋ฌ๊ทธ์ธ์ ๋ง๋ค ์์ ์ด๋ผ๋ฉด, "msk-custom-plugins"๋ผ๋ ์ด๋ฆ์ S3 ๋ฒํท์ ๋ง๋ ํ, ์ฐ๋ฆฌ์ zip ํ์ผ "opensearch-connector-for-apache-kafka-3.1.1.zip"์ ๊ทธ ์์ ์
๋ก๋ํ๋ฉด ๋ฉ๋๋ค.
MSK ์ปค๋ฅํฐ ํ๋ฌ๊ทธ์ธ ์์ฑ ํ๋ก์ธ์ค๋ ์๋ ๊ธ์ ์ฐธ๊ณ ํ์๋ฉด ๋ ๋์์ด ๋ ๊ฒ์
๋๋ค.
์ฐธ๊ณ ๋ก connectors ๋ฅผ ํ๋ฌ๊ทธ์ธ path๋ก ์ก์ ์ดํ, MSK๊ฐ ์๋ ๊ธฐ์กด ์นดํ์นด ์ปค๋ฅํฐ๋ฅผ ์ฌ์คํํ๋ฉด,
์๋ ๋ช
๋ น์ด๋ฅผ ํตํด ํ๋ฌ๊ทธ์ธ ์์ฑ์ ํ์ธํ ์ ์์ต๋๋ค.
curl http://localhost:8083/connector-plugins | jq .
[
...
{
"class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"type": "sink",
"version": "3.0.0"
},
...
]
Shell
๋ณต์ฌ
Sink connector ์์ฑ
์ด์ ํ๋ฌ๊ทธ์ธ์ ๋ง๋ค์์ผ๋, sink connector๋ฅผ ์์ฑํ ์ฐจ๋ก์
๋๋ค.
๋จผ์ sink connector๊ฐ ์ฌ์ฉํ ํ ํฝ์ ๋ง๋ค์ด์ผ ํฉ๋๋ค.
์์ ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ sink connector json config(configs/source.json)๋ฅผ ์์ฑํด๋ด
์๋ค
{
"name": "opensearch-sink",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"tasks.max": "2",
"topics": "impressions",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"connection.url": "https://opensearch-node1:9200",
"connection.username": "admin",
"connection.password": "admin",
"schema.ignore": true,
"key.ignore": true,
"type.name": "_doc",
"behavior.on.malformed.documents": "fail",
"behavior.on.null.values": "ignore",
"behavior.on.version.conflict": "ignore",
"errors.deadletterqueue.topic.name": "ad-tech-dl",
"errors.tolerance": "all",
"errors.deadletterqueue.context.headers.enable": true,
"errors.deadletterqueue.topic.replication.factor": 1,
"transforms": "insertTS,formatTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "created_at",
"transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss",
"transforms.formatTS.field": "created_at",
"transforms.formatTS.target.type": "string"
}
}
Shell
๋ณต์ฌ
์ค์ ์ค๋ช
์์ ๊ฐ์ด ์ฑํฌ ์ปค๋ฅํฐ ์ค์ ์ ๋ง์ณค๋ค๋ฉด, ์ด์ ์๋ rest api๋ฅผ ํตํด ์์ฑ์ ํด๋ด
์๋ค.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @configs/source.json
Shell
๋ณต์ฌ
์๋์ ๊ฐ์ด ๋จ๋ฉด ์ฑ๊ณต์
๋๋ค.
$ curl http://localhost:8083/connectors/ad-tech-sink/status | json_pp
{
"connector" : {
"state" : "RUNNING",
"worker_id" : "172.19.0.8:8083"
},
"name" : "ad-tech-sink",
"tasks" : [
{
"id" : 0,
"state" : "RUNNING",
"worker_id" : "172.19.0.8:8083"
},
{
"id" : 1,
"state" : "RUNNING",
"worker_id" : "172.19.0.8:8083"
}
],
"type" : "source"
}
Shell
๋ณต์ฌ
์ฐธ๊ณ ๋ก data ingestion์ ํ ๋, created dt์ ๊ฐ์ ํ์ ์คํฌํ๋ฅผ sink์์ ๋จ๊ธฐ๊ณ ์ถ์ผ๋ฉด, SMT๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฉ๋๋ค. (์ด๋ฒคํธ ํ์์คํฌํ ์์ฑ์ ์๊ฐ์ ๋ถ์์ ์ํํ๋ ๋ฐ ์ ์ฉํ ์ ์์ต๋๋ค.)
์๋ณธ ์ปค๋ฅํฐ์์ ์ด๋ฅผ ์ค์ ํ๋ ์ข
ํฉ์ ์ธ ๋ฐฉ๋ฒ์ ์ฐพ์ง ๋ชปํ๋ฏ๋ก, ์๋ก์ด ํ๋์ธ created_at์ด ๋จ์ผ ๋ฉ์์ง ๋ณํ (SMTs)์ ์ฌ์ฉํ์ฌ ์ถ๊ฐ๋์์ต๋๋ค. ๊ตฌ์ฒด์ ์ผ๋ก๋ ๋ ๊ฐ์ง ๋ณํ, insertTS์ formatTS๋ฅผ ์ถ๊ฐํ์ต๋๋ค. ์ด๋ฆ์์ ์ ์ ์๋ฏ์ด ์ ์๋ ์์คํ
ํ์์คํฌํ ๊ฐ์ ์ฝ์
ํ๊ณ ํ์๋ ์ด๋ฅผ yyyy-MM-dd HH:mm:ss ํ์์ผ๋ก ํฌ๋งทํ
ํฉ๋๋ค.
"transforms": "insertTS,formatTS",
"transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field": "created_at",
"transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss",
"transforms.formatTS.field": "created_at",
"transforms.formatTS.target.type": "string"
Shell
๋ณต์ฌ
์์ ๊ฐ์ ์ค์ ์ json config์ ๋ฃ์ด์ sink connector๋ฅผ ์
๋ฐ์ดํธํ๊ฑฐ๋ ์ฌ์์ฑํ๋ฉด, ๋ฉ๋๋ค.