Kafka/kafka-connect

JDBC Sink Connector

Thewayhj 2022. 3. 3. 05:29
반응형

Confluent에서 제공하는 기능으로 JDBCSinkConnector는 SourceConnector 또는 debezium 등으로 특정 topic에 있는 데이터를 sink connector에 맞게 이관 또는 마이그레이션 등을 처리할 수 있도록 하는 기능입니다.


JdbcSinkConnector REST API (MySQL)
curl --location --request POST 'http://localhost:8083/connectors'
--header 'Content-Type: application/json'
--data-raw '{
    "name": "mysql-sink",
    "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.user": "${user id}",
        "connection.password": "${password}",
        "connection.url": "jdbc:mysql://localhost:3306/test",
        "auto.create": "false" ,
        "auto.evolve": "false",
        "pk.mode": "record_key",
        "pk.fields": "number",
        "insert.mode": "upsert",
        "topics.regex": "test.test.member",
        "table.name.format":"test.member_info_sink",
        "fields.whitelist": "number, member_id, status, reg_dt",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "db.timezome": "UTC"
    }
}'

JdbcSinkConnector Option

  • tasks.max : connector에서 실행되는 최소 작업 수
  • connector.class : sinkconnector의 속성 (여러가지 connector format을 지원함.)
    • ex)
      • JdbcSinkConnector
      • FileStreamSinkConnector
  • connection.user : DB 접속 위한 ID
  • connection.password : DB 접속 위한 패스워드
  • connection.url : JDBC connection URL 정보
    • ex)
      • Oracle : jdbc:oracle:thin:@localhost:1521:{db_name}
      • MySQL : jdbc:mysql://localhost/{db_name}
      • MSSQL : jdbc:sqlserver://localhost;databaseName={db_name}
  • auto.create : true로 설정 시 테이블을 자동하는 옵션
    • confluent document에는 해당 옵션을 true로 설정하지 말라고 되어있음.
  • auro.evolve : schema 데이터가 누락될 경우 테이블 schema에 자동으로 추가하는 옵션
  • insert.mode : 데이터를 쓰는 방법 
    • insert
    • upsert : 해당 옵션 사용 시 pk.mode 와 pk.field 옵션도 함께 사용해야 합니다.
    • update
  • key.converter / value.converter : topic에 저장된 직렬화된 데이터를 변환할 때 사용.
  • key.converter.schema.enable / value.converter.schema.enable : schema가 있는 포맷 사용 여부
    • 해당 값을 true, false 바꿔보면서 테스트 해봤지만 topic에 쌓이는 데이터가 바뀌지 않는 것으로 확인. (재 확인 필요)


참고 사이트

https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html
반응형

'Kafka > kafka-connect' 카테고리의 다른 글

Amazon S3 sink connector  (0) 2022.03.03