이번 글에서는 위 그림과 같이 간단한 작업을 실습해볼 예정이다.
Producer와 Consumer는 Java를 이용해 구현했으며, Gradle을 사용했다.
Java Project 생성하는 법은 이번 글에서는 생략했다.
코드를 작성하기 앞서 build.gradle에 의존성을 추가해야한다.
implementation 'org.apache.kafka:kafka-clients:3.5.1'
producerTest.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class producerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<서버 IP>:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "Key" + i;
String value = "Value" + i;
producer.send(new ProducerRecord<>("test-topic", key, value));
System.out.println("Message sent: " + key + " : " + value);
}
producer.close();
}
}
위 코드는 Kafka Producer를 사용하여 test-topic이라는 토픽에 10개의 메시지를 전송한다.
각 메시지는 key와 value로 구성되어 있으며, Key0, Value0 에서 Key9, Value9까지 전송된다.
consumerTest.java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class consumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<서버 IP>:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("group.id", "consumer-group");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
record.key(), record.value(), record.offset());
}
}
}
}
위 코드는 Kafka Consumer를 사용하여 test-topic이라는 토픽에서 메시지를 계속해서 읽고 출력하는 코드이다. auto.offset.reset을 earliest로 설정하여 처음부터 메시지를 읽기 시작한다.
수행 결과
두 java project 모두 ./gradlew run을 통해 실행하였다.
그 결과, Producer는 test-topic으로 메시지를 잘 보내는 것을 확인할 수 있었고, Consumer 역시 test-topic에서 record를 잘 읽어오는 것을 확인할 수 있었다.
깃허브: github
kafkaPractice/kafka_pr at main · laewonJeong/kafkaPractice
Contribute to laewonJeong/kafkaPractice development by creating an account on GitHub.
github.com
'Data Engineering > Kafka' 카테고리의 다른 글
[Kafka] Apache Kafka 설치 (0) | 2025.01.08 |
---|---|
[Kafka] Zookeeper와 KRaft(Kafka Raft) (0) | 2025.01.07 |
[Kafka] Kafka 데이터 흐름 이해하기: Partition과 Offset (0) | 2025.01.07 |
[Kafka] Apache Kafka 개요 (0) | 2024.12.28 |