Apache Kafka
[Apache Kafka] 카프카 CLI 정리
jngsngjn
2024. 11. 14. 22:25
kafka-topics
토픽을 생성, 삭제, 조회, 설정 등 토픽 관련 작업을 수행할 때 사용
(1) 토픽 생성
- 파티션 개수, 복제 개수 등 다양한 옵션이 있음
- 따로 지정하지 않으면 브로커에 설정된 기본값으로 생성됨
# 클러스터 정보와 토픽명은 필수값
./kafka-topics.sh --create --bootstrap-server my-kafka:9092 --topic hello-kafka
./kafka-topics.sh --create \\
--bootstrap-server my-kafka:9092 \\
--partitions 10 \\ # 파티션 수
--replication-factor 1 \\ # 복제 단위
--topic hello-kafka2 \\ # 토픽명
--config retention.ms=172800000 # 삭제 주기
- 복제 단위 = 1 → 해당 데이터가 복제되지 않고 단일 브로커에만 저장된다는 의미
- kafka-topics 명령을 통한 토픽 생성 외에도 토픽을 생성할 수 있는 방법이 한 가지 더 있음
- 프로듀서나 컨슈머가 존재하지 않는 토픽에 대해 데이터를 요청한 경우, 브로커의 기본 설정이 적용된 토픽이 자동 생성됨
- server.properties에 auto.create.topics.enable 옵션이 true인 경우 자동 생성
- 운영 환경에서는 false로 설정하는 것이 일반적임
- false일 때 위와 같이 데이터를 요청하면 오류가 발생함
- 토픽 생성 시 kafka-topics 명령을 통해 명시적으로 생성하는 것이 추후 유지보수와 시스템 성능을 위한 더 좋은 방법임
(2) 특정 토픽 정보 확인
./kafka-topics.sh --bootstrap-server my-kafka:9092 --topic hello-kafka --describe
(3) 토픽 목록 확인
./kafka-topics.sh --bootstrap-server my-kafka:9092 --list
(4) 이미 생성된 토픽의 파티션 개수 늘리기
./kafka-topics.sh --create \\
--bootstrap-server my-kafka:9092 \\
--topic test \\
--partitions 3
./kafka-topics.sh --alter \\
--bootstrap-server my-kafka:9092 \\
--topic test \\
--partitions 4
kafka.configs
브로커, 토픽, 컨슈머 그룹의 구성 정보를 확인하고 변경할 때 사용
(1) min.insync.replicas 설정
- Kafka에서 프로듀서가 메시지를 성공적으로 기록하기 위해 요구되는 최소 복제본의 수
- 기본값 = 1
- 데이터의 내구성과 신뢰성을 보장하기 위해 사용
- 프로듀서 acks 설정이 all인 경우, ISR이 최소 복제본의 수(min.insync.replicas)를 충족하지 않으면 프로듀서가 데이터를 보내는 데 실패함
- 프로듀서 acks=all : 메시지를 성공적으로 쓰기 위해 리더 파티션과 ISR(In-Sync Replicas) 내 모든 복제본이 데이터를 성공적으로 받았는지 확인함
- ISR : 현재 리더와 동기화 상태를 유지하고 있는 모든 복제본의 집합
./kafka-configs.sh --bootstrap-server my-kafka:9092 \\
--alter \\
--add-config min.insync.replicas=2 \\
--topic test
(2) 브로커 설정 정보 조회
- server.properties 확인하지 않고 조회 가능
./kafka-configs.sh --bootstrap-server my-kafka:9092 \\
--broker 0 \\
--all \\
--describe
./kafka-configs.sh --bootstrap-server my-kafka:9092 \\
--broker 0 \\
--all \\
--describe | grep log.retention
kafka-console-producer
토픽에 데이터를 넣는 테스트 용도로 사용
- 별다른 설정 없이 보내면 메시지 값만 보내는 것임
- 이때 메시지 키는 null임 → 라운드 로빈 방식으로 파티션에 분배
- 키 구분자 설정을 통해 메시지 키도 보낼 수 있음
- 특정 레코드에 대해서 순서를 보장하고 싶을 때 메시지 키를 사용
- 메시지 키 : 메시지 값을 구분하는 용도
- 특정 메시지 키를 가진 레코드는 하나의 파티션에만 할당됨
- 예를 들어, 메시지 키가 k1인 레코드가 파티션 0번에 적재되었다면, 그 다음부터 적재되는 메시지 키가 k1인 레코드는 항상 파티션 0번에 적재됨
- 특정 레코드에 대해서 순서를 보장하고 싶을 때 메시지 키를 사용
- Enter 키로 데이터 전송
- Ctrl + C로 빠져나옴
./kafka-console-producer.sh --bootstrap-server my-kafka:9092 \\
--topic hello.kafka
./kafka-console-producer.sh --bootstrap-server my-kafka:9092 \\
--topic hello.kafka \\
--property "parse.key=true" \\
--property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3
kafka-console-consumer
토픽을 구독하고 데이터를 읽어오는 테스트 용도로 사용
- 필수 옵션: --bootstrap-server, --topic
- --from-beginning 옵션을 통해 토픽에 저장된 가장 처음 데이터부터 출력
- 기본적으로 메시지 값만 출력하지만 옵션을 통해 메시지 키도 출력 가능
- --max-messages 옵션을 통해 최대 컨슘 메시지 개수 설정
- --partition 옵션을 통해 특정 파티션만 컨슘
- --group 옵션을 사용하면 컨슈머 그룹을 기반으로 동작함
- 사용하지 않는 기본 console-consumer의 경우에는 컨슈머 그룹을 기반으로 동작하지 않음
- 컨슈머 그룹 : 특정 오프셋까지 데이터를 읽었다는 것을 커밋시키기 위한 용도
- 컨슈머 그룹이 없으면 커밋이 이루어지지 않음
- 컨슈머 그룹으로 토픽의 레코드를 가져가면, 어느 레코드까지 읽었는지에 대한 데이터가 카프카 브로커에 저장됨 (__consumer_offsets 토픽)
./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka --from-beginning
./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka \\
--property print.key=true \\
--property key.separator="-" \\
--from-beginning
./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka --from-beginning \\
--partition 2 # 특정 파티션 번호 (개수가 아님)
# 이 경우 커밋을 하기 때문에 처음부터 모든 데이터를 읽지 않음
./kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \\
--topic hello-kafka --from-beginning \\
--group hello-group
가장 처음부터 2개만 읽기
kafka-consumer-groups
특정 컨슈머 그룹의 상태를 조회하거나 오프셋을 리셋할 때 사용
(1) 컨슈머 그룹 목록 조회
./kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \\
--list
(2) 특정 컨슈머 그룹의 상세 정보 조회
./kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \\
--group hello-group --describe
- describe 옵션을 사용하면 해당 컨슈머 그룹이 어떤 토픽을 대상으로 데이터를 읽어 들인 상세 정보를 조회할 수 있음
- 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙, 컨슈머 ID, 호스트 등
- 컨슈머 랙 : 파티션 마지막 레코드의 오프셋과 현재까지 가져간 레코드의 오프셋의 차이 (→ 컨슈머의 지연 정도 확인 가능)
(3) 오프셋 리셋
- 컨슈머 그룹 내 모든 컨슈머의 오프셋 상태를 수정
./kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \\
--group hello-group \\
--topic hello-kafka \\
--reset-offsets --to-earliest --execute
- --to-earliest : 가장 처음 오프셋으로 리셋
- --to-latest : 가장 마지막 오프셋으로 리셋
- --to-current : 현 시점 기준 오프셋으로 리셋
- --to-datetime [시간] : 특정 일시로 오프셋 리셋 (레코드 타임스탬프 기준)
- --to-offset [숫자] : 특정 오프셋으로 리셋
- --shift-by [+/- 숫자] : 현재 컨슈머 오프셋에서 앞/뒤로 옮겨 리셋
- current-offset이 5였던 컨슈머 그룹의 오프셋을 0으로 변경
- 이후 describe로 확인 시 컨슈머 랙이 5로 바뀜
- console-consumer로 읽은 결과 처음부터 5개를 다시 읽어들임
- 다시 describe로 확인하면 current-offset이 5가 됨
기타 CLI
(1) kafka-producer-perf-test
- 프로듀서 성능 테스트 - 메시지 생성 및 전송 성능 측정
./kafka-producer-perf-test.sh \\
# 브로커 주소 지정
--producer-props bootstrap.servers=my-kafka:9092 \\
--topic hello-kafka \\
# 총 10개의 메시지 생성하여 전송
--num-records 10 \\
# 초당 메시지 전송 속도 (-1로 설정하면 가능한 최대 속도로 전송)
--throughput 1 \\
# 각 메시지의 크기를 바이트 단위로 설정
--record-size 100 \\
# 전송 작업이 끝난 후 성능 지표 출력
--print-metric
# 실행 결과
7 records sent, 1.4 records/sec (0.00 MB/sec), 15.0 ms avg latency, 76.0 ms max latency.
10 records sent, 1.097695 records/sec (0.00 MB/sec), 11.80 ms avg latency, 76.00 ms max latency, 5 ms 50th, 76 ms 95th, 76 ms 99th, 76 ms 99.9th.
(2) kafka-consumer-perf-test
- 컨슈머 성능 테스트 - 카프카 브로커와 컨슈머 간의 네트워크를 체크할 때 사용 가능
./kafka-consumer-perf-test.sh \\
--bootstrap-server my-kafka:9092 \\
--topic hello-kafka \\
# 컨슈머가 소비할 메시지의 총 개수
--messages 10 \\
# 테스트 후 통계 정보 출력
--show-detailed-stats
(3) kafka-reassign-partitions
- 파티션을 재할당하거나 리더를 수동으로 조정할 때 사용
- 리더 파티션과 팔로워 파티션의 위치를 조정할 수 있음
- 브로커 옵션 중 auto.leader.rebalance.enable 옵션의 기본값은 true → 클러스터 단위에서 리더 파티션을 자동 리밸런싱하도록 도와줌
- 브로커의 백그라운드 쓰레드가 일정한 간격으로 리더의 위치를 파악하고 필요 시 위치를 조정함
- 리더 파티션을 통해서만 읽기/쓰기 작업이 이루어지기 때문에 리더 파티션이 브로커에 알맞게 분배되어 있는 것이 중요함
./kafka-reassign-partitions \\
--zookeeper my-kafka:2181 \\
--generate --reassignment-json-file [파일 경로] --execute
json file 예시
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"replicas": [1, 2],
"log_dirs": ["any", "any"]
},
{
"topic": "my-topic",
"partition": 1,
"replicas": [2, 3],
"log_dirs": ["any", "any"]
}
],
"version": 1
}
(4) kafka-delete-record
- 특정 토픽 파티션의 레코드를 논리적으로 삭제함
- 파티션의 특정 오프셋까지 데이터를 잘라내기하는 방식으로 작동함
- 논리적으로 삭제된 레코드는 이후 컨슈머에게 더 이상 제공되지 않으며, 실제 파일 시스템에서는 브로커의 log.retention 설정에 의해 제거됨
./kafka-delete-records --bootstrap-server my-kafka:9092 \\
--offset-json-file [JSON 파일 경로]
json file 예시
- my-topic의 0번 파티션에서 오프셋 100 이전의 데이터 삭제
{
"partitions": [
{
"topic": "my-topic",
"partition": 0,
"offset": 100
}
]
}
(5) kafka-dump-log
- 로그 세그먼트 파일을 읽어 내부 데이터를 분석 및 디버깅하는 데 사용
- 브로커가 저장하는 로그 세그먼트 파일(log, index, timeindex)의 내용을 출력하여, 특정 파티션의 메시지나 메타데이터를 확인하는 데 유용함
- —-files : 읽을 카프카 로그 세그먼트 파일 경로 지정
- —-deep-iteration : 상세 출력 (모든 메시지 및 레코드 정보)
- —-offset : 특정 오프셋부터 시작하여 데이터를 읽음
- —-max-message-size : 출력한 최대 메시지 크기 제한 (기본값 : 5MB)
- —-print-data-log : 메시지의 실제 데이터(Key, Value)를 함께 출력
./kafka-dump-log.sh \\
--files ../data/hello-kafka-0/00000000000000000000.log \\
--deep-iteration
(6) kafka-broker-api-versions
- 각 브로커의 API 호환성 정보 확인
./kafka-broker-api-versions --bootstrap-server kafka1:9092
(7) kafka-run-class
- 버전 정보 확인
./kafka-run-class kafka.Kafka --version
=> 7.7.1-ccs