반응형

Kafka 4

JDBC Sink Connector

Confluent에서 제공하는 기능으로 JDBCSinkConnector는 SourceConnector 또는 debezium 등으로 특정 topic에 있는 데이터를 sink connector에 맞게 이관 또는 마이그레이션 등을 처리할 수 있도록 하는 기능입니다. JdbcSinkConnector REST API (MySQL) curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{ "name": "mysql-sink", "config": { "tasks.max": "1", "connector.class": "io.confluent.connect.jd..

Kafka/kafka-connect 2022.03.03

Amazon S3 sink connector

Amazon S3에 데이터를 쌓기 위해 우선 AWS 셋팅이 필요합니다. 1. AWS S3 bucket 메뉴에 접속합니다. 2. 버킷 만들기 S3에 생성할 bucket 이름을 생성한다. (bucket 이름은 고유한 이름으로 정하기 때문에 기존에 S3 bucket 이름 확인 후 생성한다.) AWS 리전은 S3를 생성하고 싶은 리전 확인 후 생성한다. AWS에서 객체 소유권 셋팅은 bucket에 작성한 각각의 객체를 개별적으로 제어해야하는 상황을 제외하고 ACL 기능을 비활성화 하는 것을 권장한다고 합니다. bucket 생성 시 외부에서 무단으로 접근하는 환경으로부터 데이터를 보호하기위해 퍼블릭 액세스 차단을 권장. bucket 버전 관리는 임의로 테스트를 위해 생성하는 S3이므로 비활성화 처리로 선택. 추후..

Kafka/kafka-connect 2022.03.03

SinkConnector 각 종 에러

1. Task threw an uncaught and unrecoverable exception 원인 sinkConnector를 실행시킬 때 Kafka에 기록한 직렬화된 데이터를 변환하기 위해 사용한 "key.converter"과 "value.converter"을 사용했는데 connect.log 확인 시 schema 정보를 갖고 있는 key를 찾을 수 없다는 에러 로그를 확인하였다. 하여 topic에 쌓인 데이터를 확인해보니 아래와 같이 쌓인 것을 볼 수 있었다. 해결 방안 sinkConnector에서 "key.converter" 또는 "value.converter" 옵션을 사용하기 위해서는 debezium 또는 sourceConnector 를 실행시킬 때 "key.converter", "value.co..

Kafka Troubleshooting

snapshot.mode : schema_only_recovery 간혹 debezium으로 테스트를 진행 중에 테스트를 위해 docker로 실행시킨 MySQL을 stop 후 다시 start를 하면 connector가 정상적으로 싱행이 되지 않을 때가 있었다. 정상적으로 connector가 실행되지 않을 때 처음으로 발생하는 에러는 MySQL에 접근하는 계정에 권한이 없다고 하는 에러였다. 저는 MySQL8 이상을 사용하고 있어 아래와 같이 명령어를 통해 권한을 주었다. (MySQL 5.7 이하는 identified를 통해 가능.) GRANT ALL PRIVILEGES ON {DB명}.* TO {계정 ID}@'%' WITH GRANT OPTION; // 권한 부여 FLUSH PRIVILEGES; // 권한..

반응형