Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 아파치 카프카
- Apache Spark
- 티스토리챌린지
- Python
- 아파치 하둡
- Apache Hadoop
- 카프카
- 아파치 스파크
- heapq
- apache kafka
- 분산처리
- 오블완
- 프로그래머스
- programmers
- 도커
- 알고리즘
- 우선순위큐
- leetcode
- DP
- 이진탐색
- docker
- 코딩테스트
- 리트코드
- 그래프
- String
- 하둡
- BFS
- 분산
- 파이썬
- 문자열
Archives
- Today
- Total
래원
[Kafka] 간단한 Producer/Consumer 실습 (Java) 본문
이번 글에서는 위 그림과 같이 간단한 작업을 실습해볼 예정이다.
Producer와 Consumer는 Java를 이용해 구현했으며, Gradle을 사용했다.
Java Project 생성하는 법은 이번 글에서는 생략했다.
코드를 작성하기 앞서 build.gradle에 의존성을 추가해야한다.
implementation 'org.apache.kafka:kafka-clients:3.5.1'
producerTest.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class producerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<서버 IP>:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "Key" + i;
String value = "Value" + i;
producer.send(new ProducerRecord<>("test-topic", key, value));
System.out.println("Message sent: " + key + " : " + value);
}
producer.close();
}
}
위 코드는 Kafka Producer를 사용하여 test-topic이라는 토픽에 10개의 메시지를 전송한다.
각 메시지는 key와 value로 구성되어 있으며, Key0, Value0 에서 Key9, Value9까지 전송된다.
consumerTest.java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class consumerTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "<서버 IP>:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("group.id", "consumer-group");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Received message: key = %s, value = %s, offset = %d%n",
record.key(), record.value(), record.offset());
}
}
}
}
위 코드는 Kafka Consumer를 사용하여 test-topic이라는 토픽에서 메시지를 계속해서 읽고 출력하는 코드이다. auto.offset.reset을 earliest로 설정하여 처음부터 메시지를 읽기 시작한다.
수행 결과
두 java project 모두 ./gradlew run을 통해 실행하였다.
그 결과, Producer는 test-topic으로 메시지를 잘 보내는 것을 확인할 수 있었고, Consumer 역시 test-topic에서 record를 잘 읽어오는 것을 확인할 수 있었다.
깃허브: github
'Data Engineering > Kafka' 카테고리의 다른 글
[Kafka] Apache Kafka 설치 (0) | 2025.01.08 |
---|---|
[Kafka] Zookeeper와 KRaft(Kafka Raft) (0) | 2025.01.07 |
[Kafka] Kafka 데이터 흐름 이해하기: Partition과 Offset (0) | 2025.01.07 |
[Kafka] Apache Kafka 개요 (0) | 2024.12.28 |