반응형

intellij(java) -> consumer/ producer 파일생성 및 log파일 생성후 vm 카프카에 데이터를 송/수신

 

intellij>

 

 

의존성 추가!!

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

 

 

 

consumer 작성

package kafkaTutorial;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {
        Properties configs = new Properties();
        // 환경 변수 설정
        configs.put("bootstrap.servers", "192.168.56.1:9092");     // kafka server host 및 port
        configs.put("session.timeout.ms", "10000");             // session 설정
        configs.put("group.id", "!!!!!!변경!!!!!");                // topic 설정
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성
        consumer.subscribe(Arrays.asList(""!!!!!!!!변경!!!!!");"));      // topic 설정
        while (true) {  // 계속 loop를 돌면서 producer의 message를 띄운다.
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                String s = record.topic();
                if (""!!!!!!변경!!!!!!!");".equals(s)) {
                    System.out.println(record.value());
                } else {
                    throw new IllegalStateException("get message on topic " + record.topic());
                }
            }
        }
    }

}

 

producer작성

 

package kafkaTutorial;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) throws IOException {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka host 및 server 설정
        configs.put("acks", "all");                         // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
        configs.put("block.on.buffer.full", "true");        // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

        // producer 생성
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        // message 전달
        for (int i = 0; i < 5; i++) {
            String v = "hello world!"+i;
            producer.send(new ProducerRecord<String, String>("!!!!!!!!변경!!!!!!!", v));
        }

        // 종료
        producer.flush();
        producer.close();
    }

}

 

!!!!!!!변경!!!!!!!!!!부분 kafka 설정한 topic으로 변경하기

 

 

main->resources->log4j.properties // 폴더트리 만들어주고 파일생성-log4j만든다.

<log4j>

$ cat src/main/resources/log4j.properties 
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

 

마지막으로 vm으로 돌아가서 

1. 카프카 멈춰주고

2. kafka - conf - server.properties를 vim으로 열어서 (맨위에 그냥 추가한다.주석처리 찾아서 해도됨)

advertised.listeners=PLAINTEXT://192.168.56.1:9092
listeners=PLAINTEXT://0.0.0.0:9092

 

3. 카프카 재 실행 후 consumer -> producer 차례로 실행하면 hello world가 보인다

 

 

출처 : https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

 

Why Can’t I Connect to Kafka? | Troubleshoot Connectivity

How to troubleshoot connectivity between Kafka clients (Python, Java, Spring, Go, etc.) to Kafka on Docker, AWS, or any other machine.

www.confluent.io

 

반응형

'설치,명령어등 > 카프카' 카테고리의 다른 글

node-red // kafka  (0) 2022.03.14
kafka 설치/  (0) 2022.03.02

+ Recent posts