Back-End/Kafka

[Kafka] 기초3 - 컨슈머 기본 동작과 예시

유자맛바나나 2022. 8. 23. 03:07

 

 컨슈머의 기본 동작

컨슈머는 프로듀서가 카프카의 토픽으로 전송한 메시지를 가져오는 역할을 한다. 단순히 메시지만 가져오는 것으로 보일 수 있으나, 내부적으로는 컨슈머 그룹, 리밸런싱 등의 동작을 수행한다.

 

컨슈머 그룹(Consumer Group)

  • 컨슈머 그룹은 하나 이상의 컨슈머들이 모여 있는 그룹을 의미하고, 컨슈머는 반드시 컨슈머 그룹에 속하게 된다. 그룹 내의 컨슈머들은 서로의 정보를 공유한다. 예를 들어, 특정 컨슈머에 문제가 생겨 종료된다면, 그룹 내 다른 컨슈머가 대신해 토픽의 파티션을 컨슘한다.
  • 컨슈머 그룹은 파티션의 리더에게 토픽에 저장된 메시지를 가져오기 위한 요청을 보낸다.
  • 필수는 아니지만, 파티션 수와 한 그룹 안의 컨슈머 수는 1:1 매핑이 이상적이다.
  • 단, '파티션 수 < 컨슈머 수' 가 되지 않도록 주의한다. 파티션 수를 초과하는 컨슈머는 메시지를 가져오지 않고 대기 상태로만 존재하기 때문이다.

 

❑ 컨슈머의 주요 옵션

컨슈머를 어떻게 다룰 수 있느냐에 따라 메시지의 중복, 유실 등 여러 가지 상황이 발생하지 않고, 지연이 발생하지 않도록 할 수 있다.

프로듀서 옵션 설명
bootstrap.servers 카프카 클러스터는 클러스터 마스터라는 개념이 없으므로, 클러스터 내 모든 서버가 클라이언트의 요청을 받을 수 있음. 클라이언트가 카프카 클러스터에 처음 연결하기 위한 브로커의 호스트와 포트정보를 나타 냄.
fetch.min.bytes 한 번에 가져올 수 있는 최소 데이터 크기. 지정한 크기보다 작은 경우, 요청에 응답하지 않고 데이터가 누적될 때까지 대기.
group.id 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자. 동일한 그룹 내의 컨슈머 정보는 모두 공유 됨.
heartbeat.interval.ms 하트 비트가 있다는 것은 컨슈머의 상태가 active임을 의미. session.timeout.ms와 밀접한 관계가 있으며, session.timeout.ms보다 낮은 값으로 설정해야 함. 일반적으로 session.timeout.ms의 1/3로 설정.
max.partition.fetch.bytes 파티션 당 가져올 수 있는 최대 크기
session.timeout.ms 이 옵션을 이용해, 컨슈머가 종료된 것인지를 판단한다. 컨슈머는 주기적으로 하트 비트를 보내야 하고, 만약 이 시간 전까지 하트 비트를 보내지 않았다면 해당 컨슈머는 종료된 것으로 간주하고 컨슈머 그룹에서 제외 후, 리밸런싱을 시작 함.
enable.auto.commit 백그라운드로 주기적으로 오프셋을 커밋
auto.offset.reset 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않는 경우에 다음 옵션으로 reset 함.
- earliest: 가장 초기의 오프셋값으로 설정
- latest: 가장 마지막 오프셋값으로 설정
- none: 이전 오프셋값을 찾지 못하면 에러를 나타냄.
fetch.max.bytes 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기
group.instance.id 컨슈머의 고유 식별자. 만약 설정한다면 static 멤버로 간주되어, 불필요한 리밸런싱을 하지 않음
isolation.level 트랜잭션 컨슈머에서 사용되는 옵션.
- read_uncommited(Default): 모든 메시지를 읽음
- read_committed : 트랜잭션이 완료된 메시지만 읽음
max.poll.records 한 번의 poll() 요청으로 가져오는 최대 메시지 수.
partition.assignment.strategy 파티션 할당 전략.(Default : range)
fetch.max.wait.ms fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간.

 

 컨슈머 예시(Java)

컨슈머가 메시지를 가져오는 방법은 크게 세 가지가 있다. '1) 오토 커밋(Auto Commit), 2) 동기 가져오기, 3) 비동기 가져오기' 가 있다. 각각에 대한 예시 코드와 설명은 아래와 같다

 

1) 오토 커밋(Auto Commit)

  • 컨슈머 애플리케이션에서 기본값으로 가장 많이 사용되는 것이 오토 커밋이다
  • 오프셋을 주기적으로 자동 커밋하므로 관리자가 따로 관리하지 않아도 되서 편리하다
  • 반면, 컨슈머 종료가 빈번히 일어날 경우 종종 메시지 누락, 중복 등의 문제가 발생하는 경우도 있다. 하지만 카프카와 컨슈머는 안정적으로 동작하므로 빈번한 일이 아니므로 오토 커밋을 주로 사용하는 것.

Example Code

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;

public class ConsumerAutoCommit {
    public static void main(String[] args) {
        Properties props = new Properties(); // 1
        props.put("bootstrap.servers", "kafkaAddress1:port, kafkaAddress2:port, kafkaAddress3:port"); // 2
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 3
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 3
        props.put("group.id", "my-consumer01"); // 4
        props.put("enable.auto.commit", "true"); // 5
        props.put("auto.offset.reset", "latest"); // 6
       
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 7
        consumer.subscribe(Arrays.asList("my-topic-01")); // 8

        try {
            while(true){ // 9                
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // 10
                for (ConsumerRecord<String, String> record : records) { // 11
                    System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Value :%s\n",
                            record.topic(),record.partition(),record.offset(),record.key(),record.value());
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close(); // 12
        }
    }
}
  1. Properties 객체 생성
  2. 브로커 리스트 정의. 카프카 서버의 IP 또는 DNS를 kafkaAddress에, 포트를 port에 넣어준다
  3. 프로듀서에서 StringSerializer를 사용했으므로 Deserializer 역시 StringDeserializer를 사용한다.
  4. 컨슈머 그룹 아이디 정의
  5. 오토 커밋 사용: 오프셋을 자동으로 커밋하도록 함
  6. 컨슈머가 오프셋을 찾지 못하는 경우 가장 최근의 오프셋 값(latest)의 메시지를 가져옴 
  7. Properties 객체를 전달해 새 컨슈머 생성
  8. Consumer가 구독할 Topic을 지정한다.
  9. Polling loop(무한 루프): 카프카 컨슈머에서 poll() 메서드가 포함된 무한루프를 말하며, 지속적으로 메시지를 가져온다.
  10. 컨슈머는 poll() 메서드를 통해 메시지를 가져온다. 타임아웃 주기(1000)를 설정해 해당 시간만큼 블록한다.
    [참고] poll(long)은 2.0버전이후 Deprecated 되었다. 대신 Duration을 사용한다.
  11. poll() 메서드는 여러 개의 레코드를 리턴하므로 반복문을 통해 개별 레코드를 처리한다.
  12. 컨슈머 종료

 

2) 동기 가져오기 (Sync Pull)

  • 동기 방식으로 가져오는 경우 속도는 느리지만, 메시지 손실(토픽에는 메시지가 존재하지만 오프셋 커밋이 잘못된 경우)은 거의 발생하지 않는다.
  • 메시지 손실이 일어나면 안되는 중요한 작업에 동기 방식을 적용하는 것을 권장한다. 하지만 이 방법도 메시지의 중복 이슈를 완전히 피해갈 수는 없다.(추후 컨슈머 상세 포스팅에서 다룬다)

Example Code

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;

public class ConsumerSync {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaAddress1:port, kafkaAddress2:port, kafkaAddress3:port");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-consumer01");
        props.put("enable.auto.commit", "false"); // 1
        props.put("auto.offset.reset", "latest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic-01"));

        try {
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Value :%s\n",
                            record.topic(),record.partition(),record.offset(),record.key(),record.value());
                }
                consumer.commitSync(); // 2
            }
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}
  1. 오토 커밋 미사용(false): 오프셋 커밋을 직접 지정함 
  2. records에 대한 처리를 모두 완료 후, 추가 polling 전 오프셋을 동기(Sync) 커밋한다

 

3) 비동기 가져오기 (Async Pull)

  • 비동기 커밋(commitAsync)은 동기 커밋과 달리 오프셋 커밋을 실패하더라도 재시도 하지 않는다. 재시도가 있다고 가정할 경우, 앞서 커밋한 오프셋 번호가 실패한 후, 재시도를 통해 커밋이 된다면 오프셋 번호가 마지막 데이터의 오프셋을 가리키지 않기 때문이다.
  • [예시]
    1. 1번 오프셋 메시지 읽은 뒤 1번 오프셋을 비동기 커밋 성공 → 현재 마지막 오프셋 = 1
    2. 2번 오프셋 메시지 읽은 뒤 2번 오프셋을 비동기 커밋하지만 실패 → 현재 마지막 오프셋 = 1
    3. 3번 오프셋 메시지 읽은 뒤 3번 오프셋을 비동기 커밋 성공 → 현재 마지막 오프셋 = 3
    4. 2번 오프셋을 비동기 커밋 재시도하여 성공 → 현재 마지막 오프셋 = 2 (마지막 데이터의 오프셋인 3을 가리키지 않음)
  • 비동기 방식을 좀 더 보완하기 위해 콜백을 같이 사용하는 경우도 있다.

Example Code

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;

public class ConsumerAsync {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaAddress1:port, kafkaAddress2:port, kafkaAddress3:port");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-consumer01");
        props.put("enable.auto.commit", "false"); // 1
        props.put("auto.offset.reset", "latest");
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic-01"));

        try {
            while(true){
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Value :%s\n",
                            record.topic(),record.partition(),record.offset(),record.key(),record.value());
                }
                consumer.commitAsync(); // 2
            }
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}
  1. 오토 커밋 미사용(false): 오프셋 커밋을 직접 지정함 
  2. records에 대한 처리를 모두 완료 후, 추가 polling 전 오프셋을 '비'동기('A'sync) 커밋한다 

 

 

❑ REFERENCE

실전 카프카 개발부터 운영까지 | 고승범 | 책만