래원

[Kafka] 간단한 Producer/Consumer 실습 (Java) 본문

Data Engineering/Kafka

[Kafka] 간단한 Producer/Consumer 실습 (Java)

Laewon Jeong 2025. 1. 10. 22:33

Producer/Consumer 실습.

 

 

이번 글에서는 위 그림과 같이 간단한 작업을 실습해볼 예정이다.

 

Producer와 Consumer는 Java를 이용해 구현했으며, Gradle을 사용했다.

 

Java Project 생성하는 법은 이번 글에서는 생략했다.


 

코드를 작성하기 앞서 build.gradle에 의존성을 추가해야한다.

implementation 'org.apache.kafka:kafka-clients:3.5.1'

 

producerTest.java


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class producerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<서버 IP>:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            String key = "Key" + i;
            String value = "Value" + i;
            producer.send(new ProducerRecord<>("test-topic", key, value));
            System.out.println("Message sent: " + key + " : " + value);
        }

        producer.close();
    }
}

 

위 코드는 Kafka Producer를 사용하여 test-topic이라는 토픽에 10개의 메시지를 전송한다.

각 메시지는 key와 value로 구성되어 있으며, Key0, Value0 에서 Key9, Value9까지 전송된다.

 

consumerTest.java


import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class consumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<서버 IP>:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", "consumer-group");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
                        record.key(), record.value(), record.offset());
            }
        }
    }
}

 

위 코드는 Kafka Consumer를 사용하여 test-topic이라는 토픽에서 메시지를 계속해서 읽고 출력하는 코드이다. auto.offset.reset을 earliest로 설정하여 처음부터 메시지를 읽기 시작한다.

 

수행 결과


수행 결과.

 

두 java project 모두 ./gradlew run을 통해 실행하였다.

그 결과, Producer는 test-topic으로 메시지를 잘 보내는 것을 확인할 수 있었고, Consumer 역시 test-topic에서 record를 잘 읽어오는 것을 확인할 수 있었다.

 


깃허브: github

 

kafkaPractice/kafka_pr at main · laewonJeong/kafkaPractice

Contribute to laewonJeong/kafkaPractice development by creating an account on GitHub.

github.com