Back-End/Kafka

[Kafka] 기초2 - 프로듀서 기본 동작과 예시

유자맛바나나 2022. 8. 11. 02:17

 

 프로듀서 디자인

프로듀서는 카프카의 토픽으로 메시지를 전송하는 역할을 담당한다. 프로듀서를 설정할 수 있는 여러가지 옵션이 있으므로 원하는 조건으로 메시지를 보내기 위해선 옵션을 적절히 사용할 수 있어야한다.

출처: https://dzone.com/articles/take-a-deep-dive-into-kafka-producer-api

레코드(ProducerRecord)

  • 레코드는 카프카로 전송하기 위한 실제 데이터이며, 토픽(Topic), 파티션(Partition), 키(Key), 밸류(Value)으로 구성된다. 
  • 토픽과 밸류(메시지 내용)는 필수 값이다.
  • 파티션(특정 파티션을 지정), 키(특정 파티션에 레코드를 정렬)은 선택 옵션이다.

 

시리얼라이저(Serializer)

  • 레코드의 키와 밸류를 Byte Array로 변환시킨다.

 

파티셔너(Partitioner)

  • 레코드들은 프로듀서의 send() 메시지를 통해 시리얼라이저와 파티셔너를 거친다.
  • 프로듀서 내부에선 send() 동작 이후 파티션별로 레코드를 모아둔 후 배치전송을 하게된다.
  • 전송이 실패하면 재시도 동작이 이뤄지고, 지정된 횟수만큼의 재시도가 실패하면 최종 실패를 전달한다.
  • 전송이 성공하면 메타데이터를 리턴한다.
  • 파티셔너의 파티션 선택 방식
    레코드의 선택 옵션인 파티션과 키에 따라 파티셔너가 다르게 동작한다.
파티션 지정 키 지정 파티션 선택 방식
O - 키 지정과 상관 없이, 파티셔너는 어떠한 동작도 안하고 지정된 파티션으로 레코드를 전달한다.
X O 파티셔너 내 해시 함수에 따라 특정 해시값으로 변환되고, 이 값이 파티션을 선택하는데 사용된다.
X X 라운드 로빈(Round-robin) 방식으로 파티션을 지정한다. 단, 데이터를 전송할 때 배치로 전송하는 점에 유의한다. 프로듀서에서 레코드 하나씩 라운드 로빈으로 파티션에 전송하는 것이 아니라, 최대한 레코드를 모아 하나의 배치를 만들어 파티션에 전송하는 것이고, 배치 단위가 라운드 로빈인 것이다.

[참고] 파티션 지정과 키 지정의 차이가 무엇일까?

공통점
토픽 내 특정 파티션이 지정되어 해당 파티션으로만 데이터를 쌓게 할 수 있다.
차이점
파티션을 지정하는 것 파티션 0번, 1번, 2번 등 특정 파티션을 정확하게 가리키는 것이다.
키를 지정하는 것 해시 함수를 이용해 파티션들 중 하나가 지정되는 것이다. 즉, 내가 어떠한 파티션을 직접 가리키는 것이 아니라 해시 함수에 의해 특정 파티션이 결정되는 것이다.
주의할 점이 있다. 파티션을 새로 추가하면 기존의 키와 파티션 매칭 정보가 깨진다. 따라서 기존에 사용한 키를 지정하더라도 동일한 파티션에 레코드가 쌓이는 것이 보장되지 않는다. 따라서 키를 사용할 경우 이 점에 유의해 처음부터 파티션 개수를 적절히 선정하고 추가 파티션을 생성하지 않는 것을 권장한다. 

 

❑ 프로듀서의 주요 옵션

대부분 사람들이 프로듀서를 기본값으로 사용하는 경우가 많지만 본인이 원하는 형태로 카프카를 이용해 메시지를 전송하려면 옵션들을 적절히 사용해야 한다. 주요 옵션은 아래와 같다

프로듀서 옵션 설명
bootstrap.servers 카프카 클러스터는 클러스터 마스터라는 개념이 없으므로, 클러스터 내 모든 서버가 클라이언트의 요청을 받을 수 있음. 클라이언트가 카프카 클러스터에 처음 연결하기 위한 호스트와 포트정보를 나타 냄.
client.dns.lookup 하나의 호스트에 여러 IP를 매핑해 사용하는 일부 환경에서 클라이언트가 하나의 IP와 연결하지 못할 경우에 다른 IP로 시도하는 설정(Host 한 개가 여러개의 IP를 가질 때를 말하는 듯). 
use_all_dns_ips가 기본값으로, DNS에 할당된 호스트의 모든 IP를 쿼리하고 저장. 첫 번째 IP로 접근 실패 시, 종료하지 않고 다음 IP로 접근 시도. resolve_canonical_bootstrap_servers_only 옵션은 Kerberos 환경에서 FQDN을 얻기 위한 용도로 사용 됨.
acks 프로듀서가 카프카 토픽의 리더 측에 메시지를 전송한 후 요청을 완료하기를 결정하는 옵션이며 0, 1, all(-1) 세 가지를 선택할 수 있음.
- 0 : 빠른 전송을 의미하지만, 일부 메시지 손실 가능성 있음. 
- 1 : 리더가 메시지를 받았는지 확인하지만, 모든 팔로워를 전부 확인하지 않음. 대부분 기본값으로 1을 사용. 
- all : 팔로워가 메시지를 받았는지 여부 확인. 다소 느릴 수 있으나 하나의 팔로우가 있는 한 메시지 손실은 없음.
buffer.memory 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등)할 수 있는 전체 메모리 바이트(byte)
compression.type 프로듀서가 메시지 전송 시 선택할 수 있는 압축 타입. none, gzip, snappy, lz4, zstd 중 선택
enable.idempotence true로 설정할 경우 중복 없는 전송이 가능하며, 이와 동시에 max.in.flight.requests.per.connection은 5 이하, retries는 0 이상, acks는 all로 설정해야 함.
max.in.flight.requests.per.connenction 하나의 커넥션에서 프로듀서가 최대한 ACK 없이 전송할 수 있는 요청 수. 메시지의 순서가 중요하다면 1로 설정할 것을 권장하지만 성능은 다소 떨어짐.
retries 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내주는 횟수
batch.size 프로듀서는 동일한 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도. 적절한 배치 크기 설정은 성능에 도움을 줌.
linger.ms 배치 형태의 메시지를 보내기 전 추가적인 메시지를 위해 기다리는 시간을 조정하고, 배치 크기에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달 했을 때 메시지를 전송.
transactional.id ‘정확히 한 번 전송’을 위해 사용하는 옵션. 동일한 TransactionalId에 한해 정확히 한 번을 보장. 옵션을 사용하기 전 enable.idempotence를 true로 설정해야 함.

 

 

 프로듀서 예시(Java)

프로듀서의 전송 방법은 1) 메시지를 보낸 후 응답 확인하지 않기, 2) 동기 전송, 3) 비동기 전송 크게 세 가지로 나뉜다. 각각에 대한 예시 코드와 설명은 아래와 같다

 

 

1) 메시지를 보낸 후 응답 확인하지 않기

  • 프로듀서에서 카프카의 토픽으로 레코드를 전송한 후 응답을 확인하지 않는 예제다
  • 실제 운영 환경에서 사용하지 않는 것을 추천한다. 하지만 카프카는 항상 살아 있고, 프로듀서 또한 자동으로 재시작해 대부분 성공적으로 메시지가 전송된다

Example Code

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerIgnoreResponse {
    public static void main(String[] args) {

        Properties props = new Properties(); // 1
        props.put("bootstrap.servers", "kafkaAddress1:port, kafkaAddress2:port, kafkaAddress3:port"); // 2
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 3
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 3
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 4
        try {
            for(int i=0; i<3; i++){
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-01",
                        "Hello World " + i); // 5
                producer.send(record); // 6
            }
        } catch (Exception e){
            e.printStackTrace(); // 7
        } finally {
            producer.close(); // 8
        }
    }
}

 

  1. Properties 객체 생성
  2. 브로커 리스트 정의. 카프카 서버의 IP 또는 DNS를 kafkaAddress에, 포트를 port에 넣어준다
  3. 메시지 키와 밸류는 문자열 타입이므로 카프카의 기본 StringSerializer를 지정. Serializer는 Byte Array로 변환해주는 역할을 한다.
  4. Properties 객체를 전달해 새 프로듀서 생성
  5. ProducerRecord 객체 생성
  6. send() 메소드를 사용해 메시지를 전송 후 Java Future 객체로 RecordMetadata를 리턴 받지만, 리턴값을 무시하므로 메시지가 성공적으로 전송됐는지 파악할 수 없음
  7. 카프카 브로커에게 메시지를 전송하기 전 에러가 발생하면 예외 처리 가능, 전송 후의 에러는 무시함
  8. 프로듀서 종료

 

2) 동기 전송 (Sync Send)

Example Code

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class ProducerSync {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaAddress1:port, kafkaAddress2:port, kafkaAddress3:port");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for(int i=0; i<3; i++){
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-01",
                        "Hello World " + i);
                RecordMetadata metadata = producer.send(record).get(); // 1
                System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Received Message :%s\n",
                        metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value()); // 2
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

 

  1. 프로듀서는 send() 메서드를 실행 후 get() 메서드를 사용해 카프카의 응답을 기다린다. 메시지가 성공적으로 전송되지 않으면 예외가 발생하고, 에러가 없다면 RecordMetadata를 얻는다.
  2. RecordMetadata 객체를 이용해 토픽, 파티션, 오프셋, 키, 밸류 정보를 확인할 수 있다.

 

3) 콜백을 이용한 비동기 전송 (Async Send)

  • 동기 전송(Sync Send) 방식과 같이 프로듀서가 보낸 모든 레코드에 대해 응답을 기다리면 많은 시간을 소비하게 되므로 빠른 전송을 할 수 없다.
  • 하지만 비동기 방식으로 전송하면 응답을 기다리지 않아 빠른 전송이 가능하다.
  • 레코드 전송이 실패하더라도 예외 처리를 할 수 있어 에러 로그 등을 기록할 수 있다.

Example Code : Callback

import org.apache.kafka.clients.producer.Callback; 
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyProducerCallback implements Callback { // 1

    private ProducerRecord<String,String> record;

    public MyProducerCallback(ProducerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) { // 2
        if(e != null){
            e.printStackTrace(); // 3
        } else {
            System.out.printf("Topic : %s , Partition : %d , Offset : %d, Key : %s, Received Message :%s\n",
                    metadata.topic(), metadata.partition(), metadata.offset(), record.key(), record.value()); // 4
        }

    }
}
  1. 콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback 을 구현체가 필요하다.
    *주의: org.apache.kafka의 Callback이다.
  2. 레코드 전송이 완료되면 onCompletion()가 실행된다.
  3. 만약 카프카가 오류를 리턴하면 Exception을 갖게되므로(e != null) 예외 처리를 할 수 있다. 실제 운영 환경에서는 추가적인 예외 처리가 필요하다.
  4. 전송이 정상적으로 되었다면 RecordMetadata 객체를 이용해 토픽, 파티션, 오프셋, 키, 밸류 정보를 확인할 수 있다.

 

Example Code : ProducerAsync

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class ProducerAsync {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaAddress1:port, kafkaAddress2:port, kafkaAddress3:port");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for(int i=0; i<3; i++){
                ProducerRecord<String, String> record = new ProducerRecord<>("my-topic-01",
                        "Hello World " + i);
                producer.send(record, new MyProducerCallback(record)); // 1
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
  1. 프로듀서에서 레코드를 보낼 때 콜백 객체를 같이 보낸다. 레코드 전송이 완료되면 콜백 객체의 onCompletion()가 실행된다.

 

❑ REFERENCE

실전 카프카 개발부터 운영까지 | 고승범 | 책만