[AWS] Kafka connector 분산 처리
kafka 토픽의 경우 partition 을 주어 병렬 처리를 할수 있다.
connector 의 경우에도 mode 중 distributed 로 실행하면 고가용성, 처리 성능을 높힐 수 있다.
고가용성은 서비스가 계속적으로 중단없이 유지 되는 성질을 의미하는데 conector 한대가 죽어도 과반수 이상 살아 있을시 서비스 정상 운영 가능하다.
해당 글에서 다룰것은 '처리 성능' 부분인데 처리 성능의 경우 분산 처리 기능을 이용하여 높힐 수 있다.
connection 에는 'tasks.max' 라는 옵션이 있다. 이는 source connector, sink connector 모두에게 줄 수 있는데
정확한 의미는 '일을 진행할 업무자 최대수' 이다.
분산 처리시 connector 별로 '업무자 최대 수'를 지정하여 빠르게 처리해야할 업무는 비중을 높혀 효율적으로 처리 할 수 있다.
저자가 하둡 운영시 sqoop이라는 도구로 DB에 쿼리를 호출하여 데이터를 적재하였고, 하나의 테이블을 가져올시 병렬 처리할 수를 지정하면 데이터량을 처리수만큼 여러 서버에서 나누어 병렬로 처리한다.
kafka source connector도 'tasks.max' 옵션을 주면 데이터량을 해당 수로 나누어 처리할것으로 예상하였으나 테스트 결과는 예상과 달랐다.
결론부터 말하자면 '테이블:task' 가 '1:N' 이 아닌 'N:1' 구조 이다.
아래는 저자가 tasks.max 를 2로 설정 하였을때 task가 한 개 생성 된 테스트 결과 이다.
// 설정 값
{{kafka-connect-01}}/connectors
{
"name": "kafka-connect-test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ip:3306/kafka",
"connection.user":"id",
"connection.password":"passwort",
"mode":"bulk",
"poll.interval.ms" : 86400000,
"table.whitelist" : "tb_user",
"topic.prefix" : "topic_",
"tasks.max" : "2"
}
}
// 상태 확인
{{kafka-connect-02}}/connectors/kafka-connect-test/status
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "ip:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "ip:8083"
}
],
"type": "source"
}
다시한번 생각해보니 업무자 최대수는 말그대로 최대 수이며 무조건 설정한 수치 만큼 할당되지 않을 것이다.
그럼 업무자를 할당하는 기준은 무엇일까? 이부분을 알면 업무자를 최대로 할당할 수 있을 것이고 원하는 결과를 도출할 수 있을것이라 생각하였다.
구글링 결과,
task 수는 테이블 수 만큼 생성되며 테이블 수가 tasks.max 를 넘을시 max 만큼 생성 된다.
아래는 테스트 결과이다.
1. table : 3, max task : 2
// 설정
{
"name": "kafka-connect-test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ip:3306/kafka",
"connection.user":"id",
"connection.password":"password",
"mode":"bulk",
"poll.interval.ms" : 86400000,
"table.whitelist" : "tb_user,tb_user_01,tb_user_03",
"topic.prefix" : "topic_",
"tasks.max" : "2"
}
}
// 결과
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "IP:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "IP:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "IP:8083"
}
],
"type": "source"
}
// 생성된 토픽
$ /home/ec2-user/kafka_2.12-3.0.0/bin/kafka-topics.sh --list --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092
topic_tb_user
topic_tb_user_01
topic_tb_user_03
// 토픽 정보
$ /home/ec2-user/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic topic_tb_user --describe
Topic: topic_tb_user TopicId: _eTXX_cbQVy4j2FLtsuPTQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: topic_tb_user Partition: 0 Leader: 1 Replicas: 1 Isr: 1
- table : 3, max task : 4
// 설정
{
"name": "kafka-connect-test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://IP:3306/kafka",
"connection.user":"ID",
"connection.password":"PW",
"mode":"bulk",
"poll.interval.ms" : 86400000,
"table.whitelist" : "tb_user,tb_user_01,tb_user_03",
"topic.prefix" : "topic_",
"tasks.max" : "4"
}
}
// 결과
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "IP:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "IP:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "IP:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "IP:8083"
}
],
"type": "source"
}
- table : 4, max task : 4
{
"name": "kafka-connect-test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://IP:3306/kafka",
"connection.user":"ID",
"connection.password":"PW",
"mode":"bulk",
"poll.interval.ms" : 86400000,
"table.whitelist" : "tb_user,tb_user_01,tb_user_02,tb_user_03",
"topic.prefix" : "topic_",
"tasks.max" : "4"
}
}
// 상태
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "IP:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "IP:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "IP:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "IP:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "IP:8083"
}
],
"type": "source"
}
// 생성된 토픽
$ /home/ec2-user/kafka_2.12-3.0.0/bin/kafka-topics.sh --list --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092
topic_tb_user
topic_tb_user_01
topic_tb_user_02
topic_tb_user_03
// 토픽 정보 (partition 1개)
/home/ec2-user/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server kafka-01:9092,kafka-02:9092,kafka-03:9092 --topic topic_tb_user --describe
Topic: topic_tb_user TopicId: _82liJn2Qmexw1wqIx6YWg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: topic_tb_user Partition: 0 Leader: 3 Replicas: 3 Isr: 3
그럼 connector 하나로 해당 DB의 다수의 테이블 가져오는 구조로 만들고, 전체 데이터가 아닌 테이블의 일부 데이터만 커스텀하게 가져오고 싶다면 따로 connector를 만들어 처리하면 될 것 같다.
커스텀할 테이블은 table.blacklist 옵션으로 지정하여 다수 테이블 가져오는 connector 에서 쉽게 제외할 수있으며 커스텀하게 설정할 connector 에는 table.whitelist 옵션으로 지정하여 가져온다. 두개의 옵션을 같이줄시 어느 하나가 적용이 안될 수도 있다고 공식문서에 나와 있으니 참고한다. (blacklist 옵션이 왜 필요한지 몰랐으나 이제 이해가 된다.)
connector 의 이러한 처리 방식은 RDB의 수많은 데이터를 쉽게 가져올때 좋은것 같으며 (단, 필터링 없이 그대로 원천 데이터를 가져오고 싶을때) 중요도에 따라 task 수를 지정하여 빠르게 처리 될 수 있도록 할 수 있는점도 좋다.
만약 두가지 방식(하나의 connetor에 여러 테이블 처리, 테이블 별로 connector 생성하여 처리)이 모두 가능하다면
두가지 방식 모두 하나의 테이블 처리하는 task가 죽을시 다른 task 가 처리 한다면
아래 task 수에 따른 테스트를 진행하고 판단하면 좋을 것 같다. (전자는 task 수를 제어 할 수 있음)
만약 모든 테스트후에도 차이가 없다고 판단되면 저자의 경우 전자가 관리하기 좋을것 같아 전자를 선택할 것 같다.
// 테스트
1. 전자
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.50.142:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.31.50.142:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "172.31.50.172:8083"
}
],
"type": "source"
}
// 죽인후
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.50.172:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.31.50.172:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "172.31.50.216:8083"
}
],
"type": "source"
}
// 모두 죽인후
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.50.216:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.31.50.216:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "172.31.50.216:8083"
}
],
"type": "source"
}
2. 후자
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.50.172:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.31.50.172:8083"
}
],
"type": "source"
}
// 죽인후
{
"name": "kafka-connect-test",
"connector": {
"state": "RUNNING",
"worker_id": "172.31.50.142:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "172.31.50.216:8083"
}
],
"type": "source"
}
테스트 결과 모두 장애에 대한 처리가 가능하므로, 병렬 처리 하기에 자원이 부족하다면 max task 를 주어 조절하도록 한다.
병렬 처리가 가능하나 bulk 방식의 경우 한번에 전체 테이블 조회에 대한 쿼리를 날리기에 부하가 클 것이다.
이를 조절할 수 있는 방법이 필요한데 다음 시간에 찾아 보기로 한다.