[Kafka] Kafka Producer&Consumer Application

[Kafka] Kafka Producer&Consumer Application

Kafka Producer Application

Kafak Producer는 데이터를 프로듀싱 즉 생산하는 역할을 합니다. 즉, 데이터를 Kafka Topic에 생성합니다.

Producer의 역할

Topic에 해당하는 메시지를 생성

특정 Topic으로 데이터를 publish

처리 실패/재시도

gradle 설정

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'

Kafka Producer를 작성한 코드

public class Producer{ public static void main(String[] args) throws IOException{ Properties configs = new Properties(); configs.put("bootstrap.servers", "localhost:9092"); configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducer(configs); ProducerRecord record = new ProducerRecord("click_log", "click"); producer.send(record); producer.close(); } }

Properties configs

Producer를 위한 설정입니다. 자바 프로퍼티 객체를 통해 Producer의 설정을 정의합니다. 부트스트랩 서버 설정을 로컬 호스의 Kafka를 바라보도록 설정을 의미합니다. Kafka broker의 주소목록은 되도록이면 2개 이상의 ip와 port를 설정하도록 권장합니다.

KafkaProducer producer

설정한 프로퍼티로 Kafka Producer 인스턴스를 생성합니다.

ProducerRecord record

ProducerRecord 인스턴스를 생성할 때 어느 Topic에 넣을 것인지 어떤 key와 value를 담을 것인지 선언할 수 있습니다. 파라미터 개수에 다라 자동으로 오버로딩되어 인스턴스가 생성되므로 이점을 유의하여 ProducerRecord를 생성합니다.

// key값 포함 ProducerRecord record = new ProducerRecord("click_log", "1", "click");

producer.send()

send() 메서드의 파라미터로 ProducerRecord를 넣으면 전송이 이루어집니다. click_log Topic에 click value가 들어가게 됩니다.

producer.close()

Producer 종료

Kafka Consumer Application

Kafka에서 Consumer가 데이터를 가져가더라도 데이터가 사라지지 않습니다. 이와 같은 특징은 Kafka 그리고 Kafka Consumer를 데이터 파이프라인으로 운영하는데 매우 핵심적인 역할을 합니다.

Kafka Consumer는 기본적으로 Topic의 데이터를 가져옵니다. 데이터는 Topic 내부의 Partition에 저장됩니다. Consumer는 Partition에 저장된 데이터를 가져옵니다. 이렇게 데이터를 가져오는 것을 폴링(polling)이라고 합니다.

Consumer의 역할

Topic의 Partition으로 부터 데이터 Polling

Partition offset 위치 기록(commit)

Consumer group을 통해 병렬 처리

gradle 설정

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'

Kafka Consumer를 작성한 코드

public class Consumer{ public static void main(String[] args){ Properties configs = new Properties(); configs.put("bootstrap.servers", "localhost:9092"); configs.put("group.id", "click_log_group"); configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaConsumer consumer = new KafkaConsumer(configs); consumer.subscribe(Arrays.asList("click_log")); while(true){ ConsumerRecords records = consumer.poll(500); for(ConsumerRecord record : records){ System.out.pritln(record.value()); } } } }

Properties configs

기본적인 Consumer 옵션을 지정할 수 있습니다. bootstrap servers 옵션을 통해 Kafka broker를 설정합니다.

그룹 아이디를 지정합니다. 그룹 아이디는 Consumer 그룹이라고도 불립니다.

KafkaConsumer consumer

Kafka Consumer 클래스를 통해 이전에 선언한 설정들을 매개변수로해서 Consumer인스턴스를 생성합니다. 이 Consumer 인스턴스를 통해 데이터를 읽고 처리할 수 있습니다.

consumer.subscribe()

어느 Topic을 대상으로 데이터를 가져올지 선언합니다. 만약 특정 토픽의 전체 Partition이 아니라 일부 Partition의 데이터만 가져오고 싶다면 아래의 코드처럼 assign() 메서드를 사용하면 됩니다.

TopicPartition partition0 = new TopicPartition(topicName, 0); TopicPartition partition1 = new TopicPartition(topicName, 1); consumer.assign(Arrays.asList(partition0, partition1));

while(true)폴링 루프 구문

Kafka Consumer에서 폴링 루프는 poll()메서드가 포함된 무한루프를 말합니다. Consumer API의 핵심은 broker로부터 연속적으로 그리고 Consumer가 허락하는 한 많은 데이터를 읽는 것입니다. 이런 측면에서 폴링 루프는 Consumer API의 핵심 로직이라고 볼 수 있습니다. Consumer는 poll() 메서드를 통해 데이터를 가져오는데 poll()메서드에서 설정한 시간동안 데이터를 기다리게 됩니다.

from http://hsti.tistory.com/87 by ccl(A) rewrite - 2021-10-05 10:27:42