Kafka/kafka-connect

Amazon S3 sink connector

Thewayhj 2022. 3. 3. 04:54
반응형

Amazon S3에 데이터를 쌓기 위해 우선 AWS 셋팅이 필요합니다.

 

1. AWS S3 bucket 메뉴에 접속합니다.

2. 버킷 만들기

  • S3에 생성할 bucket 이름을 생성한다. (bucket 이름은 고유한 이름으로 정하기 때문에 기존에 S3 bucket 이름 확인 후 생성한다.)
  • AWS 리전은 S3를 생성하고 싶은 리전 확인 후 생성한다.
  • AWS에서 객체 소유권 셋팅은 bucket에 작성한 각각의 객체를 개별적으로 제어해야하는 상황을 제외하고 ACL 기능을 비활성화 하는 것을 권장한다고 합니다.

버킷 만들기 버튼 클릭했을 떄 노출되는 화면

 

  • bucket 생성 시 외부에서 무단으로 접근하는 환경으로부터 데이터를 보호하기위해 퍼블릭 액세스 차단을 권장.

 

  • bucket 버전 관리는 임의로 테스트를 위해 생성하는 S3이므로 비활성화 처리로 선택.
    • 추후 서비스 운영을 할때는 해당 기능의 활성화를 통해 장애 대응을 해야할 것으로 확인.
  • AWS에서는 퍼블릭 액세스 차단 외 기본 암호화 활성화를 통해 bucket에 저장된 데이터를 암호화하는 것을 권장한다고 함. 하지만 테스트를 위해 만드는 부분으로 암호화 설정을 하지 않았으나 "Amazon S3 관리형 키(SSE-S3)" 또는 "AWS 키 관리 서비스 키(SSE-MS)" 기능을 제공하는 부분을 확인하여 추후 암호화 복호화 확인 예정.

AWS console을 통해 위의 설정으로 bucket 생성.

 

3. AWS S3에 접근 시 필요한 access key와 secret key를 발급 받습니다.

  • 위치
    • IAM > 사용자 메뉴 > 사용자 선택 > 보안자격 증명 > 액세스 키 만들기
    • 오른쪽 상단에 있는 계정 선택 > 보안 자격 증명 메뉴 선택 > 액세스 키(액세스 키 ID 및 비밀 액세스 키) > 새 액세스 키 만들기
  • 주의할 점
    • secret key는 처음 액세스 키를 생성할 때 확인 및 다운이 가능합니다. 처음 생성 시 잘 보관을 해야하며, secret key 분실 시 액세스 키를 재 생성하여야 합니다.


위의 AWS S3 셋팅이 끝난 후 s3 sink connector를 실행하기 위해 아래 confluent 사이트에서 관련 파일을 다운을 받습니다.

 

Sink Connector Download

Download link : https://www.confluent.io/hub/confluentinc/kafka-connect-s3

 

다운 받은 후 kafka connect를 단일 또는 분산 모드에 맞게 아래 properties에 plugin.path에 추가하거나 설정한 경로에 jar 파일들을 추가한다.

  • 단일모드 : connect-standalone.properties
  • 분산모드 : connect-distributed.properties

 

request POST 'http://localhost:8083/connectors'
--header 'Content-Type: application/json'
--data-raw '{
    "name": "s3-sink",
    "config": {
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "flush.size": "100",
        "partition.duration.ms": "10000",
        "locale": "ko_KR",
        "timezone": "UTC",
        "topics": "${topic}",
        "s3.region": "${s3 region}",
        "s3.bucket.name": "${s3 bucket name}",
        "path.format": "YYYY/MM/dd",
        "aws.access.key.id": "${aws access key}",
        "aws.secret.access.key": "${aws secret key}",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}'
  • flush.size : S3에 올라갈 file에 저장되는 recort 수.
    • (100000으로 설정 시 S3에 파일이 생성되지 않았으며, 100으로 셋팅을 했을 때 정상적으로 파일 생성되는 부분을 확인하여 왜 정상적으로 동작을 하지 않았는지 확인 필요.)
  • partitioner.class : 저장소에 데이터를 저장할 때 사용하는 class
    • Default Kafka Partitioner
    • Field Partitioner
    • Time Based Partitioner
    • Daily Partitioner
  • s3.region : bucket을 생성한 region
    • AWS console 사이트에서 상단 region 클릭하면 각 region 정보 확인 가능.
    • ex) 서울 : ap-northeast-2
  • s3.bucket.name : S3에 생성한 bucket 이름
  • aws.access.key.id : AWS에서 발급받은 access key
  • aws.secret.access.key : AWS에서 발급받은 secret key

 

S3 bucket에 생성되는 파일 경로는 보통 아래와 같으며,

<prefix>/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>

위의 REST API를 통해 connector를 생성하면 S3에 만들어지는 경로는 아래와 같다.

topics/${topic명}/2022/03/01/${topic명}+0+${startOffset}.json

 

참고 사이트

https://docs.confluent.io/kafka-connect-s3-sink/current/overview.html
https://docs.confluent.io/kafka-connect-s3-sink/current/configuration_options.html

 

반응형

'Kafka > kafka-connect' 카테고리의 다른 글

JDBC Sink Connector  (0) 2022.03.03