Connecting

Go 언어 고루틴 / 채널 / 동기화 / 셀렉트 본문

Go 언어

Go 언어 고루틴 / 채널 / 동기화 / 셀렉트

팬도라 2020. 7. 23. 02:05
반응형

고루틴

고루틴 (goroutine)은 가벼운 스레드와 같은 것으로 수행 흐름과 별개로 병렬처리가 가능하게 합니다. OS에서 스케줄링으로 관리되는 스레드(약 1MB)보다 가볍기 때문에 (약 8kbyte) 자신의 코어갯수보다 많이 실행해도 무리 없이 동작한다는 장점이 있습니다. 고루틴은 고 런타임이 관리하고 고채널을 통해 고루틴간의 통신을 할 수 있습니다.

Go 언어에서 고루틴을 실행하는 방법은 아래와 같이 매우 간단합니다.

    go Hello() // 일반함수를 통한 고루틴 실행
    for i := 0; i < 3; i++ {
    go func(n int) { // 익명함수(클로저)를 통한 고루틴 실행
            f.Println("goroutine : ", n)
            oneTime.Do(Hello)
        }(i)
    }

고루틴을 사용하고자 하는 곳에 go 키워드를 사용함으로서 사용할 수 있으며, 이때 현재 함수의 실행 흐름과는 논리적으로 분리되어 동작합니다. 본격적으로 고루틴의 사용법을 익히기 전에 고루틴이 어떠한 방식에 기초하여 동작하는지에 대해 알고 넘어갈 필요가 있습니다.

OS에서 여러 프로그래밍이 동작할 때 동시성 혹은 병렬성으로 동작합니다. 흔히 동시성이라고 하면 병렬성과 비슷한 개념이라고 생각할 수 있지만 이는 명확히 다른 개념입니다. 동시성은 싱글 코어에서 멀티 스레드를 동작시키는 논리적인 개념으로 한번에 여러개가 동시에 실행되는 것 처럼 보이게 됩니다. 하지만 병렬성은 물리적으로 동시에 여러작업을 처리할 수 있기 때문에 멀티 코어에서 멀티 스레드를 동작시키는 방식입니다.

Go 1.5 버전 이전에는 CPU 코어 하나만 사용하도록 기본 설정되어 있기 때문에 여러개의 고루틴을 만들더라도 동시성으로 동작하였으나 현재는 물리 CPU 개수만큼 사용하도록 설정되어 각 코어에서 시분할 ( Concurrent ) 처리로 동작합니다. 만약 여러 CPU를 병렬로 실행하고자 하는 코드를 작성하고자 하면 runtime.GOMAXPROCS(runtime.NumCPU()) 함수를 호출하여 시스템의 모든 코어를 사용하도록 설정할 수 있습니다.

다음 예제는 간단하게 고루틴 활용한 예제입니다.

package main

import f "fmt"

func main() {

    f.Println("goroutine 경량 실행 스레드")

    go Hello()
    f.Scanln()
}

func Hello() {

    f.Println("Hello World")

}

채널

Go 언어에서 채널이란 고루틴끼리 데이터를 주고받는 통로(파이프)의 역할을 수행합니다. make(chan 자료형)을 통해 함수를 생성해야 하며, 채널 연산자 <-를 사용한다는 특징을 가지고 있습니다. 채널은 어떠한 데이터 타입도 주고 받을 수 있고 실행의 흐름을 동기화 할 수 있습니다.

Go 언어에서 채널을 생성하는 방법은 2가지가 존재하는데 하나는 일반 함수를 사용하는 방법이고, 다른 하나는 익명함수를 사용하는 방법입니다. 다음 예제를 통해서 간단한 채널을 만들고 실행해 보겠습니다.

package main

import f "fmt"

func main() {

    msg := make(chan string)

    go func() { msg <- "ping" }()

    result := <-msg
    f.Println(result)

}

고루틴으로 생성된 string 타입의 채널은 result 라는 변수에 저장되게 되는 구조로, 기본적으로 송신과 수신은 송신자와 수신자가 준비될 때까지 블로킹됩니다. 이 특징은 다른 동기화 작업을 하지 않고도 ping 메시지를 수신받습니다.

다음은 일반함수로 생성된 고루틴을 함수로 통해 채널을 전달받아 계산하는 예제입니다.

package main

import f "fmt"

func main() {

    f.Println("Channels")

    c := make(chan int)
    go sum(10, 20, c)

    result := <-c

    f.Println(result)

}

func sum(a int, b int, c chan int) {

    c <- a + b

}

sum 함수가 고루틴으로 생성된 후, 매개변수로 채널을 넘겼습니다. 이때 살펴봐야 할 점은 sum 함수에서 c라는 변수의 리턴값이 존재하지 않는다는 것인데, 이는 채널이 레퍼런스 타입이기 때문입니다.

Go 언어 채널에는 Unbuffered Channel과 Buffered Channel 2가지 채널이 존재합니다. Unbuffered Channel은 위에서 살펴본 것과 같이 하나의 하나의 수신자가 송신자로 부터 데이터를 받을 때까지 블록킹하지만, Buffered Channel은 수신자가 데이터를 받을 준비가 되어 있더라도 송신자의 지정된 버퍼만큼 데이터를 보내고 다른일을 수행할 수 있도록 합니다. 다음 예제를 통해서 살펴보록 하겠습니다.

package main

import f "fmt"

func main() {

    f.Println("beffered")

    messages := make(chan string, 2) // string 타입의 채널을 만들고 해당 채널의 버퍼는 2이다. 
    messages <- "wisoft"
    messages <- "lab"

    f.Println(<-messages)
    f.Println(<-messages)

}

위의 예제에서는 버퍼의 크기가 2이지만 이를 초과하는 데이터를 채널에 넣게 되면 어떠한 일이 발생하는지 확인해 보도록 하겠습니다. 위 코드에서 message 채널에 string 변수를 넣고 이를 출력해 보겠습니다.

    messages <- "golang"
    f.Println(<-messages)

버퍼의 크기가 2인 messages 변수에 string 변수가 하나 더 들어갔기 때문에 버퍼의 크기는 초과하게 되고 결과적으로 다음과 같은 에러가 발생하게 됩니다.

fatal error: all goroutines are asleep - deadlock!

따라서 버퍼의 크기를 정하게 되면, 해당 채널에 할당된 버퍼의 크기가 넘지 않도록 주의해야 합니다.

다음은 고루틴간의 채널을 동기화 하는 방법을 알아보도록 하겠습니다. 이전 시간에서 고루틴을 사용하고 나서, 프로그램의 종료는 막기 위해서 f.Scanln()을 사용했는데 이는 main 함수와 고루틴이 동시에 실행되어 바로 종료과 되는 상황을 막기 위함이었습니다. 이러한 문제를 해결하기 위해서 고루틴이 끝날때까지 대기하기 위해 블로킹 리시브를 사용하는 방법을 다음 예제를 통해 알아보겠습니다.

package main

import (
    f "fmt"
    "time"
)

func main() {

    f.Println("synchronization")

    done := make(chan bool, 1)
    go worker(done)

    <-done
}

func worker(done chan bool) {

    f.Println("working...")
    time.Sleep(time.Second)
    f.Println("done")

    done <- true
}

worker 함수는 done 변수의 bool 채널을 매개변수로 입력받고 해당 함수가 호출시 랜덤한 시간만큼 수행 후에 true 값을 전달하여 함수의 작업이 끝났음을 전달합니다. 따라서 채널은 worker 함수로부터 알림을 받을 때까지 블로킹합니다.

채널은 고루틴끼리 통신하기 위한 양방향 통로(파이프)이지만 채널을 함수의 인자로 사용할 때는 채널이 수신용인지 송신용인지를 정해서 타입의 안전성을 향상시킬 수 있습니다. 다음 예제를 실행해 보도록 하겠습니다.

package main

import f "fmt"

func main() {

    f.Println("Channel direction")

    pings := make(chan string, 1)
    pongs := make(chan string, 1)

    ping(pings, "passed message")
    pong(pings, pongs)

    f.Println(<-pongs)

}

func ping(pings chan<- string, msg string) { // 송신 전용 채널

    pings <- msg

}

func pong(pings <-chan string, pongs chan<- string) { // 수신 전용 채널

    msg := <-pings
    pongs <- msg

}

ping 함수는 송신용 채널만을 받습니다. 이 채널에 수신용 채널을 넣으면 컴파일시 에러가 발생합니다. pong 함수는 수신용 채널인 (pings)를 하나 받고 송신용인 (pongs)를 두 번째 인자로 받습니다.

동기화

OS를 기본적으로 공부했던 개발자라면 동기화가 얼마나 중요한 개념인지 다들 알고 계실 것입니다. 다중 처리기, 다중 코어에서 어떠한 작업을 분할에서 작업할 시, 해당 작업이 이전 작업 혹은 다른 작업의 같은 메모리 영역을 건들일 경우 예기치 않은 동작이나 치명적인 에러를 나타낼 수 있습니다.

OS 내부적으로 원자적 연산을 보장하기 위해 다양한 기법을 사용하지만, 응용 프로그래머 입장에서도 멀티코어를 잘 처리하기 위해서는 코드안에서의 동기화 처리가 매우 중요합니다. 다행인 점이라면 Go 언어에서는 동기화 처리를 위한 문법이 상당히 간결하면서도 강력하며, 쉽게 멀티 코어를 지원하는 프로그래밍을 작성할 수 있다는 점입니다. Go 언어에서는 Mutex를 사용합니다.

컴퓨터 과학에서 (lock) 또는 뮤텍스(mutex, 상호 배제에서)는 여러 스레드를 실행하는 환경에서 자원에 대한 접근에 제한을 강제하기 위한 동기화 매커니즘이다. 락은 상호 배제 동시성 제어 정책을 강제하기 위해 설계된다.

Go 언어에서 뮤텍스를 선언하고 해제하는 방법은 다음과 같습니다.

mutex.Lock() 
mutex.Unlock()

읽기/쓰기가 동시에 일어나는 작업의 경우 다음과 같이 뮤텍스를 선언하고 해제합니다.

rwMutex.Lock()
rwMutex.RUnlock()

고루틴에서 뮤텍스를 사용하고 다른 고루틴에게 CPU를 양보하기 위해서는 다음을 반드시 사용해야 합니다.

runtime.Gosched()

다음 예제를 통해 뮤텍스를 사용하지 않았을 때 발생하는 고루틴간의 경쟁사항을 알아보도록 하겠습니다.

runtime.GOMAXPROCS(runtime.NumCPU()) // 모든 CPU 사용
fmt.Println(runtime.NumCPU()) // 본인 PC가 사용하고 있는 코어 및 HT(intel) or SMT(AMD) 갯수

    var data = []int{} // int형 슬라이스 생성

    go func() {                             
        for i := 0; i < 1000; i++ {     
            data = append(data, 1)  

            runtime.Gosched()       
        }
    }()

    go func() {                             
        for i := 0; i < 1000; i++ {     
            data = append(data, 1)  

            runtime.Gosched()       
        }
    }()

    time.Sleep(2 * time.Second)      
    fmt.Println(len(data))           

위의 예제를 살펴보면 int형 슬라이스에서 2개의 고루틴이 1000번 반복하며 슬라이스에 append 하는 것을 확인할 수 있습니다. 우리가 예상하는 data의 길이는 2000이 되어야 하지만 실질적으로 실행하면 2000이 나오지 않습니다. 이는 두개의 고루틴이 하나의 data에 접근하여 경쟁사항을 만들었기 때문이며 이를 경쟁 조건(Race condition)이라고 말합니다. 위와 같은 문제를 해결하기 위해서 우리는 뮤텍스를 사용하여 경쟁 조건이 발생하지 않도록 조치해야 합니다.

go func() {                             
        for i := 0; i < 1000; i++ {     
            mutex.Lock()            
            data = append(data, 1)  
            mutex.Unlock()         

            runtime.Gosched()       
        }
    }()

    go func() {                             
        for i := 0; i < 1000; i++ {     
            mutex.Lock()            
            data = append(data, 1)  
            mutex.Unlock()          

            runtime.Gosched()
        }
    }()

이전 코드에서 달라진 점은 뮤텍스의 사용유무라는 것을 알 수 있습니다. 이를 통해 경쟁 조건을 만족하여 데이터의 길이는 정확히 2000이 나오는 것을 확인할 수 있을 것입니다. 하지만 여기서 주의해야 할 점은 뮤텍스에서 Lock을 걸고 나면 반드시 Unlock을 해야 한다는 점입니다. 이를 주의하지 않고 뮤텍스를 사용할 경우 교착상태 (deadlock)이 발생하여 코드가 멈추게 됩니다. 멀티 스레드 사황에서는 교착상태가 얼마든지 나타날 수 있으며, 한번 발생하게 되면 문제점을 파악하는데 많은 시간이 소요될 수 있습니다.

멀티스레드를 활용한 프로그래밍 작성이 처음이라면 이러한 문제가 발생하지 않도록 반드시 주의하면서 작성해야 합니다. 다음은 뮤텍스 사용을 좀더 고도화 하여 읽기/쓰기 작업을 진행할 때의 경쟁사항과 이를 해결하는 코드 예제를 살펴보도록 하겠습니다.

package main

import (
    f "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())
    Condition()
    f.Println("-------------------")
    Mutex()

}

func Condition() {

    data := 0

    go func() {
        for i := 0; i < 3; i++ {
            data += 1
            f.Println("write : ", data)
            time.Sleep(10 * time.Millisecond)
        }
    }()

    go func() {
        for i := 0; i < 3; i++ {
            f.Println("read : ", data)
            time.Sleep(1 * time.Second)
        }
    }()

    go func() {
        for i := 0; i < 3; i++ {
            f.Println("read2 : ", data)
            time.Sleep(2 * time.Second)
        }
    }()

    time.Sleep(10 * time.Second)

}

func Mutex() {

    data := 0
    rwMutex := new(sync.RWMutex)

    go func() {
        for i := 0; i < 3; i++ {
            rwMutex.Lock()
            data += 1
            f.Println("write : ", data)
            time.Sleep(10 * time.Millisecond)
            rwMutex.Unlock()
        }
    }()

    go func() {
        for i := 0; i < 3; i++ {
            rwMutex.Lock()
            f.Println("read : ", data)
            time.Sleep(1 * time.Second)
            rwMutex.Unlock()
        }
    }()

    go func() {
        for i := 0; i < 3; i++ {
            rwMutex.Lock()
            f.Println("read2 : ", data)
            time.Sleep(2 * time.Second)
            rwMutex.Unlock()
        }
    }()

    time.Sleep(10 * time.Second)

}

위의 코드를 살펴보면 data라는 int형 공통 변수 안에서 반복문을 통해 읽기와 쓰기 작업이 고루틴으로 동시에 작업하는 것을 확인할 수 있습니다. 경쟁상황에서는 재대로 값이 출력되지 않는 문제가 발생하는데 OS에서는 이를 Readers-Writers Problem라고 합니다. 이러한 문제를 해결하기 위해 Go 언어에서 제공하는 rwMutex를 통해서 경쟁조건을 막아주고 쓰기 작업 중 읽기 작업이 동시에 일어나지 않도록 조치해야 합니다.

복잡한 반복문과 고루틴이 동작하고 있는 상황에서 초기화를 위해 딱 한번만 실행을 해야 하는 경우 sync.Once를 사용하고 Do를 사용합니다. 다음 예제를 통해 살펴보도록 하겠습니다.

package main

import (
    "fmt"
    "sync"
)

var doOnce sync.Once

func main() {
    DoSomething()
    DoSomething()
    DoSomething()
}

func DoSomething() {
    doOnce.Do(func() {
        fmt.Println("Run once - first time, loading...")
    })
    fmt.Println("Run this every time")
}

셀렉트

채널을 통해 다중 연산을 하는 경우 이를 손쉽게 사용할 수 있는 셀렉트라는 분기문을 제공합니다. 이를 통해 원하는 채널에 값이 들어왔을 때만 해당 분기문을 실행하기 때문에 다중 채널을 통한 연산시 동기화 연산으로 확장시킬 수 있습니다.

다음 예제를 통해 기본 동작을 확인해 보도록 하겠습니다.

package main

import (
    f "fmt"
    "time"
)

func main() {

    f.Println("select")

    c1 := make(chan string)
    c2 := make(chan string)

    go func() { // 익명함수를 통한 고루틴 생성
        time.Sleep(time.Second * 1) // 1초간 대기
        c1 <- "one" // c1 채널에 string 값 one을 전송 
    }()

    go func() {
        time.Sleep(time.Second * 2)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select { // 해당 select 분기문 안에 있는 채널에 값이 들어왔을 때 case 내용을 실행한다. 
        case msg1 := <-c1:
            f.Println("received", msg1)
        case msg2 := <-c2:
            f.Println("received", msg2)
        }
    }

}

select 분기문도 default 케이스를 지정할 수 있으며 case에 지정된 채널에 값이 들어오지 않았을 때 즉시 실행됩니다. 단, default에 적절한 처리를 하지 않으면 CPU 코어를 모두 점유하므로 주의해야 합니다.

Comments