-
[Apache Kafka] 카프카의 멱등성과 트랜잭션Apache Kafka 2024. 11. 17. 17:06
멱등성 프로듀서
1. 기본 개념
- 멱등성 : 여러 번 연산을 수행해도 동일한 결과를 나타내는 것
- 멱등성 프로듀서가 동일 데이터를 여러 번 전송해도 클러스터에 단 한 번만 저장됨
- 기본 프로듀서 동작 : 적어도 한 번 전달(at least once delivery) → 중복 전송 가능
- 멱등성 프로듀서 : 정확히 한 번 전달(exactly once delivery)
- enable.idempotence 옵션을 true로 설정하여 멱등성 프로듀서로 사용 가능
- 기본값 : false (2.5.0) / true (3 이상)
- 멱등성 프로듀서 = acks 옵션 all(-1)
2. 멱등성 프로듀서의 동작
- 멱등성 프로듀서는 데이터를 브로커로 전달할 때 PID와 SID를 함께 전달함 → 이 값은 브로커 메모리에서 관리됨
- PID (Producer Unique ID) : 프로듀서의 고유 ID
- SID (Sequence ID) : 프로듀서가 보낸 메시지의 순서를 나타내는 번호
- 위 두 개의 값을 통해 중복을 방지함
3. 멱등성 프로듀서의 한계
- 멱등성 프로듀서는 동일 세션에서만 정확히 한 번 전달을 보장함
- 멱등성 프로듀서 애플리케이션에 이슈가 발생하여 종료되었다가 재시작하면 PID가 달라짐
- 이후 동일 데이터를 보내더라도 PID가 달라졌기 때문에 브로커는 다른 데이터라고 판단 → 중복 데이터 발생!
4. 멱등성 프로듀서 옵션
- enable.idempotence를 true로 설정하면 일부 프로듀서 옵션들이 강제로 설정됨
- retries : Integer.MAX_VALUE (전송 실패 시 재시도 횟수 무제한)
- acks : all(-1)
- 정확히 한 번 전달(exactly once delivery)을 달성하기 위한 설정
5. OutOfOrderSequenceException
- 브로커가 예상한 시퀀스 넘버와 다른 번호의 데이터가 들어왔을 때 발생함
- 순서가 중요한 데이터를 전송하는 프로듀서의 경우 해당 Exception이 발생했을 경우의 대응 방안을 고려해야 함
트랜잭션 프로듀서와 컨슈머
1. 트랜잭션 프로듀서
- 다수의 파티션에 데이터를 저장할 경우 그 모든 데이터에 대해 동일한 원자성을 만족시키기 위해 사용됨
- 다수의 데이터를 동일 트랜잭션으로 묶음 → 전체 데이터를 처리하거나 처리하지 않도록 하는 것
- 트랜잭션 프로듀서는 트랜잭션의 시작과 끝을 알리기 위한 트랜잭션 레코드(Commit)를 한 개 더 보냄
2. 트랜잭션 컨슈머
- 트랜잭션 컨슈머는 커밋 전까지의 레코드는 가져가지 않고 대기함
- 트랜잭션 프로듀서가 트랜잭션 레코드를 보내면 가져감
- 트랜잭션 레코드 : 실질적인 데이터는 아니며 트랜잭션이 끝난 상태를 표시하는 정보만 갖고 있음
3. 트랜잭션 설정
(1) 트랜잭션 프로듀서
- 트랜잭션 프로듀서로 동작하기 위해서 트랜잭션 ID를 설정해야 함
- 프로듀서별로 고유한 ID를 사용해야 함
- init → begin → send … → commit
public class TransactionProducer { private static final String TOPIC_NAME = "test"; private static final String MSG_VALUE = "testMessage"; private static final String BOOTSTRAP_SERVERS = "my-kafka:9092"; public static void main(String[] args) { Properties configs = new Properties(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID()); KafkaProducer<String, String> producer = new KafkaProducer<>(configs); ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, MSG_VALUE); producer.initTransactions(); producer.beginTransaction(); producer.send(record); producer.commitTransaction(); producer.close(); } }
(2) 트랜잭션 컨슈머
- 트랜잭션 컨슈머로 동작하기 위해 isolation.level 옵션을 read_committed로 설정함
- 기본값 : read_uncommitted
configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
'Apache Kafka' 카테고리의 다른 글
[Apache Kafka] 카프카의 데이터 복제 방식 (0) 2024.11.21 [Apache Kafka] 카프카 KRaft에 대해서 (0) 2024.11.18 [Apache Kafka] 카프카 컨슈머 파헤치기 (0) 2024.11.17 [Apache Kafka] 카프카 프로듀서 파헤치기 (0) 2024.11.16 [Apache Kafka] 카프카 CLI 정리 (0) 2024.11.14