-
golang: 파이프라인(pipeline)이란?Back-End/Golang 2024. 5. 15. 20:36반응형
pipeline 파이프라인(pipeline)은 데이터를 단계별로 처리하는 시스템에서 자주 사용되는 패턴입니다. 각 단계는 특정 작업을 수행하며, 한 단계의 출력이 다음 단계의 입력이 되는 방식입니다. 파이프라인 패턴을 사용하면 복잡한 작업을 작은 단계로 나누어 처리할 수 있어 코드의 이해와 유지보수가 쉬워집니다. Go 언어에서는 고루틴과 채널을 사용하여 파이프라인을 구현할 수 있습니다.
파이프라인의 구성 요소
- 생산자(Producer): 데이터를 생성하여 파이프라인에 입력하는 역할을 합니다.
- 처리자(Processor): 데이터를 처리하여 다음 단계로 전달하는 역할을 합니다.
- 소비자(Consumer): 최종 데이터를 받아 처리하는 역할을 합니다.
파이프라인의 기본 구조
- 단계(Stage): 각 단계는 독립적인 고루틴으로 실행됩니다.
- 채널(Channel): 각 단계 간 데이터를 전달하기 위해 채널을 사용합니다.
예제: 간단한 파이프라인
아래는 숫자를 생성하고, 각 숫자를 제곱한 후, 결과를 출력하는 파이프라인 예제입니다.
package main import ( "fmt" ) // producer는 데이터를 생성하여 out 채널에 보냅니다. func producer(out chan<- int) { for i := 1; i <= 5; i++ { out <- i } close(out) } // square는 입력 데이터를 제곱하여 out 채널에 보냅니다. func square(in <-chan int, out chan<- int) { for num := range in { out <- num * num } close(out) } // consumer는 최종 데이터를 받아 출력합니다. func consumer(in <-chan int) { for result := range in { fmt.Println(result) } } func main() { // 각 단계 간의 채널을 생성합니다. numbers := make(chan int) squares := make(chan int) // 각 단계의 고루틴을 실행합니다. go producer(numbers) go square(numbers, squares) consumer(squares) }
코드 설명
- producer 함수:
- out 채널에 1부터 5까지의 숫자를 생성하여 보냅니다.
- 모든 데이터를 보낸 후 out 채널을 닫습니다.
- square 함수:
- in 채널로부터 데이터를 받아 제곱한 후 out 채널로 보냅니다.
- 모든 데이터를 처리한 후 out 채널을 닫습니다.
- consumer 함수:
- in 채널로부터 데이터를 받아 출력합니다.
- main 함수:
- 각 단계 간의 데이터를 전달하기 위한 채널을 생성합니다.
- producer, square, consumer 고루틴을 실행합니다.
장점
- 모듈화: 각 단계가 독립적이므로 코드의 모듈화가 가능하고, 각 단계를 쉽게 이해하고 테스트할 수 있습니다.
- 병렬 처리: 각 단계가 고루틴으로 독립적으로 실행되므로 병렬 처리가 가능합니다.
- 유연성: 새로운 단계를 추가하거나 기존 단계를 변경하는 것이 용이합니다.
확장된 예제
더 복잡한 파이프라인을 구현해보겠습니다. 이 예제에서는 숫자를 생성하고, 필터링하고, 제곱한 후, 최종 결과를 출력합니다.
package main import ( "fmt" ) // producer는 데이터를 생성하여 out 채널에 보냅니다. func producer(out chan<- int) { for i := 1; i <= 10; i++ { out <- i } close(out) } // filter는 입력 데이터를 필터링하여 out 채널에 보냅니다. func filter(in <-chan int, out chan<- int, predicate func(int) bool) { for num := range in { if predicate(num) { out <- num } } close(out) } // square는 입력 데이터를 제곱하여 out 채널에 보냅니다. func square(in <-chan int, out chan<- int) { for num := range in { out <- num * num } close(out) } // consumer는 최종 데이터를 받아 출력합니다. func consumer(in <-chan int) { for result := range in { fmt.Println(result) } } func main() { numbers := make(chan int) filtered := make(chan int) squares := make(chan int) go producer(numbers) go filter(numbers, filtered, func(n int) bool { return n%2 == 0 }) go square(filtered, squares) consumer(squares) }
확장된 예제 설명
- filter 함수:
- in 채널로부터 데이터를 받아 조건에 맞는 데이터만 out 채널로 보냅니다.
- 여기서는 짝수만 필터링하는 예제를 사용합니다.
- main 함수:
- filter 단계를 추가하여 짝수만 제곱하도록 파이프라인을 확장합니다.
이와 같이 파이프라인 패턴을 사용하면 복잡한 데이터 처리 과정을 단순하고 명확하게 표현할 수 있습니다. 각 단계는 독립적으로 실행되며, 채널을 통해 데이터를 주고받기 때문에 고루틴 간의 동시성 문제를 효율적으로 관리할 수 있습니다.
공식홈페이지 예제
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
예제 설명
gen 함수
gen 함수는 숫자 리스트를 받아서 이를 채널에 전송하는 고루틴을 시작합니다. 모든 숫자가 채널에 전송된 후 채널을 닫습니다.
- nums ...int: 가변 인자를 사용하여 여러 개의 정수를 받을 수 있습니다.
- out := make(chan int): 정수를 전달할 채널을 생성합니다.
- go func() { ... }(): 고루틴을 시작하여 숫자를 채널에 전송하고, 모든 숫자를 전송한 후 채널을 닫습니다.
sq 함수
sq 함수는 입력 채널로부터 정수를 받아 그 제곱을 출력 채널로 전송합니다. 모든 작업이 끝나면 출력 채널을 닫습니다.
- in <-chan int: 입력 채널로부터 데이터를 받습니다.
- out := make(chan int): 출력 채널을 생성합니다.
- for n := range in: 입력 채널에서 데이터를 읽고, 이를 제곱하여 출력 채널에 전송합니다.
- close(out): 모든 데이터를 처리한 후 출력 채널을 닫습니다.
main 함수
main 함수에서는 파이프라인을 설정하고 최종 출력을 처리합니다.
- for n := range sq(sq(gen(2, 3))): gen 함수의 출력을 sq 함수에 전달하고, 그 결과를 다시 sq 함수에 전달하여 최종 출력을 처리합니다.
다양한 파이프라인 구성
파이프라인은 다양한 형태로 구성할 수 있습니다. 예를 들어, 각 단계를 별도의 함수로 분리하고, 채널을 통해 데이터를 전달하는 방식이 있습니다.
package main import ( "fmt" ) // gen generates numbers and sends them to the out channel. func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } // sq squares numbers from the in channel and sends them to the out channel. func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } func main() { // Set up the pipeline. c := gen(2, 3) out := sq(c) out2 := sq(out) // Consume the output. for n := range out2 { fmt.Println(n) // 16 then 81 } }
이와 같이 파이프라인을 구성하는 다양한 방법이 있으며, 특정 요구사항이나 코드 스타일에 따라 적절한 방법을 선택할 수 있습니다. 중요한 점은 파이프라인의 각 단계가 독립적이고, 채널을 통해 데이터를 주고받으면서 고루틴 간의 동시성을 효율적으로 관리할 수 있다는 것입니다.
done 채널을 사용한 파이프라인
파이프라인에서 done 채널을 사용하는 것은 파이프라인을 종료하거나 중단하는 기능을 추가하는 고급 기법입니다. done 채널을 사용하면 파이프라인의 어느 단계에서든 중단 신호를 받을 수 있으며, 이를 통해 전체 파이프라인을 안전하게 종료할 수 있습니다. 이러한 기법은 재귀 관계가 발생하거나 시스템 자원을 효율적으로 관리해야 할 때 유용합니다.
done 채널을 사용한 파이프라인 예제
아래 예제에서는 done 채널을 사용하여 파이프라인을 중단하고 리소스를 해제하는 방법을 설명합니다.
gen 함수 수정
gen 함수는 done 채널을 받아서, done 채널이 닫히면 즉시 작업을 중단합니다.
func gen(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-done: return } } }() return out }
sq 함수 수정
sq 함수도 done 채널을 받아서, done 채널이 닫히면 즉시 작업을 중단합니다.
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
main 함수에서 done 채널 사용
main 함수에서 done 채널을 생성하고, 파이프라인의 각 단계에 전달합니다. 이 예제에서는 일정 시간 후에 done 채널을 닫아 파이프라인을 중단합니다.
package main import ( "fmt" "time" ) func gen(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case out <- n: case <-done: return } } }() return out } func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out } func main() { done := make(chan struct{}) defer close(done) c := gen(done, 2, 3, 4, 5, 6) out := sq(done, c) // Consume the output. for n := range out { fmt.Println(n) } // Simulate a condition to cancel the pipeline. time.Sleep(1 * time.Second) close(done) }
코드 설명
- gen 함수:
- done 채널을 추가로 받아들입니다.
- select 문을 사용하여 done 채널이 닫히면 즉시 반환합니다.
- sq 함수:
- done 채널을 추가로 받아들입니다.
- select 문을 사용하여 done 채널이 닫히면 즉시 반환합니다.
- main 함수:
- done 채널을 생성합니다.
- gen과 sq 함수에 done 채널을 전달합니다.
- time.Sleep 후 done 채널을 닫아 파이프라인을 중단합니다.
중요한 점
- 선점 가능: done 채널을 사용하면 파이프라인의 어느 단계에서든 작업을 중단하고 리소스를 해제할 수 있어 전체 시스템이 선점 가능합니다.
- 재귀 관계 발생 방지: done 채널을 사용하여 파이프라인을 중단하면 재귀적으로 발생할 수 있는 문제를 방지할 수 있습니다.
- 리소스 해제: done 채널을 사용하면 고루틴과 채널이 안전하게 종료되어 리소스가 해제됩니다.
이러한 기법은 복잡한 파이프라인을 관리하고 안정적으로 운영하는 데 매우 유용합니다. done 채널을 통해 전체 파이프라인을 제어함으로써 효율적이고 안전한 동시성 처리가 가능합니다.
결론
함수 호출 안에 함수 호출을 하는 형태 (sq(sq(gen(2,3))))가 파이프라인 패턴의 본질은 아닙니다. 파이프라인 패턴의 본질은 데이터가 여러 단계의 처리 과정을 거치는 구조를 갖는다는 것입니다. 각 단계는 독립적으로 정의되며, 데이터를 처리하고 다음 단계로 전달합니다.
함수 호출 안에 함수 호출을 하는 형태는 파이프라인을 구성하는 한 가지 방법일 뿐입니다. 중요한 것은 데이터가 채널을 통해 각 단계에서 처리된다는 점입니다. Go 언어에서 파이프라인 패턴을 구현할 때는 고루틴과 채널을 사용하여 각 단계를 독립적으로 실행하고, 데이터를 채널을 통해 전달합니다.
파이프라인 패턴의 핵심은 각 단계가 독립적으로 데이터를 처리하고, 데이터를 다음 단계로 전달하는 구조를 갖는다는 것입니다. sq(sq(gen(2, 3))) 형태는 함수의 결과를 연쇄적으로 전달하는 한 가지 방법일 뿐이며, 이를 명확하게 분리된 단계로 표현해도 여전히 파이프라인 패턴입니다. 중요한 것은 데이터가 각 단계를 거쳐 처리된다는 점입니다.
반응형'Back-End > Golang' 카테고리의 다른 글
golang: 제네릭(Generic)이란? (0) 2024.08.03 golang: Tee 패턴이란? (1) 2024.05.15 golang: 배열(Array)과 슬라이스(Slice) (0) 2024.05.11 고루틴 누수(Goroutine Leak)란? (0) 2024.05.10 golang: 1급 시민(First-class citizen)이란? (1) 2024.04.28