반응형

중요한 사항

 

1. Flask -> REST를 이용해서 서비스 제공( 데이터 전처리, 딥러닝, 머신러닝 모델의 학습 설계, 학습, 확인)

2. REST API의 자동명세 필요 (기존 엑셀작업 -> 자동 명세)

3. 파일 및 기능이 점점 늘어나면서 관리 필요

 

flask-restx를 이용하여 rest를 서비스하고

swagger를 사용하여 문서화하여 공유 예정 

 

Flask-restx란?

  • flask-restful 라이브러리 중 하나
  • Swagger라는 rest api를 문서화해주는 도구를 지원

Swagger란?

  • 개발한 rest api를 편리하게 문서화 해준다.

 ex ) https://www.data.go.kr/data/15102239/openapi.do#/

 

기상청_전국 해수욕장 날씨 조회서비스

전국 해수욕장의 날씨 예보(초단기예보,단기예보)와 조석 정보, 파고 정보, 일출일몰 정보, 수온 정보를 제공하는 서비스

www.data.go.kr

 

  • 그래서 이러한 문서를 통해서 프로젝트를 관리하고, 제 3의 사용자및 협업 대상이 편리하게 api를 테스트, 호출해볼수 있음
반응형

'work > web' 카테고리의 다른 글

pydantic / dataclass [파이썬 / python]  (0) 2022.10.11
flask 비동기 처리 해보기 -> FASTAPI 사용하기  (0) 2022.07.26
반응형

node-red를 활용하여 3개의 topic을 만들고 각각의 토픽에 랜덤 변수를 5초간격으로 송, 수신하는 node-red를 만들어라.

 

## 이전 포스트를 보고 kafka가 설치 되어있어야 한다.

 

 

https://nodejs.org/ko/

 

Node.js

Node.js® is a JavaScript runtime built on Chrome's V8 JavaScript engine.

nodejs.org

설치.(stable버전)

 

https://flows.nodered.org/node/node-red-contrib-kafka-manager

 

node-red-contrib-kafka-manager

Node-RED implements Kafka manager with associand associated .

flows.nodered.org

 

$ npm install kafka-node

 

 

<kafka내장 주키퍼로 시작>

$ zookeeper-server-start.sh -daemon /root/kafka/config/zookeeper.properties

$ yum install telnet
<엑세스 여부 확인>
$ telnet localhost 2181

<기본속성으로 kafka를 시작>
$ kafka-server-start.sh -daemon /root/kafka/config/server.properties

<9092 확인>

$ telnet localhost 9092

<샘플 주제를 만든다>

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic3

<작성된 주제 나열>

$ kafka-topics.sh --zookeeper localhost:2181 --list

#topic 1,2,3뜨면 됨

구성은 이렇게 해봤고, 

 

node-red에서 최초 inject를 잡아주고, 그다음 필요한 각각의 input을 넣어 프로세스를 잡아준다. 최종적으로 kafka_producer로 향하게 노드를 그린다.

 

# 설정은 필요에 따라 다르게 하고, Topic부분에 아까 써줬던 topic1,2,3을 각각 잡아주고

 

broker부분에 연필모양을 눌러서 hosts를 vm으로 잡아준다. (192.168.56.1 : 9092)

 

그리고 전송이 잘 되었는지 확인 하기 위해 debug consol을 활용하여 로그를 확인한다.

 

마지막으로 producer와 같이 consumer를 생성한뒤, 각각의 토픽명들을 적어주고, 브로커를 똑같이 잡아주고, 콘솔도 추가해준다. 

 

마지막으로 deploy를 눌러 시작하고 오른쪽의 debug를 눌러 로그를 확인한다.

 

기호에 따라 inject부분에 interval을 줘서 계속 데이터가 들어오는지 확인한다.

 

끝.

반응형

'설치,명령어등 > 카프카' 카테고리의 다른 글

kafka 테스트/실습  (0) 2022.03.04
kafka 설치/  (0) 2022.03.02
반응형

intellij(java) -> consumer/ producer 파일생성 및 log파일 생성후 vm 카프카에 데이터를 송/수신

 

intellij>

 

 

의존성 추가!!

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

 

 

 

consumer 작성

package kafkaTutorial;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {

    public static void main(String[] args) {
        Properties configs = new Properties();
        // 환경 변수 설정
        configs.put("bootstrap.servers", "192.168.56.1:9092");     // kafka server host 및 port
        configs.put("session.timeout.ms", "10000");             // session 설정
        configs.put("group.id", "!!!!!!변경!!!!!");                // topic 설정
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    // key deserializer
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // value deserializer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);    // consumer 생성
        consumer.subscribe(Arrays.asList(""!!!!!!!!변경!!!!!");"));      // topic 설정
        while (true) {  // 계속 loop를 돌면서 producer의 message를 띄운다.
            ConsumerRecords<String, String> records = consumer.poll(500);
            for (ConsumerRecord<String, String> record : records) {
                String s = record.topic();
                if (""!!!!!!변경!!!!!!!");".equals(s)) {
                    System.out.println(record.value());
                } else {
                    throw new IllegalStateException("get message on topic " + record.topic());
                }
            }
        }
    }

}

 

producer작성

 

package kafkaTutorial;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {

    public static void main(String[] args) throws IOException {

        Properties configs = new Properties();
        configs.put("bootstrap.servers", "192.168.56.1:9092"); // kafka host 및 server 설정
        configs.put("acks", "all");                         // 자신이 보낸 메시지에 대해 카프카로부터 확인을 기다리지 않습니다.
        configs.put("block.on.buffer.full", "true");        // 서버로 보낼 레코드를 버퍼링 할 때 사용할 수 있는 전체 메모리의 바이트수
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // serialize 설정
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serialize 설정

        // producer 생성
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

        // message 전달
        for (int i = 0; i < 5; i++) {
            String v = "hello world!"+i;
            producer.send(new ProducerRecord<String, String>("!!!!!!!!변경!!!!!!!", v));
        }

        // 종료
        producer.flush();
        producer.close();
    }

}

 

!!!!!!!변경!!!!!!!!!!부분 kafka 설정한 topic으로 변경하기

 

 

main->resources->log4j.properties // 폴더트리 만들어주고 파일생성-log4j만든다.

<log4j>

$ cat src/main/resources/log4j.properties 
# Root logger option
log4j.rootLogger=DEBUG, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

 

마지막으로 vm으로 돌아가서 

1. 카프카 멈춰주고

2. kafka - conf - server.properties를 vim으로 열어서 (맨위에 그냥 추가한다.주석처리 찾아서 해도됨)

advertised.listeners=PLAINTEXT://192.168.56.1:9092
listeners=PLAINTEXT://0.0.0.0:9092

 

3. 카프카 재 실행 후 consumer -> producer 차례로 실행하면 hello world가 보인다

 

 

출처 : https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

 

Why Can’t I Connect to Kafka? | Troubleshoot Connectivity

How to troubleshoot connectivity between Kafka clients (Python, Java, Spring, Go, etc.) to Kafka on Docker, AWS, or any other machine.

www.confluent.io

 

반응형

'설치,명령어등 > 카프카' 카테고리의 다른 글

node-red // kafka  (0) 2022.03.14
kafka 설치/  (0) 2022.03.02

+ Recent posts