코알못

[AWS] EC2를 이용한 KAFKA Connect 구축 본문

BIG DATA

[AWS] EC2를 이용한 KAFKA Connect 구축

코린이s 2022. 1. 17. 23:40
728x90

저번 시간에는 kafka cluster 를 구축 하고 어떻게 동작하는지 알아보았다!

이제 connect 를 구축하여 데이터 허브를 구성한다!

connect 는 원하는(예 : RDB, S3 등) 데이터를 알아서 가져오고(producer) 원하는 형태(예: RDB, S3 등)로 넣어주는(consumer) 솔루션 이다.

직접 producer, comsumer 을 만들어서 사용해도 되지만 관리/성능 측면에서 내가 만드는 것 보다 잘 만들어둔 솔루션(connect)을 사용하는것이 낫기에 사용하도록 한다!

테스트를 위해 자주 사용하는 명령어는 아래 정리하였으니 참고하도록 한다.

// topic list
./kafka-topics.sh --list --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092

// 메세지 생성 
./kafka-console-producer.sh --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic topic-A
>hi
>bye

// 메세지 받기
./kafka-console-consumer.sh --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic topic-A --from-beginning --group group-01

confluent 에서 connect 받아 설치해본다!  

- confluent 사이트 : https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

 

JDBC Connector (Source and Sink) for Confluent Platform | Confluent Platform 5.5.1

JDBC Connector (Source and Sink) for Confluent Platform You can use the Kafka Connect JDBC source connector to import data from any relational database with a JDBC driver into Apache Kafka® topics. You can use the JDBC sink connector to export data from K

docs.confluent.io

위 사이트에서 원하는 설치 방식으로 설치 하면 되며 아래와 같이 2가지 방식이 있으며 저자는 권장 방식인 'confluent hub client' 를 통해 설치를 진행 하였다.

- confluent hub client 를 통한 설치

- 직접 zip 다운로드

hub client 를 설치 해본다

$ mkdir confluent
$ cd confluent
$ wget http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
$ tar -xvf confluent-hub-client-latest.tar.gz
> /home/ec2-user/confluent/bin/confluent-hub 명령어
> confluent-hub install <owner>/<component>:<version>
$ /home/ec2-user/confluent/bin/confluent-hub install confluentinc/kafka-connect-jdbc:latest
Unable to detect Confluent Platform installation. Specify --component-dir and --worker-configs explicitly.

위와 같이 --component-dir, --worker-configs  옵션을 넣어 달라는 오류가 발생하여 옵션에 해당하는 파일 및 디렉토리를 생성한다.

# component 디렉토리 생성
$ mkdir /home/ec2-user/confluent/component
# 설정 파일 경로 생성
$ mkdir /home/ec2-user/confluent/config
# 설정 파일로 들어가서 설정 파일(worker.properties) 생성
$ cd /home/ec2-user/confluent/config
# 빈 파일로 생성한다.
$ vi worker.properties :wq!
# 옵션을 넣어 설치 진행한다. (아래 설치를 묻는 메세지 나오면 y를 입력하여 설치 진행)
$ /home/ec2-user/confluent/bin/confluent-hub install confluentinc/kafka-connect-jdbc:latest --component-dir /home/ec2-user/confluent/component --worker-configs /home/ec2-user/confluent/config/worker.properties
> y
/home/ec2-user/confluent/config/worker.properties
Completed

설치 완료가 되면 component 폴더에 jdbc 폴더가 만들어지며 해당 lib 폴더를 가니 connector jar 파일이 보인다.

저자는 mysql 로 테스트 해볼 예정이였으나 mssql, oracle 은 존재하나 mysql 이 안보인다....

아래 mysql connector 다운로드 사이트에서 원하는 버전에 대한 다운로드 주소를 복사한다.

- https://dev.mysql.com/downloads/connector/j/ 

 

MySQL :: Download Connector/J

MySQL Connector/J 8.0 is highly recommended for use with MySQL Server 8.0, 5.7 and 5.6. Please upgrade to MySQL Connector/J 8.0.

dev.mysql.com

$ wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz
$ tar -xvf mysql-connector-java-8.0.27.tar.gz

위와 같이 다운로드 및 압축 풀기를 완료 하였으면 해당 폴더에 mysql-connector-java-8.0.27.jar 파일을 connector lib 디렉토리에 넣는다.

$ cp mysql-connector-java-8.0.27.jar /home/ec2-user/confluent/component/confluentinc-kafka-connect-jdbc/lib

 

-  /home/ec2-user/confluent/config/worker.properties 설정

# 설치시 자동으로 설정됨
plugin.path = /home/ec2-user/confluent/component

# 카프카 브로커 서버
bootstrap.servers : kafka-01:9092,kafka-02:9092,kafka-03:9092

# 다수의 카프카 프로세스들을 묶을 그룹 이름
group.id=connect-cluster

# 데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는데 사용
# 카프카 커넥트는 JsonConverter, StringConverter, ByteArrayConverter를 기본으로 제공
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# key value converter 를 사용하기 위해 true로 설정
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 분산 모드 커넥트는 카프카 내부 토픽에 오프셋 정보를 저장.
# 이 오프셋 정보는 소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용함.
offset.storage.topic : connect-offsets
offset.storage.replication.factor : 1
config.storage.topic : connect-configs
config.storage.replication.factor : 1
status.storage.topic : connect-status
status.storage.replication.factor : 1

# 오프셋을 커밋하는 주기 (10초)
offset.flush.interval.ms=10000

이제 설정을 마쳤으면 kafka connect 실행 alias 를 설정 한다.

alias start_connect='nohup /home/ec2-user/kafka_2.12-3.0.0/bin/connect-distributed.sh /home/ec2-user/confluent/config/worker.properties > /data/app_logs/kafka-connect.logs 2>&1 &'

security group 은 아래와 같이 설정 되었는지 꼭 확인 하여 방화벽 이슈가 없도록 한다.

이제 옮길 테스트 데이터를 만들어 본다!

mysql 접속

$ mysql.server start

테스트 데이터를 만든다.

# table 만들기
mysql> create database kafka
    -> ;
Query OK, 1 row affected (0.01 sec)

mysql> use kafka
Database changed
mysql> create table tb_user(
    -> id int AUTO_INCREMENT primary key,
    -> name varchar(10) not null
    -> );
mysql> insert into tb_user(name) values('yoolee');
Query OK, 1 row affected (0.01 sec)

mysql> select * from tb_user;
+----+--------+
| id | name   |
+----+--------+
|  1 | yoolee |
+----+--------+


mysql 의 경우 외부 IP(AWS) 에서 접속을 위해 2가지 설정이 필요하다.

1. mysql conf 파일에서 허용 IP 설정

2. mysql 유저에 IP 권한 부여 (저자는 카프카 테스트용 유저 생성 하고 해당 유저에 권한을 부여한다. 기존 유저 사용시 유저 생성 부분은 패스 해도 된다.)


1번은 아래와 같이 진행 한다. (기존에 127.0.0.1 로 되어 있던것을 0.0.0.0으로 변경하여 모든 IP 에 대해 허용한다. 기호에 맞게 허용 IP 를 설정해도 된다. 저자는 테스트 데이터만 넣을 예정이라 모두 열어 둔다.)

$ vi /usr/local/etc/my.conf
bind-address = 0.0.0.0
mysqlx-bind-address = 0.0.0.0
:wq!

그리고 mysql 서버를 재기동 하여 적용 한다.

$ mysql.server stop
Shutting down MySQL
. SUCCESS!
$ mysql.server start
Starting MySQL
. SUCCESS!
$ mysql -u root -p
use mysql;
mysql> select host, user from mysql.user;
+-----------+------------------+
| host      | user             |
+-----------+------------------+
| localhost | mysql.infoschema |
| localhost | mysql.session    |
| localhost | mysql.sys        |
| localhost | root             |
+-----------+------------------+
mysql> create user 'kafka-user'@'%' IDENTIFIED by 'Kafka@1234'
mysql> select host, user from mysql.user;
+-----------+------------------+
| host      | user             |
+-----------+------------------+
| %         | kafka-user       |
| localhost | mysql.infoschema |
| localhost | mysql.session    |
| localhost | mysql.sys        |
| localhost | root             |
+-----------+------------------+

// 권한 부여 (kafka DB 에 모든 테이블에 모든 권한을 해당 유저로 접속하는 모든 IP 에 대해 허용)
mysql> grant all privileges on kafka.* to 'kafka-user'@'%';
mysql> show grants for `kafka-user`;
+-------------------------------------------------------+
| Grants for kafka-user@%                               |
+-------------------------------------------------------+
| GRANT USAGE ON *.* TO `kafka-user`@`%`                |
| GRANT ALL PRIVILEGES ON `kafka`.* TO `kafka-user`@`%` |
+-------------------------------------------------------+
mysql> quit

권한 부여 명령어 아래 참고하여 기호에 맞게 설정해도 된다.

- 특정 ip 허용
grant all privileges on *.* to ‘root’@‘192.168.1.2’ identified by '패스워드';

- 특정 ip 대역 허용
grant all privileges on *.* to ‘root’@‘192.168.1.%’ identified by ‘패스워드’;

- 모든 ip 허용
grant all privileges on *.* to ‘root’@‘%’ identified by ‘패스워드’;

생성한 유저로 접속한뒤 정상적으로 조회되는지 확인 해보면 kafka db 만 접속 가능하며 테이블도 정상적으로 보인다.

$ mysql -u kafka-user -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 9
Server version: 8.0.23 Homebrew

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| kafka              |
+--------------------+
2 rows in set (0.00 sec)

mysql> use kafka;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-----------------+
| Tables_in_kafka |
+-----------------+
| tb_user         |
+-----------------+
1 row in set (0.00 sec)

mysql> select * from tb_user;
+----+--------+
| id | name   |
+----+--------+
|  1 | yoolee |
+----+--------+
mysql> insert into tb_user(name) values('roro');
Query OK, 1 row affected (0.01 sec)

mysql> select * from tb_user;
+----+--------+
| id | name   |
+----+--------+
|  1 | yoolee |
|  2 | roro   |
+----+--------+
2 rows in set (0.00 sec)

 

저자의 경우 공유기를 사용하여 네트워크를 이용하고 있었기에 외부에서 로컬 mysql 서버를 접속할 수 없어 공유기의 포트 포워딩 설정도 진행한다.

공유기는 next 를 사용하고 있어 아래 주소로 접속하여 설정이 가능하다. 그외 제조사별 설정 URL 은 http://www.codns.com/b/B05-52 에서 확인 가능하다.

- http://192.168.1.254/index.html

ifconfig 를 통해 내 사설 IP를 확인한뒤 '내부 IP' 에 값을 입력한뒤 포트는 mysql 포트를 입력 하여 추가한다.

자 이제 외부에서 공인IP:3306 으로 접속시 와이파이에 연결된 내 로컬 PC의 mysql 에 접속할 수 있다.

테스트를 위해 aws 카프카 클러스터 구성한 s3 를 접속하여 telnet 을 설치한다.

$ sudo yum install telnet

설치가 완료 되면 내 공인 IP 를 네이버에서 쉽게 확인 가능 하다.

이제 s3 에서 정상적으로 로컬 PC 의 mysql 에 접속 되는지 확인한다.

// telnet 을 이용해 port 가 열려있는지 확인
$ telnet 14.55.241.234 3306
// Trying 14.55.241.234... 상태로 무한 대기 아니면 정상

내 로컬 PC 에서도 공인 IP 를 통해 접속 했을때 정상적으로 찾아 가는지 확인 한다.

$ mysql -h 공인IP -u root -p

자 이제 connect 를 실행 시켜 본다 (이전에 kafka cluster 모두 구동, zookeeper 모두 구동 되어 있어야 한다.)

$ start_connect

자 이제 포스트맨을 활용하여 컨넥트 리스트를 조회한다. 아직 만든게 없어서 안나온다.

// kafka connect list

{{kafka-connect-01}}/connectors

이제 source connect 를 생성하여 DB의 테이블을 연결 한다.

각 속성의 의미는 아래 참고하여 설정한다.
name : source connector 이름(JdbcSourceConnector를 사용)
config.connector.class : 커넥터 종류(JdbcSourceConnector 사용)
config.connection.url : jdbc이므로 DB의 정보 입력
config.connection.user : DB 유저 정보
config.connection.password : DB 패스워드
config.mode : "테이블에 데이터가 추가됐을 때 데이터를 polling 하는 방식"(bulk, incrementing, timestamp, timestamp+incrementing)
config.incrementing.column.name : incrementing mode일 때 자동 증가 column 이름
config.table.whitelist : 데이터를 변경을 감지할 table 이름
config.topic.prefix : kafka 토픽에 저장될 이름 형식 지정 위 같은경우 whitelist를 뒤에 붙여 example_topic_users에 데이터가 들어감
tasks.max : 커넥터에 대한 작업자 수

{{kafka-connect-01}}/connectors
{
    "name": "kafka-connect-test",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://14.55.241.234:3306/kafka",
        "connection.user":"kafka-user",
        "connection.password":"Kafka@1234",
        "mode":"incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist" : "tb_user",
        "topic.prefix" : "bk_",
        "tasks.max" : "2"
    }
}

만약 컨넥트 로그 확인시 아래와 같은 오류 발생시 해결법 대로 해결하면 된다.
// error

org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: No suitable driver found for jdbc:mysql://14.55.241.234:3306/kafka\n\tat


- 해결법
plugin 경로 설정한 곳의 lib 폴더에 mysql jdbc connecter 을 넣는다.
/home/ec2-user/confluent/component/confluentinc-kafka-connect-jdbc/lib/mysql-connector-java-8.0.27.jar

정상적으로 연결 되었다면 이제 해당 토픽을 받아 처리할 sink 컨넥터를 만든다.

설정은 source 컨넥터와 다른 부분만 아래 정리 하였으니 참고하여 생성한다.
topics 는 위에 소스 컨넥터에 설정한 토픽명으로 적어 해당 토픽에 대해 처리하도록 한다. 토픽명은 topic.prefix+whitelist 명칭으로 만들어 진다. 
auto.create : 데이터를 넣을 테이블이 누락되었을 경우 자동 테이블 생성 여부
auto.evolve : 특정 데이터의 열이 누락된 경우 대상 테이블에 ALTER 구문을 날려 자동으로 테이블 구조를 바꾸는지 여부 (하지만 데이터 타입 변경, 컬럼 제거, 키본 키 제약 조건 추가등은 시도되지 않는다)
delete.enabled : 삭제 모드 여부

{{kafka-connect-01}}/connectors
{
    "name": "kafka-connect-sink-test",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://14.55.241.234:3306/kafka",
        "connection.user":"kafka-user",
        "connection.password":"Kafka@1234",
        "auto.create":"true",
        "auto.evolve":"false",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"bk_tb_user"
    }
}

이제 테스트를 진행해본다! 소스 컨넥터에 mode를 incrementing으로 했기에 소스 컨넥터를 만든 시점 이후 증가되는(증가 기준을 id 칼럼으로 설정 함) 시점에 데이터를 추가하도록 되어있다. 해당 테이블에 데이터를 넣어서 실시간으로 데이터가 신규 테이블(bk_tb_user) 에 넣어지는지 본다.

// 컨슈머 처리를 실시간으로 확인한다.
$ ./kafka-console-consumer.sh --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic opic-tb_user-tb_user --from-beginning

데이터를 넣는다.

$ mysql -u kafka-user -p
mysql> insert into tb_user(name) values('corin');
Query OK, 1 row affected (0.00 sec)

 

아래와 같이 테이블이 없던게

생겼다! (bk_tb_user)

consumer 실시간 로그 확인시 3개의 컨슈머 모두 받는다. 

consumer 로 확인

중복으로 데이터가 들어갔을까? 해서 확인 하였으나 데이터는 중복되지 않고 들어가 있다. 

그외 connect 관련 API 는 아래 참고 하여 사용해본다.

 

// kafka connect status  (상태 확인)

{{kafka-connect-01}}/connectors/{{connect-name}}/status

// kafka connect delete (제거)

{{kafka-connect-01}}/connectors/{{connect-name}}

자 이제 DB에 넣어봤다면 S3에 넣는 실습을 해본다!

일별 전체 데이터를 읽어서 S3 에 저장하는 것을 진행 해본다.

우선 source 는 아래와 같이 mode 를 bulk 로 지정한다.

poll.interval.ms 를 입력하지 않을시 이벤트가 일어날때마다 계속적으로 전체 데이터를 가져오니 (기존 데이터를 지우고 넣는것이 아니라 계속 적으로 누적된다.) 꼭 ms 를 입력한다. (저자는 하루로 지정하였다.)

{
    "name": "kafka-connect-test",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://14.55.240.240:3306/kafka",
        "connection.user":"kafka-user",
        "connection.password":"Kafka@1234",
        "mode":"bulk",
        "poll.interval.ms" : 86400000,
        "table.whitelist" : "tb_user",
        "topic.prefix" : "topic_",
        "tasks.max" : "2"
    }
}

이제 s3 에 데이터를 넣기위해 라이브러리를 다운받아야 하므로 아래 사이트에서 다운로드 하거나 저자는 컨플루언트 허브를 다운받아 설치 했으므로 해당 서비스를 이용하여 설치한다.

- https://www.confluent.io/hub/confluentinc/kafka-connect-s3?_ga=2.55508591.594070642.1625744982-1603746189.1625477668&_gac=1.88220265.1625581902.CjwKCAjw_o-HBhAsEiwANqYhp9A8Gwr2nrUrcKmCU44lUi0qmBpIQFW6PgDk0Dgd3FLNzBNyHoqjJRoCJtYQAvD_BwE 

$ /home/ec2-user/confluent/bin/confluent-hub install confluentinc/kafka-connect-s3:latest --component-dir /home/ec2-user/confluent/component --worker-configs /home/ec2-user/confluent/config/worker.properties
{
    "name": "kafka-connect-sink-s3-test",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "topics.regex" : "topic_tb_user",
        "flush.size": 10,
        "s3.bucket.name": "emr-hong",
        "s3.region": "ap-northeast-2",
        "topics.dir": "dir/test",
        "locale": "ko_KR",
        "timezone": "Asia/Seoul"
    }
}

"connector.class":  컨넥터
"format.class": 토픽 포맷
storage.class": 저장소

topics.regex : 토픽명
"flush.size": 데이터를 몇개씩 처리할지
"s3.bucket.name": bucket name
"s3.region": region name
"topics.dir": s3로 시작하지 않고 버킷명으로 시작하여 끝에 /는 제거하여야 해당 디렉토리 밑에 토픽명으로 디렉토리가 생성되어 쌓인다. (예 : test/data)
"locale": "ko_KR", // 장소
"timezone": "Asia/Seoul" // 타임

- 자세한 설정값 가이드 : https://docs.confluent.io/5.4.2/connect/kafka-connect-s3/index.html

 

Amazon S3 Sink Connector for Confluent Platform | Confluent Platform 5.4.2

Amazon S3 Sink Connector for Confluent Platform You can use the Kafka Connect Amazon S3 sink connector to export data from Apache Kafka® topics to S3 objects in either Avro, JSON, or Bytes formats. Depending on your environment, the S3 connector can expor

docs.confluent.io

싱크를 생성해보면 아래와 같은 에러 발생 할 수 있다 .이는 aws 자격 증명이 없기 때문에 발생하는 것으로 자격증명을 생성한다.

kafka 서버는 aws 에서 만든 서버로 aws 명령어를 사용할 수 있다. 만약 aws 에서 만든 ec2 인스턴스가 아니라면 아래와 같이 aws client 를 설치 하여 진행한다.

$ pip install awscli

우선 aws 키는 aws 사이트에서 아래 내 계정명 클릭 > 보안 자격 증명 > 엑세스 키 만들기를 통해 발급 받는다.

 발급 받은뒤 kafka 서버에서 아래와 같이 자격증명을 등록한다.

$ aws configure
AWS Access Key ID [None]: 키 ID 입력
AWS Secret Access Key [None]: 시크릿 키 입력
Default region name [None]: ap-northeast-2 // 서울 리전 사용시
Default output format [None]: json // json 형식으로 내보냄
// aws 디렉토리가 생성 된다. 
$ cd ~/.aws/
$ ll
-rw------- 1 ec2-user ec2-user  48  1월 19 11:06 config
-rw------- 1 ec2-user ec2-user 116  1월 19 11:06 credentials

이제 아까 만든 sink 는 삭제한뒤 다시 만든다.(재 등록해야 오류가 안난다.)

싱크 컨넥션 삭제
sink 컨넥션 재 생성

상태 체크시 정상으로 나오는것을 볼 수 있다.

이제 S3 에 정상적으로 데이터가 업로드 되는지 확인한다.

버킷/디렉토리/토픽명/파티션번호 안에 데이터가 만들어졌다.

확인 해보면 'flush.size' 설정한 대로 10 개의 행마다 저장된것을 볼 수 있다.

데이터를 보면 어느정도 누락도 있고, 전체 데이터 모두 전송되지 않았다.

해당 부분은 다음에 보도록 하고 오늘은 여기까지 한다 ! 

 

728x90
Comments