study/blog

Ingest Data from Databases into Kafka with Change Data Capture (CDC); Confluent

22promenade 2024. 4. 29. 15:54

개요

Change Data Capture (CDC) 는 데이터베이스에 이미 존재하는 것과 더불어 일어난 모든 변경점에 대해서 지속적으로 포착할 수 있게 해준다. CDC 에는 query-based , log-based 두 가지 방식이 존재한다. 각각의 장/단점, 한계, 차이에 대해 인지하고 선택해야 한다.


query-based

query-based 는 쿼리를 데이터베이스에 날려서 증분을 추출하는 방식을 의미한다. 일반적인 쿼리를 기반으로 하기 때문에 데이터베이스 내부에 접근하지 않아도 사용할 수 있는 방법이다. 당연하지만 데이터 증분을 위해 쿼리에 타임스탬프, 증가하는 아이디 값과 같은 증분을 정의해주는 필드를 포함해야 한다. 실제 증분이 이루어지는 과정은 아래와 같다.
 
query-based CDC 동작 과정

좌측이 데이터베이스, 우측이 카프카

1. 2개의 행이 데이터베이스에 존재한다고 가정한다. 2개의 긴 막대기가 행을 의미한다.
2. 카프카로 이 두 개의 행을 얻기 위해서 첫 쿼리를 날린다. 여기서 초기 타임스탬프 조건값은 0이 될 것이다.
3. 카프카는 전체 2개의 행을 얻고 그 값을 저장한다.

4. 1개의 행이 데이터베이스에 추가된다.
5. 카프카는 또다른 쿼리를 날린다. 이때 where 절 조건의 타임스탬프 값을 이전 쿼리보다 증가시킨다.
6. 카프카는 추가된 1개의 행만을 저장한다.
 
query-based 방식은 사용방식이 간단하며 JDBC 가 호환되는 대부분의 데이터베이스에 사용가능하다. 다만 이 방식에는 두 가지 단점이 있다.
첫 번째로, 데이터베이스에 쿼리를 날려서 폴링하는 방식이기 때문에 데이터베이스에 부하가 가해질 수 있다.
두 번째로, 히스토리 추적이 불가능하다. 쿼리를 날렸을 때의 데이터베이스의 상태만 반영될 수 있다. 따라서 폴링 구간의 변화를 감지할 수 없다.
 
Kafka Connect JDBC Source Connector from Confluent 가 이 방식으로 구동된다. (JDBC가 호환되는 데이터베이스에 사용가능)


log-based

트랜잭션 로그

log-based는 데이터베이스에 존재하는 바이너리 파일인 트랜잭션 로그를 이용하는 방식을 의미한다. 삽입, 수정, 제거와 같이 데이터베이스에 변화(mutation)를 주는 행위는 트랜잭션 로그에 기록되게 된다. 트랜잭션 로그는 데이터베이스 구현에 따라 redo log, bin log 등으로 부른다. MySQL, PostgreSQL, Oracle 각각 모두 다른 로그 파일 형태를 가진다. log-based cdc 동작 과정은 아래와 같다.
 
log-based cdc 동작 과정

1. 두 개의 행이 존재하고 두 개의 행에 대한 트랜잭션 로그가 있다고 가정하자.

2. 커넥터는 트랜잭션 로그를 구독한다. 커넥터와 데이터베이스는 네트워크로 연결되며 커넥터는 데이터베이스 트랜잭션 로그의 변경점을 인지할 수 있다.

 

3. 데이터베이스에 변경사항이 발생한다.

4. 카프카는 데이터베이스 변경사항을 반영한다. 트랜잭션 로그는 데이터베이스마다 다른 형태를 가지고 있기 때문에 데이터베이스 의존적인 내부 포멧을 형식 메시지 형태(Avro or Protobuf or JSON)로 해석하는 과정을 걸쳐 토픽에 저장한다.
 
log-based 방식은 query-based 와 다르게 모든 변화 히스토리를 파악할 수 있다. log-based 방식의 기반이 되는 폴링은 사이 구간이 존재하기 때문에 쿼리가 주는 상태만 반영할 수 있다. 예를 들어 10초 마다 쿼리를 데이터베이스에 폴링한다고 가정할 때 10초 동안 어떤 한 행이 20번 업데이트 되었다 할지라도 쿼리를 보냈을 때의 상태만 알 수 있다. 반면에 log-based 방식은 트랜잭션 로그를 기반으로 하기 때문에 모든 변화 히스토리를 반영할 수 있다.
 
Confluent Oracle CDC Connector, Debezium Project Connect 가 이 방식으로 구동된다.


출처

https://developer.confluent.io/courses/data-pipelines/kafka-data-ingestion-with-cdc/?session_ref=https://developer.confluent.io/?build=pipelines