kafka

Apache Kafka Golang 생산자 소비자 튜토리얼

팬도라 2021. 2. 13. 16:55
반응형

본 문서는 Kafka 공식 문서 및 GitHub을 기준으로 작성되었음을 알린다.

Apache Kafka producer와 consumer를 구현할 때 사용하는 다양한 언어기반의 클라이언트를 제공한다. 지원하는 언어는 다음과 같다.

  • Java
  • Ruby
  • Python
  • C
  • C++
  • Go

본 글에서는 Go언어를 기반으로 Kafka Example을 작성하는 방법을 알아보도록 한다.

Kafka docker-compose.yml

애플리케이션을 실행하기 전에 Apache Kafka를 docker-compose를 통해 실행한다.

$ git clone https://github.com/confluentinc/cp-docker-images.git
$ cd cp-docker-images/examples/kafka-single-node
$ docker-compose up -d 

Consumer

Kafka에서는 consumer 구현을 위한 두 가지의 API를 제공한다. 세부적인 것들이 모두 추상화되어 있어서 몇 번의 간단한 함수 호출로 구현할 수 있는 High Level Consumer API, offset과 같은 세부적인 부분까지 다룰 수 있으나 구현하기가 까다로운 Simple Consumer API가 제공된다.

본 예제에서는 간단하게 데이터를 수신받는 기능만 구현할 것이기 때문에 High Level Consumer API를 사용한다.

package Config

import (
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func Kafka() *kafka.Consumer {


    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    return c
}

kafka.NewConsumer 부분에서 kafka 접속정보를 입력한다. 접속 에러가 발생할 경우 panic 발생하여 시스템을 종료한다.

package Basic

import (

    "fmt"
    "kafka-comsumer/Config"
)

func Consumer()  {

    c := Config.Kafka()

    c.SubscribeTopics([]string{"weather"}, nil)
    defer c.Close()

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}

c.SubscribeTopics 부분에서 수신받을 Topic을 지정한다. 지정된 Topic으로 부터 메시지가 수신되면 파티션으로부터 메시지를 꺼내서 출력한다. 모든 수신이 완료되면 최종적으로 defer c.Close() 을 통해서 접속을 종료한다.

package main

import (
    "fmt"
    "kafka-comsumer/Basic"
)

func main() {

    fmt.Println("Kafka Consumer Example")
    Basic.Consumer()
}

main 패키지를 생성하고 Consumer()을 호출한다.

Producer

go언어를 통해 쉽게 작성할 수 있는 Producer 예제이다.

package Config

import "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"

func Kafka() *kafka.Producer{

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

    return p
}

kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"}) 부분은 kafka에서 접속정보를 지정한다.

package Basic

import (
    "fmt"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    "kafka-producer/Config"
    "kafka-producer/Sensor"
)

func Producer()  {

    p := Config.Kafka()
    defer p.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := "myTopic"

    for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
        _ = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    topic2 := "weather"

    var humid [10]string
    humid = Sensor.Humidity()

    for _, word := range humid {
        _ = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic2, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // Wait for message deliveries before shutting down
    p.Flush(15 * 1000)

}

topic := "myTopic", topic2 := "weather" 을 통해서 Topic을 지정하고 TopicPartition을 통해서 배열로 저장된 메시지를 byte 형식으로 메시지를 송신한다. go func() 부분은 go언어에서 익명 함수를 통해서 고루틴을 실행하고 메시지 전송 여부에 대해 체크한다.

두 개의 Topic이 생성되었기 때문에 Consumer에서는 수신받을 적절한 Topic을 지정해야 한다.

package Sensor

import (
    "math/rand"
    "strconv"
    "time"
)

func Humidity() [10]string {

    var val[10] string
    for i := 0; i < len(val); i++ {
        rand.Seed(time.Now().UnixNano())
        val[i] = "humid : " + strconv.Itoa(rand.Intn(100))
    }

    return val
}

위 패키지는 특별한 기능은 없으며 main에서 작성된 부분을 따로 분리했다. 지금 예제는 습도라는 가상의 센서에서 배열을 반환하여 전송하는 기능을 담당한다.

package main

import (
    "fmt"
    "kafka-producer/Basic"
)

func main() {

    fmt.Println("Kafka Producer Example")
    Basic.Producer()

}

main 패키지를 통해서 작성된 Producer()를 호출한다.

지금까지 Apache Kafka에 기본적인 개념과 CLI, 코드 실습을 진행하였다. 현재 작성된 기술만으로 모두 이해할 수는 없겠지만 본인 스스로 도움이 되었으면 한다.