Apache Kafka

[Apache Kafka] 카프카 CLI 정리

jngsngjn 2024. 11. 14. 22:25

kafka-topics

토픽을 생성, 삭제, 조회, 설정 등 토픽 관련 작업을 수행할 때 사용

 

(1) 토픽 생성

  • 파티션 개수, 복제 개수 등 다양한 옵션이 있음
  • 따로 지정하지 않으면 브로커에 설정된 기본값으로 생성됨
# 클러스터 정보와 토픽명은 필수값
./kafka-topics.sh --create --bootstrap-server my-kafka:9092 --topic hello-kafka
./kafka-topics.sh --create \\
--bootstrap-server my-kafka:9092 \\
--partitions 10 \\ # 파티션 수
--replication-factor 1 \\ # 복제 단위
--topic hello-kafka2 \\ # 토픽명
--config retention.ms=172800000 # 삭제 주기
  • 복제 단위 = 1 → 해당 데이터가 복제되지 않고 단일 브로커에만 저장된다는 의미

 

  • kafka-topics 명령을 통한 토픽 생성 외에도 토픽을 생성할 수 있는 방법이 한 가지 더 있음
  • 프로듀서나 컨슈머가 존재하지 않는 토픽에 대해 데이터를 요청한 경우, 브로커의 기본 설정이 적용된 토픽이 자동 생성됨
    • server.properties에 auto.create.topics.enable 옵션이 true인 경우 자동 생성
    • 운영 환경에서는 false로 설정하는 것이 일반적임
    • false일 때 위와 같이 데이터를 요청하면 오류가 발생함
  • 토픽 생성 시 kafka-topics 명령을 통해 명시적으로 생성하는 것이 추후 유지보수와 시스템 성능을 위한 더 좋은 방법임

 

(2) 특정 토픽 정보 확인

./kafka-topics.sh --bootstrap-server my-kafka:9092 --topic hello-kafka --describe

 

 

(3) 토픽 목록 확인

./kafka-topics.sh --bootstrap-server my-kafka:9092 --list

 

 

(4) 이미 생성된 토픽의 파티션 개수 늘리기

./kafka-topics.sh --create \\
--bootstrap-server my-kafka:9092 \\
--topic test \\
--partitions 3

./kafka-topics.sh --alter \\
--bootstrap-server my-kafka:9092 \\
--topic test \\
--partitions 4


kafka.configs

브로커, 토픽, 컨슈머 그룹의 구성 정보를 확인하고 변경할 때 사용

 

(1) min.insync.replicas 설정

  • Kafka에서 프로듀서가 메시지를 성공적으로 기록하기 위해 요구되는 최소 복제본의 수
  • 기본값 = 1
  • 데이터의 내구성과 신뢰성을 보장하기 위해 사용
  • 프로듀서 acks 설정이 all인 경우, ISR이 최소 복제본의 수(min.insync.replicas)를 충족하지 않으면 프로듀서가 데이터를 보내는 데 실패함
    • 프로듀서 acks=all : 메시지를 성공적으로 쓰기 위해 리더 파티션과 ISR(In-Sync Replicas) 내 모든 복제본이 데이터를 성공적으로 받았는지 확인함
    • ISR : 현재 리더와 동기화 상태를 유지하고 있는 모든 복제본의 집합
./kafka-configs.sh --bootstrap-server my-kafka:9092 \\
--alter \\
--add-config min.insync.replicas=2 \\
--topic test

 

 

(2) 브로커 설정 정보 조회

  • server.properties 확인하지 않고 조회 가능
./kafka-configs.sh --bootstrap-server my-kafka:9092 \\
--broker 0 \\
--all \\
--describe

./kafka-configs.sh --bootstrap-server my-kafka:9092 \\
--broker 0 \\
--all \\
--describe | grep log.retention

kafka-console-producer

토픽에 데이터를 넣는 테스트 용도로 사용
  • 별다른 설정 없이 보내면 메시지 값만 보내는 것임
    • 이때 메시지 키는 null임 → 라운드 로빈 방식으로 파티션에 분배
  • 키 구분자 설정을 통해 메시지 키도 보낼 수 있음
    • 특정 레코드에 대해서 순서를 보장하고 싶을 때 메시지 키를 사용
      • 메시지 키 : 메시지 값을 구분하는 용도
    • 특정 메시지 키를 가진 레코드는 하나의 파티션에만 할당됨
      • 예를 들어, 메시지 키가 k1인 레코드가 파티션 0번에 적재되었다면, 그 다음부터 적재되는 메시지 키가 k1인 레코드는 항상 파티션 0번에 적재됨
  • Enter 키로 데이터 전송
  • Ctrl + C로 빠져나옴
./kafka-console-producer.sh --bootstrap-server my-kafka:9092 \\
--topic hello.kafka

./kafka-console-producer.sh --bootstrap-server my-kafka:9092 \\
--topic hello.kafka \\
--property "parse.key=true" \\
--property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3

kafka-console-consumer

토픽을 구독하고 데이터를 읽어오는 테스트 용도로 사용
  • 필수 옵션:  --bootstrap-server, --topic
  • --from-beginning 옵션을 통해 토픽에 저장된 가장 처음 데이터부터 출력
  • 기본적으로 메시지 값만 출력하지만 옵션을 통해 메시지 키도 출력 가능
  • --max-messages 옵션을 통해 최대 컨슘 메시지 개수 설정
  • --partition 옵션을 통해 특정 파티션만 컨슘
  • --group 옵션을 사용하면 컨슈머 그룹을 기반으로 동작함
    • 사용하지 않는 기본 console-consumer의 경우에는 컨슈머 그룹을 기반으로 동작하지 않음
    • 컨슈머 그룹 : 특정 오프셋까지 데이터를 읽었다는 것을 커밋시키기 위한 용도
      • 컨슈머 그룹이 없으면 커밋이 이루어지지 않음
      • 컨슈머 그룹으로 토픽의 레코드를 가져가면, 어느 레코드까지 읽었는지에 대한 데이터가 카프카 브로커에 저장됨 (__consumer_offsets 토픽)
./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka --from-beginning

./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka \\
--property print.key=true \\
--property key.separator="-" \\
--from-beginning

./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka --from-beginning \\
--partition 2 # 특정 파티션 번호 (개수가 아님)

# 이 경우 커밋을 하기 때문에 처음부터 모든 데이터를 읽지 않음
./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka --from-beginning \\
--group hello-group

 

가장 처음부터 2개만 읽기


kafka-consumer-groups

특정 컨슈머 그룹의 상태를 조회하거나 오프셋을 리셋할 때 사용

 

(1) 컨슈머 그룹 목록 조회

./kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \\
--list

 

 

(2) 특정 컨슈머 그룹의 상세 정보 조회

./kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \\
--group hello-group --describe

  • describe 옵션을 사용하면 해당 컨슈머 그룹이 어떤 토픽을 대상으로 데이터를 읽어 들인 상세 정보를 조회할 수 있음
  • 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙, 컨슈머 ID, 호스트 등
  • 컨슈머 랙 : 파티션 마지막 레코드의 오프셋과 현재까지 가져간 레코드의 오프셋의 차이 (→ 컨슈머의 지연 정도 확인 가능)

 

 

(3) 오프셋 리셋

  • 컨슈머 그룹 내 모든 컨슈머의 오프셋 상태를 수정
./kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \\
--group hello-group \\
--topic hello-kafka \\
--reset-offsets --to-earliest --execute
  • --to-earliest : 가장 처음 오프셋으로 리셋
  • --to-latest : 가장 마지막 오프셋으로 리셋
  • --to-current : 현 시점 기준 오프셋으로 리셋
  • --to-datetime [시간] : 특정 일시로 오프셋 리셋 (레코드 타임스탬프 기준)
  • --to-offset [숫자] : 특정 오프셋으로 리셋
  • --shift-by [+/- 숫자] : 현재 컨슈머 오프셋에서 앞/뒤로 옮겨 리셋

 

  • current-offset이 5였던 컨슈머 그룹의 오프셋을 0으로 변경
  • 이후 describe로 확인 시 컨슈머 랙이 5로 바뀜
  • console-consumer로 읽은 결과 처음부터 5개를 다시 읽어들임
  • 다시 describe로 확인하면 current-offset이 5가 됨

기타 CLI

(1) kafka-producer-perf-test

  • 프로듀서 성능 테스트 - 메시지 생성 및 전송 성능 측정
./kafka-producer-perf-test.sh \\

# 브로커 주소 지정
--producer-props bootstrap.servers=my-kafka:9092 \\
--topic hello-kafka \\

# 총 10개의 메시지 생성하여 전송
--num-records 10 \\

# 초당 메시지 전송 속도 (-1로 설정하면 가능한 최대 속도로 전송)
--throughput 1 \\

# 각 메시지의 크기를 바이트 단위로 설정
--record-size 100 \\

# 전송 작업이 끝난 후 성능 지표 출력
--print-metric

# 실행 결과
7 records sent, 1.4 records/sec (0.00 MB/sec), 15.0 ms avg latency, 76.0 ms max latency.
10 records sent, 1.097695 records/sec (0.00 MB/sec), 11.80 ms avg latency, 76.00 ms max latency, 5 ms 50th, 76 ms 95th, 76 ms 99th, 76 ms 99.9th.

 

 

(2) kafka-consumer-perf-test

  • 컨슈머 성능 테스트 - 카프카 브로커와 컨슈머 간의 네트워크를 체크할 때 사용 가능
./kafka-consumer-perf-test.sh \\
--bootstrap-server my-kafka:9092 \\
--topic hello-kafka \\

# 컨슈머가 소비할 메시지의 총 개수
--messages 10 \\

# 테스트 후 통계 정보 출력
--show-detailed-stats

 

 

(3) kafka-reassign-partitions

  • 파티션을 재할당하거나 리더를 수동으로 조정할 때 사용
  • 리더 파티션과 팔로워 파티션의 위치를 조정할 수 있음
  • 브로커 옵션 중 auto.leader.rebalance.enable 옵션의 기본값은 true → 클러스터 단위에서 리더 파티션을 자동 리밸런싱하도록 도와줌
    • 브로커의 백그라운드 쓰레드가 일정한 간격으로 리더의 위치를 파악하고 필요 시 위치를 조정함
  • 리더 파티션을 통해서만 읽기/쓰기 작업이 이루어지기 때문에 리더 파티션이 브로커에 알맞게 분배되어 있는 것이 중요함
./kafka-reassign-partitions \\
--zookeeper my-kafka:2181 \\
--generate --reassignment-json-file [파일 경로] --execute

 

json file 예시

{
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "replicas": [1, 2],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "my-topic",
      "partition": 1,
      "replicas": [2, 3],
      "log_dirs": ["any", "any"]
    }
  ],
  "version": 1
}

 

 

(4) kafka-delete-record

  • 특정 토픽 파티션의 레코드를 논리적으로 삭제함
  • 파티션의 특정 오프셋까지 데이터를 잘라내기하는 방식으로 작동함
  • 논리적으로 삭제된 레코드는 이후 컨슈머에게 더 이상 제공되지 않으며, 실제 파일 시스템에서는 브로커의 log.retention 설정에 의해 제거됨
./kafka-delete-records --bootstrap-server my-kafka:9092 \\
--offset-json-file [JSON 파일 경로]

 

 

json file 예시

  • my-topic의 0번 파티션에서 오프셋 100 이전의 데이터 삭제
{
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "offset": 100
    }
  ]
}

 

 

(5) kafka-dump-log

  • 로그 세그먼트 파일을 읽어 내부 데이터를 분석 및 디버깅하는 데 사용
  • 브로커가 저장하는 로그 세그먼트 파일(log, index, timeindex)의 내용을 출력하여, 특정 파티션의 메시지나 메타데이터를 확인하는 데 유용함
  • —-files : 읽을 카프카 로그 세그먼트 파일 경로 지정
  • —-deep-iteration : 상세 출력 (모든 메시지 및 레코드 정보)
  • —-offset : 특정 오프셋부터 시작하여 데이터를 읽음
  • —-max-message-size : 출력한 최대 메시지 크기 제한 (기본값 : 5MB)
  • —-print-data-log : 메시지의 실제 데이터(Key, Value)를 함께 출력
./kafka-dump-log.sh \\
--files ../data/hello-kafka-0/00000000000000000000.log \\
--deep-iteration

 

 

(6) kafka-broker-api-versions

  • 각 브로커의 API 호환성 정보 확인
./kafka-broker-api-versions --bootstrap-server kafka1:9092

 

(7) kafka-run-class

  • 버전 정보 확인
./kafka-run-class kafka.Kafka --version
=> 7.7.1-ccs