반응형
intellij(java) -> consumer/ producer 파일생성 및 log파일 생성후 vm 카프카에 데이터를 송/수신
intellij>
의존성 추가!!
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
consumer 작성
package kafkaTutorial;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties configs = new Properties();
// 환경 변수 설정
configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka server host 및 port
configs.put("session.timeout.ms", "10000"); // session 설정
configs.put("group.id", "!!!!!!변경!!!!!"); // topic 설정
configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); // consumer 생성
consumer.subscribe(Arrays.asList(""!!!!!!!!변경!!!!!");")); // topic 설정
while (true) { // 계속 loop를 돌면서 producer의 message를 띄운다.
ConsumerRecords<String, String> records = consumer.poll(500);
for (ConsumerRecord<String, String> record : records) {
String s = record.topic();
if (""!!!!!!변경!!!!!!!");".equals(s)) {
System.out.println(record.value());
} else {
throw new IllegalStateException("get message on topic " + record.topic());
}
}
}
}
}
producer작성
package kafkaTutorial;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class Producer {
public static void main(String[] args) throws IOException {
Properties configs = new Properties();
configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka host 및 server 설정
configs.put("acks", "all"); // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
configs.put("block.on.buffer.full", "true"); // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정
// producer 생성
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// message 전달
for (int i = 0; i < 5; i++) {
String v = "hello world!"+i;
producer.send(new ProducerRecord<String, String>("!!!!!!!!변경!!!!!!!", v));
}
// 종료
producer.flush();
producer.close();
}
}
!!!!!!!변경!!!!!!!!!!부분 kafka 설정한 topic으로 변경하기
main->resources->log4j.properties // 폴더트리 만들어주고 파일생성-log4j만든다.
<log4j>
$ cat src/main/resources/log4j.properties
# Root logger option
log4j.rootLogger=DEBUG, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
마지막으로 vm으로 돌아가서
1. 카프카 멈춰주고
2. kafka - conf - server.properties를 vim으로 열어서 (맨위에 그냥 추가한다.주석처리 찾아서 해도됨)
advertised.listeners=PLAINTEXT://192.168.56.1:9092
listeners=PLAINTEXT://0.0.0.0:9092
3. 카프카 재 실행 후 consumer -> producer 차례로 실행하면 hello world가 보인다
출처 : https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
Why Can’t I Connect to Kafka? | Troubleshoot Connectivity
How to troubleshoot connectivity between Kafka clients (Python, Java, Spring, Go, etc.) to Kafka on Docker, AWS, or any other machine.
www.confluent.io
반응형
'설치,명령어등 > 카프카' 카테고리의 다른 글
node-red // kafka (0) | 2022.03.14 |
---|---|
kafka 설치/ (0) | 2022.03.02 |