ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Apache Kafka] 카프카의 멱등성과 트랜잭션
    Apache Kafka 2024. 11. 17. 17:06

    멱등성 프로듀서

    1. 기본 개념

    • 멱등성 : 여러 번 연산을 수행해도 동일한 결과를 나타내는 것
    • 멱등성 프로듀서가 동일 데이터를 여러 번 전송해도 클러스터에 단 한 번만 저장됨
    • 기본 프로듀서 동작 : 적어도 한 번 전달(at least once delivery) → 중복 전송 가능
    • 멱등성 프로듀서 : 정확히 한 번 전달(exactly once delivery)
    • enable.idempotence 옵션을 true로 설정하여 멱등성 프로듀서로 사용 가능
      • 기본값 : false (2.5.0) / true (3 이상)
    • 멱등성 프로듀서 = acks 옵션 all(-1)

    2. 멱등성 프로듀서의 동작

    • 멱등성 프로듀서는 데이터를 브로커로 전달할 때 PID SID를 함께 전달함 → 이 값은 브로커 메모리에서 관리됨
      • PID (Producer Unique ID) : 프로듀서의 고유 ID
      • SID (Sequence ID) : 프로듀서가 보낸 메시지의 순서를 나타내는 번호
    • 위 두 개의 값을 통해 중복을 방지함

    3. 멱등성 프로듀서의 한계

    • 멱등성 프로듀서는 동일 세션에서만 정확히 한 번 전달을 보장함
    • 멱등성 프로듀서 애플리케이션에 이슈가 발생하여 종료되었다가 재시작하면 PID가 달라짐
    • 이후 동일 데이터를 보내더라도 PID가 달라졌기 때문에 브로커는 다른 데이터라고 판단 → 중복 데이터 발생!

    4. 멱등성 프로듀서 옵션

    • enable.idempotence를 true로 설정하면 일부 프로듀서 옵션들이 강제로 설정됨
      • retries : Integer.MAX_VALUE (전송 실패 시 재시도 횟수 무제한)
      • acks : all(-1)
    • 정확히 한 번 전달(exactly once delivery)을 달성하기 위한 설정

    5. OutOfOrderSequenceException

    • 브로커가 예상한 시퀀스 넘버와 다른 번호의 데이터가 들어왔을 때 발생함
    • 순서가 중요한 데이터를 전송하는 프로듀서의 경우 해당 Exception이 발생했을 경우의 대응 방안을 고려해야 함

    트랜잭션 프로듀서와 컨슈머

    1. 트랜잭션 프로듀서

    • 다수의 파티션에 데이터를 저장할 경우 그 모든 데이터에 대해 동일한 원자성을 만족시키기 위해 사용됨
    • 다수의 데이터를 동일 트랜잭션으로 묶음 → 전체 데이터를 처리하거나 처리하지 않도록 하는 것
    • 트랜잭션 프로듀서는 트랜잭션의 시작과 끝을 알리기 위한 트랜잭션 레코드(Commit)를 한 개 더 보냄

    2. 트랜잭션 컨슈머

    • 트랜잭션 컨슈머는 커밋 전까지의 레코드는 가져가지 않고 대기함
    • 트랜잭션 프로듀서가 트랜잭션 레코드를 보내면 가져감
    • 트랜잭션 레코드 : 실질적인 데이터는 아니며 트랜잭션이 끝난 상태를 표시하는 정보만 갖고 있음

    3. 트랜잭션 설정

    (1) 트랜잭션 프로듀서

    • 트랜잭션 프로듀서로 동작하기 위해서 트랜잭션 ID를 설정해야 함
    • 프로듀서별로 고유한 ID를 사용해야 함
    • init → begin → send … → commit
    public class TransactionProducer {
        private static final String TOPIC_NAME = "test";
        private static final String MSG_VALUE = "testMessage";
        private static final String BOOTSTRAP_SERVERS = "my-kafka:9092";
    
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID());
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, MSG_VALUE);
            
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(record);
            producer.commitTransaction();
    
            producer.close();
        }
    }

     

     

    (2) 트랜잭션 컨슈머

    • 트랜잭션 컨슈머로 동작하기 위해 isolation.level 옵션을 read_committed로 설정함
      • 기본값 : read_uncommitted
    configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");