리밸런싱이 자주 일어나는 경우 - CommitFailedException에 대해

에러 발생 상황

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Flume Kafka Channel을 이용하고 있는데, 간간히 Kafka Client에서 CommitFailedException이라는 에러가 발생했다.( Flume 1.9.0 버전에서 제공하는 기본 Kafka api 버전은 2.1. 버전이다. )

에러가 발생하는 전후 상황의 로그를 확인해보니, 특정 Consumer에서 리밸런싱이 짧은 시간에 굉장히 여러 번 발생했다. 리밸런싱이 발생하면서 특정 Offset에 커밋이 실패하는 에러들이 굉장히 자주 발생하고 있었다.

 

해당 에러가 왜 발생했는지 알아보기 위해 메세지에서 얘기하고 있는 속성 값들과, Kafka Consumer API에 대해서 자세히 알아보기로 했다.


Kafka Consumer Client Internal

Naver D2 블로그에서 Kafka 내부 동작에 대해 굉장히 정성스럽게 적어주신 글을 참고하였다.

 

Kafka Consumer 구성 요소

KafkaConsumer는 크게 아래 클래스들로 구성이 되어있다.

KafkaConsumer

  • ConsumerNetworkClient
    • KafkaConsumer의 모든 네트워크 통신을 담당
  • SubscriptionState
    • Consume 하고 있는 토픽, 파티션, 오프셋 정보를 추적 관리
  • ConsumerCoordinator
    • 컨슈머 리밸런스, 오프셋 초기화, 오프셋 커밋 담당
  • Fetcher
    • 메시지 Fetch 동작
  • HeartbeatThread
    • Broker에 주기적으로 Consumer의 Heartbeat를 전송

위에서 에러 메시지에 등장하는 속성들과 관련이 있는 ConsumerCoordinator의 동작에 대해 살펴보았다.


리밸런스가 왜 자주 일어났을까

ConsumerCoordinator 내부에는 HeartbeatThread가 포함되어있다.

이 HeartbeatThread는 주기적으로 GroupCoordinator에게 자신이 살아있다는 Heartbeat를 전송하게 된다.

여기서 GroupCoordinator는 컨슈머 그룹의 동작을 관리하는 있는 특정 브로커라고 생각하면 되겠다.

 

리밸런스는 말 그대로 컨슈머 그룹들을 재조정하는 작업을 의미한다.

새로운 컨슈머가 컨슈머 그룹에 추가되거나, 컨슈머 그룹에서 특정한 컨슈머가 제외되었을 때 각 컨슈머들이 소비하고 있는 파티션들에 대한 재조정이 필요한데, 이를 리밸런싱이라 한다.

 

리밸런싱 작업은 다음과 같은 순서로 진행된다.

  1. GroupCoordinator 찾기
  2. Group에 컨슈머 Join 하기
  3. Sync

여기서 2단계 Group에 Join 하는 과정에서, JoinGroup API를 살펴보았다.

JoinGroup API는 다음과 같은 속성 값들을 사용하게 된다.

  1. group.id
  2. session.timeout.ms
  3. max.poll.interval.max

에러 메시지에 나오는 속성 값들이었다.

 

하지만 해당 API는 컨슈머 그룹에 새롭게 컨슈머가 추가되거나, 삭제될 때 사용하는 API인데, 왜 읽어온 메시지 처리가 길어진다고 컨슈머 그룹의 리밸런싱이 유발되는 것일까?

 

해당 궁금증은 아래 도식에서 확인할 수 있었다.

Heatbeat Thread

위 도식은 HeartbeatThread의 동작 원리이다.

 

실제 메시지를 처리하는 poll 작업의 간격을 담당하는 max.poll.interval.ms 를 사용하고, 해당 시간 동안에 메시지를 처리하여 poll이 정상적으로 이루어져야 한다. 그렇지 못할 경우 리밸런싱이 유발된다.

 

Heartbeat를 GroupCoordinator에 송신하는 Heartbeat Thread는 heartbeat.interval.ms라는 속성을 사용한다.

또한 GroupCoordinator가 해당 Consumer Group이 정상이라고 인식하는 session.timeout.ms라는 속성 값을 사용한다.

해당 시간 동안 Heartbeat를 전송받지 못하면 ConsumerGroup이 정상적이지 못하다고 판단하여 리밸런싱이 유발된다.

 

따라서 리밸런싱이 발생했던 이유를 정리해보면

  1. 메시지를 처리하는 process 과정에서 max.poll.interval.ms 만큼 주기적으로 poll을 수행하지만 해당 주기 안에 메시지를 처리하는데 오래 걸리거나 오류가 발생하여 정상적으로 poll 이 수행되지 않을 경우 -> 메시지 처리 과정 상의 오류
  2. GroupCoordinator가 Heartbeat를 session.timeout.ms 동안 정상적으로 수신받지 못하는 경우 -> Consumer 자체가 정상적이지 못한 경우

로 정리할 수 있었다.


해당 에러가 발생한 이유

그렇다면 나의 경우에는 왜 해당 에러가 발생했을까?

 

웹 상에서 여러 비슷한 상황의 해결에 대한 문서들을 찾아볼 수 있었다.

https://medium.com/@mouli2k5/solution-for-kafka-commitfailedexception-c3a6c9baae51

 

Solution for Kafka CommitFailedException

Introduction

medium.com

해당 문서를 읽어보니 Default로 설정되어있는 해당 속성 값들을 메시지 처리에 맞게끔 튜닝을 하면 된다는 솔루션이었다.

  • max.poll.records를 줄이기
  • max.poll.interval.ms( 디폴트 5분 )를 증가시키기 
  • session.timeout.ms( 디폴트 10초 )를 증가시키기

메시지 처리에 드는 시간을 줄이거나, 실제로 리밸런싱을 판단하는 속성 값들의 타임아웃을 증가시켜주는 방식이었다. 하지만 정상적으로 동작했던 컨슈머였기에, 해당 속성 값을 줄이거나 늘리는 게 효과가 있을 것 같지는 않았다.

 

어느 문서에서는 브로커와 컨슈머 그룹 간의 통신이 원활한지 살펴보라는 문서도 있었다.

카프카는 네트워크에 굉장히 민감하니 네트워크 속성 값들을 살펴보았으나, 관련해서도 큰 이슈는 없었다.

 

실제로 해당 에러가 지속적으로 발생했을 때, 전후 애플리케이션 로그를 살펴보니 카프카에 적재되는 로그들 중의 일부가 굉장히 긴 사이즈를 차지하고 있는 것을 살펴볼 수 있었다.

로그 포맷 자체가 잘못된 것이 아니라 특정 타입의 사이즈가 길게 들어오다 보니, 로그 처리 프로세스에서 파싱 하는 데에 시간이 오래 걸렸던 것이 원인이었다.

 

해당 원인을 공유했고, 그 이후로는 해당 에러가 발생하지 않았다.


참조

https://d2.naver.com/helloworld/0974525