Infra Trouble Shooting - 2
kafka operator replica Set 2 Error
by HOON
0
Last updated on March 1, 2025, 1:32 p.m.
안녕하세요.
오늘은 팀프로젝트 진행 중 마주한 에러에 대해서 작성해보고자 합니다.
우선 저희 팀이 사용 할 스택중 Kafka가 포함 되어있습니다.
Kafka의 작동방식에 대해서 아주 간략하게 설명을 드리자면,
먼저 Producer, Broker, Consumer 로 나뉘고 여기에서 Pub/Sub 구조로 중간 매개체인 Broker를 중심으로 메세지를 보내고, 메세지를 가져오는 형식입니다.
서비스가 여러개로 나눠진다면 어떤 서비스가 어떤 데이터를 갖고와야할지 복잡해지겠죠??
따라서 중간에 Kafka 서버를 두고 실제 그 데이터가 필요한 Consumer들은 Broker Server에서 가져오기만 하면 되는 구조입니다.
문제
저는 팀 내, Infra 및 배포를 담당하고있습니다.
이에 제가 마주한 에러에 대해서 설명 드리겠습니다.
에러 현상은 다음과 같습니다.
Kafka 클러스터 구축 후 로컬 테스트 진행 시, Topic 생성, Broker 서버로 전송 까지는 가능했으나, Consumer가 작성된 메세지를 읽지 못하는 문제 발생
에러 해결에 앞서 우선 제 환경 부터 설명드리겠습니다.
현재 구조
우선 저는 EC2 여러개로 Public Subnet, Private Subnet을 구분하여 k8s 클러스터를 구축 하였습니다.(EKS 사용x)
물론 Master Node와 k8s-Kafka Node는 Private Subnet에 위치합니다.
Kafka는 Helm을 사용하여 bitnami/kafka 으로 설치하였습니다.
또한 EC2 자체 로컬 스토리지를 사용하기위해 StorageClass를 정의해서 사용중입니다.
helm install my-kafka -n kafka-namespace oci://registry-1.docker.io/bitnamicharts/kafka
ubuntu@k8s-master:~$ k get sc -o wide
NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
local-path rancher.io/local-path Retain WaitForFirstConsumer false 2d11h
local-path-kafka rancher.io/local-path Retain WaitForFirstConsumer false 2d8h
Helm으로 설치해줬으니 당연히 values.yaml을 작성해서 커스텀 해야겠죠??
우선 resource가 넉넉하지 않기때문에 replica를 2로 두고, StorageClass만 작성할게요.
#values.yaml
controller:
replicaCount: 2
enableKraft: true
podAnnotations:
sidecar.istio.io/inject: "true"
persistence:
enabled: true
storageClass: local-path-kafka
size: 10Gi
values.yaml을 작성했으니 당연히 install,upgrade 명령어 시 -f values.yaml으로 파일을 명시해주세요.
helm install my-kafka -n kafka-namespace oci://registry-1.docker.io/bitnamicharts/kafka -f values.yaml
설치가 정상적으로 되고 실행까지 확인했다면 이제 카프카를 테스트 해볼거에요.
ubuntu@k8s-master:~/kafka/kafka-helm$ k get pods
NAME READY STATUS RESTARTS AGE
my-kafka-controller-0 2/2 Running 2 (7m10s ago) 4h1m
my-kafka-controller-1 2/2 Running 2 (7m10s ago) 4h2m
Tip💡 제가 앞서 말씀드린 카프카의 구조는 Broker 서버가 있어야하는데 여기에선 안보입니다!.
현재 저는 Zookeeper를 사용하는게 아닌, Kraft 모드를 사용하고, 브로커 서버는 controller에 내장되어있어요. controller가 두 가지 역할을 하는 셈 인거죠!
자, 그럼 우선 테스트환경이니 sh로 topic 확인 및 pub/sub을 해볼게요.
우선 가동중인 컨트롤러에서 보겠습니다.
# 1. 토픽 목록
ubuntu@k8s-master:~/kafka/kafka-helm$ k exec -it my-kafka-controller-0 -- kafka-topics.sh --bootstrap-server my-kafka-controller-0.my-kafka-controller-headless.kafka-namespace.svc.cluster.local:9092 --list
__consumer_offsets
test-topic
test-topic2
your-topic
# 2. "test-topic2"가 있는지 확인 후, describe
ubuntu@k8s-master:~/kafka/kafka-helm$ k exec -it my-kafka-controller-0 -- kafka-topics.sh --bootstrap-server my-kafka-controller-0.my-kafka-controller-headless.kafka-namespace.svc.cluster.local:9092 \
--describe --topic test-topic2
Topic: test-topic2 TopicId: L0cxPAOWTAKBe320mrp_8g PartitionCount: 1 ReplicationFactor: 2 Configs:
Topic: test-topic2 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1 Elr: LastKnownElr:
# 3. test-topic2에 메세지 전송
# 이 단계에서는 > 로 표시되면 메세지 입력 후 엔터를 누르고 Ctrl+C로 종료하면 topic으로 메세지가 전송됨.
ubuntu@k8s-master:~/kafka/kafka-helm$ k exec -it my-kafka-controller-0 -- kafka-console-producer.sh --broker-list 10.111.164.71:9092 --topic test-topic2
> hi
> Kafka
> Working
>^Ccommand terminated with exit code 130
# 4. Consumer에서 Topic에 있는 메세지 확인
ubuntu@k8s-master:~/kafka/kafka-helm$ k exec -it my-kafka-controller-0 -- kafka-console-producer.sh \
--broker-list my-kafka-controller-0.my-kafka-controller-headless.kafka-namespace.svc.cluster.local:9092 \
--topic test-topic2
단계별로 설명을 드리겠습니다.
- 먼저 저는 test-topic2 라는 이름으로 토픽을 생성 한 상태입니다. 이미 생성된 다른 토픽들도 보이네요.
- 그럼 실제 test-topic2이 정상적으로 리더선출이 되었는지 확인합니다.(아까 제가 broker도 같이 구성된다고 했죠? 따라서 둘 중 하나는 리더가 되어야합니다.) Leader: 1로 설정된걸 보니 리더는 1번으로 제대로 정해졌네요. 그럼 여기까진 문제가 없죠?
- 실제로 Broker 서버로 메세지를 전송 해보겠습니다. (아래 로그를 참고해주세요.)
이부분은 로그를 확인하면 정확하게 알 수 있어요! - 자 그럼 대망의 Consumer에서 현재 test-topic2에 저장된 메세지를 가져와볼게요.
10분이 지났어도 메세지는 못가져왔어요…
#Broker 서버 내 Topic의 메세지 확인
I have no name!@my-kafka-controller-0:/bitnami/kafka/data/test-topic2-0$ kafka-dump-log.sh --files /bitnami/kafka/data/test-topic2-0/00000000000000000000.log --print-data-log
| offset: 49 CreateTime: 1740655825996 keySize: -1 valueSize: 2 sequence: 0 headerKeys: [] payload: hi
baseOffset: 50 lastOffset: 50 count: 1 baseSequence: 1 lastSequence: 1 producerId: 13000 producerEpoch: 0 partitionLeaderEpoch: 64 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 3259 CreateTime: 1740655828722 size: 73 magic: 2 compresscodec: none crc: 1067879559 isvalid: true
| offset: 50 CreateTime: 1740655828722 keySize: -1 valueSize: 5 sequence: 1 headerKeys: [] payload: Kafka
baseOffset: 51 lastOffset: 51 count: 1 baseSequence: 2 lastSequence: 2 producerId: 13000 producerEpoch: 0 partitionLeaderEpoch: 64 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 3332 CreateTime: 1740655831573 size: 75 magic: 2 compresscodec: none crc: 4119911206 isvalid: true
| offset: 51 CreateTime: 1740655831573 keySize: -1 valueSize: 7 sequence: 2 headerKeys: [] payload: Working
Trouble Shooting
이상합니다. 분명 Topic을 생성했고, 메세지도 맞게 보냈는데 Consumer에서 못읽어요.
로그를 한번 살펴보겠습니다.
CreateTopics result(s): CreatableTopic(name='__consumer_offsets', replicationFactor=3)...
INVALID_REPLICATION_FACTOR (Unable to replicate the partition 3 time(s):
The target replication factor of 3 cannot be reached because only 2 broker(s) are registered.)
수많은 로그들 사이에 이 메세지가 반복적으로 출력되고있었어요.
로그를 좀 해석해보면,
Kafka에서 Consumer Group(offset 저장)을 사용하려면, 내부적으로 __consumer_offsets
토픽이 필요합니다. 그런데 KRaft 모드 클러스터가 2개 브로커만 있어서, ReplicationFactor=3인 __consumer_offsets
를 만들 수 없어 계속 실패하고 있습니다.
Broker Replica=3
으로 설정하고싶지만, 서버의 한정적인 리소스때문에 우선 2개로 해결 해볼게요.
values.yaml을 수정해서 _ReplicationFactor=3
으로 수정하면 될 것 같아요.
controller:
replicaCount: 2
enableKraft: true
podAnnotations:
sidecar.istio.io/inject: "true"
persistence:
enabled: true
storageClass: local-path-kafka
size: 10Gi
extraEnvVars:
- name: KAFKA_CFG_GROUP_COORDINATOR_NEW_ENABLE
value: "true"
- name: KAFKA_CFG_GROUP_CONSUMER_MIGRATION_POLICY
value: "UPGRADE"
- name: KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS
value: "0"
- name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR # 추가
value: "2"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR # 추가
value: "2"
- name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR # 추가
value: "2"
- name: HOSTNAME
valueFrom:
fieldRef:
fieldPath: metadata.name
zookeeper:
enabled: false
volumePermissions:
enabled: true
listeners:
client:
protocol: PLAINTEXT
port: 9092
interbroker:
protocol: PLAINTEXT
port: 9094
controller:
protocol: PLAINTEXT
port: 9093
# Chart 기본 advertisedListeners 설정은 비워두고, extraConfig와 환경변수 방식으로 통일합니다.
advertisedListeners: ""
overrideListeners: true
추가된 부분을 보면 , KAFKA_CFG_로 설정을 넣었습니다.
I have no name!@my-kafka-controller-0:/$ env | grep CFG
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=2
KAFKA_CFG_GROUP_COORDINATOR_NEW_ENABLE=true
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0
KAFKA_CFG_GROUP_CONSUMER_MIGRATION_POLICY=UPGRADE
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=2
I have no name!@my-kafka-controller-0:/$
실제 controller pod container 내부에서 명령어가 정상적용이 되어있습니다.
그럼 이제 다시 실행해보면??
ubuntu@k8s-master:~/kafka/kafka-helm$ kubectl exec -it my-kafka-controller-0 -- kafka-topics.sh \
--bootstrap-server my-kafka-controller-0.my-kafka-controller-headless.kafka-namespace.svc.cluster.local:9092 \
--list
Defaulted container "kafka" out of: kafka, volume-permissions (init), kafka-init (init)
test-topic
test-topic2
your-topic
ubuntu@k8s-master:~/kafka/kafka-helm$ kubectl exec -it my-kafka-controller-0 -- kafka-console-producer.sh \
--broker-list my-kafka-controller-0.my-kafka-controller-headless.kafka-namespace.svc.cluster.local:9092 \
--topic test-topic2
Defaulted container "kafka" out of: kafka, volume-permissions (init), kafka-init (init)
>hi2...
>^Ccommand terminated with exit code 130
ubuntu@k8s-master:~/kafka/kafka-helm$ kubectl exec -it my-kafka-controller-0 -- kafka-console-consumer.sh \
--bootstrap-server my-kafka-controller-0.my-kafka-controller-headless.kafka-namespace.svc.cluster.local:9092 \
--topic test-topic2 \
--group my-test-group --from-beginning
Defaulted container "kafka" out of: kafka, volume-permissions (init), kafka-init (init)
hi
Kafka
Working
hi2
저희가 이전에 작성했던 내용까지 토픽에 정상 저장되어있는게 보입니다!!
이번 트러블슈팅 내용은 여기까지 하겠습니다.
감사합니다.
Leave a Comment: