Connecting

Apache Kafka Golang 생산자 소비자 튜토리얼 본문

kafka

Apache Kafka Golang 생산자 소비자 튜토리얼

팬도라 2021. 2. 13. 16:55
반응형

본 문서는 Kafka 공식 문서 및 GitHub을 기준으로 작성되었음을 알린다.

Apache Kafka producer와 consumer를 구현할 때 사용하는 다양한 언어기반의 클라이언트를 제공한다. 지원하는 언어는 다음과 같다.

  • Java
  • Ruby
  • Python
  • C
  • C++
  • Go

본 글에서는 Go언어를 기반으로 Kafka Example을 작성하는 방법을 알아보도록 한다.

Kafka docker-compose.yml

애플리케이션을 실행하기 전에 Apache Kafka를 docker-compose를 통해 실행한다.

$ git clone https://github.com/confluentinc/cp-docker-images.git
$ cd cp-docker-images/examples/kafka-single-node
$ docker-compose up -d 

Consumer

Kafka에서는 consumer 구현을 위한 두 가지의 API를 제공한다. 세부적인 것들이 모두 추상화되어 있어서 몇 번의 간단한 함수 호출로 구현할 수 있는 High Level Consumer API, offset과 같은 세부적인 부분까지 다룰 수 있으나 구현하기가 까다로운 Simple Consumer API가 제공된다.

본 예제에서는 간단하게 데이터를 수신받는 기능만 구현할 것이기 때문에 High Level Consumer API를 사용한다.

package Config

import (
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func Kafka() *kafka.Consumer {


    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    return c
}

kafka.NewConsumer 부분에서 kafka 접속정보를 입력한다. 접속 에러가 발생할 경우 panic 발생하여 시스템을 종료한다.

package Basic

import (

    "fmt"
    "kafka-comsumer/Config"
)

func Consumer()  {

    c := Config.Kafka()

    c.SubscribeTopics([]string{"weather"}, nil)
    defer c.Close()

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}

c.SubscribeTopics 부분에서 수신받을 Topic을 지정한다. 지정된 Topic으로 부터 메시지가 수신되면 파티션으로부터 메시지를 꺼내서 출력한다. 모든 수신이 완료되면 최종적으로 defer c.Close() 을 통해서 접속을 종료한다.

package main

import (
    "fmt"
    "kafka-comsumer/Basic"
)

func main() {

    fmt.Println("Kafka Consumer Example")
    Basic.Consumer()
}

main 패키지를 생성하고 Consumer()을 호출한다.

Producer

go언어를 통해 쉽게 작성할 수 있는 Producer 예제이다.

package Config

import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

func Kafka() *kafka.Producer{

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

    return p
}

kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"}) 부분은 kafka에서 접속정보를 지정한다.

package Basic

import (
    "fmt"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    "kafka-producer/Config"
    "kafka-producer/Sensor"
)

func Producer()  {

    p := Config.Kafka()
    defer p.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := "myTopic"

    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        _ = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    topic2 := "weather"

    var humid [10]string
    humid = Sensor.Humidity()

    for _, word := range humid {
        _ = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic2, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // Wait for message deliveries before shutting down
    p.Flush(15 * 1000)

}

topic := "myTopic", topic2 := "weather" 을 통해서 Topic을 지정하고 TopicPartition을 통해서 배열로 저장된 메시지를 byte 형식으로 메시지를 송신한다. go func() 부분은 go언어에서 익명 함수를 통해서 고루틴을 실행하고 메시지 전송 여부에 대해 체크한다.

두 개의 Topic이 생성되었기 때문에 Consumer에서는 수신받을 적절한 Topic을 지정해야 한다.

package Sensor

import (
    "math/rand"
    "strconv"
    "time"
)

func Humidity() [10]string {

    var val[10] string
    for i := 0; i < len(val); i++ {
        rand.Seed(time.Now().UnixNano())
        val[i] = "humid : " + strconv.Itoa(rand.Intn(100))
    }

    return val
}

위 패키지는 특별한 기능은 없으며 main에서 작성된 부분을 따로 분리했다. 지금 예제는 습도라는 가상의 센서에서 배열을 반환하여 전송하는 기능을 담당한다.

package main

import (
    "fmt"
    "kafka-producer/Basic"
)

func main() {

    fmt.Println("Kafka Producer Example")
    Basic.Producer()

}

main 패키지를 통해서 작성된 Producer()를 호출한다.

지금까지 Apache Kafka에 기본적인 개념과 CLI, 코드 실습을 진행하였다. 현재 작성된 기술만으로 모두 이해할 수는 없겠지만 본인 스스로 도움이 되었으면 한다.

'kafka' 카테고리의 다른 글

Apache Kafka 생산자 소비자 CLI 튜토리얼  (0) 2021.01.26
Apache Kafka 클러스터 구성을 홀수로 하는 이유  (0) 2021.01.13
아파치 카프카 설치하기  (0) 2020.10.07
Apache Kafka 기본  (0) 2020.09.22
Apache kafka 개요  (2) 2020.09.08
Comments