반응형

node-red를 활용하여 3개의 topic을 만들고 각각의 토픽에 랜덤 변수를 5초간격으로 송, 수신하는 node-red를 만들어라.

 

## 이전 포스트를 보고 kafka가 설치 되어있어야 한다.

 

 

https://nodejs.org/ko/

 

Node.js

Node.js® is a JavaScript runtime built on Chrome's V8 JavaScript engine.

nodejs.org

설치.(stable버전)

 

https://flows.nodered.org/node/node-red-contrib-kafka-manager

 

node-red-contrib-kafka-manager

Node-RED implements Kafka manager with associand associated .

flows.nodered.org

 

$ npm install kafka-node

 

 

<kafka내장 주키퍼로 시작>

$ zookeeper-server-start.sh -daemon /root/kafka/config/zookeeper.properties

$ yum install telnet
<엑세스 여부 확인>
$ telnet localhost 2181

<기본속성으로 kafka를 시작>
$ kafka-server-start.sh -daemon /root/kafka/config/server.properties

<9092 확인>

$ telnet localhost 9092

<샘플 주제를 만든다>

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic3

<작성된 주제 나열>

$ kafka-topics.sh --zookeeper localhost:2181 --list

#topic 1,2,3뜨면 됨

구성은 이렇게 해봤고, 

 

node-red에서 최초 inject를 잡아주고, 그다음 필요한 각각의 input을 넣어 프로세스를 잡아준다. 최종적으로 kafka_producer로 향하게 노드를 그린다.

 

# 설정은 필요에 따라 다르게 하고, Topic부분에 아까 써줬던 topic1,2,3을 각각 잡아주고

 

broker부분에 연필모양을 눌러서 hosts를 vm으로 잡아준다. (192.168.56.1 : 9092)

 

그리고 전송이 잘 되었는지 확인 하기 위해 debug consol을 활용하여 로그를 확인한다.

 

마지막으로 producer와 같이 consumer를 생성한뒤, 각각의 토픽명들을 적어주고, 브로커를 똑같이 잡아주고, 콘솔도 추가해준다. 

 

마지막으로 deploy를 눌러 시작하고 오른쪽의 debug를 눌러 로그를 확인한다.

 

기호에 따라 inject부분에 interval을 줘서 계속 데이터가 들어오는지 확인한다.

 

끝.

반응형

'설치,명령어등 > 카프카' 카테고리의 다른 글

kafka 테스트/실습  (0) 2022.03.04
kafka 설치/  (0) 2022.03.02
반응형

intellij(java) -> consumer/ producer 파일생성 및 log파일 생성후 vm 카프카에 데이터를 송/수신

 

intellij>

 

 

의존성 추가!!

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

 

 

 

consumer 작성

package kafkaTutorial;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {
        Properties configs = new Properties();
        // 환경 변수 설정
        configs.put("bootstrap.servers", "192.168.56.1:9092");     // kafka server host 및 port
        configs.put("session.timeout.ms", "10000");             // session 설정
        configs.put("group.id", "!!!!!!변경!!!!!");                // topic 설정
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성
        consumer.subscribe(Arrays.asList(""!!!!!!!!변경!!!!!");"));      // topic 설정
        while (true) {  // 계속 loop를 돌면서 producer의 message를 띄운다.
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                String s = record.topic();
                if (""!!!!!!변경!!!!!!!");".equals(s)) {
                    System.out.println(record.value());
                } else {
                    throw new IllegalStateException("get message on topic " + record.topic());
                }
            }
        }
    }

}

 

producer작성

 

package kafkaTutorial;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) throws IOException {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka host 및 server 설정
        configs.put("acks", "all");                         // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
        configs.put("block.on.buffer.full", "true");        // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

        // producer 생성
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        // message 전달
        for (int i = 0; i < 5; i++) {
            String v = "hello world!"+i;
            producer.send(new ProducerRecord<String, String>("!!!!!!!!변경!!!!!!!", v));
        }

        // 종료
        producer.flush();
        producer.close();
    }

}

 

!!!!!!!변경!!!!!!!!!!부분 kafka 설정한 topic으로 변경하기

 

 

main->resources->log4j.properties // 폴더트리 만들어주고 파일생성-log4j만든다.

<log4j>

$ cat src/main/resources/log4j.properties 
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

 

마지막으로 vm으로 돌아가서 

1. 카프카 멈춰주고

2. kafka - conf - server.properties를 vim으로 열어서 (맨위에 그냥 추가한다.주석처리 찾아서 해도됨)

advertised.listeners=PLAINTEXT://192.168.56.1:9092
listeners=PLAINTEXT://0.0.0.0:9092

 

3. 카프카 재 실행 후 consumer -> producer 차례로 실행하면 hello world가 보인다

 

 

출처 : https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

 

Why Can’t I Connect to Kafka? | Troubleshoot Connectivity

How to troubleshoot connectivity between Kafka clients (Python, Java, Spring, Go, etc.) to Kafka on Docker, AWS, or any other machine.

www.confluent.io

 

반응형

'설치,명령어등 > 카프카' 카테고리의 다른 글

node-red // kafka  (0) 2022.03.14
kafka 설치/  (0) 2022.03.02
반응형

kafaka version 2.0.0

 

 

$ java -version
-> 1.8.xxx

 

 

-kafka install url-

 

 

https://kafka.apache.org/downloads

 

아래의 예제는 /root/에서 다 실행함. 

$ wget https://archive.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz

$ tar -xzf kafka_2.12-2.0.0.tgz

<심볼릭 링크 만들기>

$ ln -s kafka_2.12-2.0.0 kafka

$ echo "export PATH=$PATH:/root/kafka_2.12-2.0.0/bin" >> ~/.bash_profile

$ source ~/.bash_profile

<kafka내장 주키퍼로 시작>

$ zookeeper-server-start.sh -daemon /root/kafka/config/zookeeper.properties

$ yum install telnet
<엑세스 여부 확인>
$ telnet localhost 2181

<기본속성으로 kafka를 시작>
$ kafka-server-start.sh -daemon /root/kafka/config/server.properties

<9092 확인>

$ telnet localhost 9092

<샘플 주제를 만든다>

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tecmint

<작성된 주제 나열>

$ kafka-topics.sh --zookeeper localhost:2181 --list

 

 

출처 : https://ko.linux-console.net/?p=381

반응형

'설치,명령어등 > 카프카' 카테고리의 다른 글

node-red // kafka  (0) 2022.03.14
kafka 테스트/실습  (0) 2022.03.04

+ Recent posts