package kafkaTutorial;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
// 환경 변수 설정
configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka server host 및 port
configs.put("session.timeout.ms", "10000"); // session 설정
configs.put("group.id", "!!!!!!변경!!!!!"); // topic 설정
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer 생성
consumer.subscribe(Arrays.asList(""!!!!!!!!변경!!!!!");")); // topic 설정
while (true) { // 계속 loop를 돌면서 producer의 message를 띄운다.
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
String s = record.topic();
if (""!!!!!!변경!!!!!!!");".equals(s)) {
System.out.println(record.value());
} else {
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
}
producer작성
package kafkaTutorial;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka host 및 server 설정
configs.put("acks", "all"); // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
configs.put("block.on.buffer.full", "true"); // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// message 전달
for (int i = 0; i < 5; i++) {
String v = "hello world!"+i;
producer.send(new ProducerRecord<String, String>("!!!!!!!!변경!!!!!!!", v));
}
// 종료
producer.flush();
producer.close();
}
}