golang: Tee 패턴이란?
Tee 패턴은 하나의 입력 채널에서 읽은 데이터를 여러 출력 채널로 복사하여 동시에 여러 곳에서 사용할 수 있게 하는 패턴입니다. 이 패턴은 이름 그대로 "티" 모양처럼 데이터를 여러 갈래로 나누는 기능을 합니다. 이 패턴은 데이터를 여러 곳에서 병렬로 처리할 때 유용합니다.
Tee 패턴의 구조
- 입력 채널: 데이터를 제공하는 채널입니다.
- 출력 채널들: 입력 채널에서 읽은 데이터를 복사하여 전달할 여러 개의 출력 채널입니다.
예제 코드
아래 예제에서는 tee 함수가 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 기능을 수행합니다.
package main
import (
"fmt"
"sync"
)
// tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다.
func tee(done <-chan struct{}, in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for val := range in {
select {
case out1 <- val:
case <-done:
return
}
select {
case out2 <- val:
case <-done:
return
}
}
}()
return out1, out2
}
func main() {
done := make(chan struct{})
defer close(done)
// 입력 채널 생성
in := make(chan int)
// tee 함수 호출하여 두 개의 출력 채널 생성
out1, out2 := tee(done, in)
var wg sync.WaitGroup
wg.Add(2)
// 첫 번째 출력 채널에서 데이터 소비
go func() {
defer wg.Done()
for val := range out1 {
fmt.Println("out1:", val)
}
}()
// 두 번째 출력 채널에서 데이터 소비
go func() {
defer wg.Done()
for val := range out2 {
fmt.Println("out2:", val)
}
}()
// 입력 채널에 데이터 전송
go func() {
defer close(in)
for i := 1; i <= 5; i++ {
in <- i
}
}()
wg.Wait()
}
코드 설명
- tee 함수:
- done: 파이프라인을 중단하기 위한 채널입니다.
- in: 입력 채널입니다.
- out1, out2: 두 개의 출력 채널입니다.
- 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사합니다.
- select 문을 사용하여 done 채널이 닫히면 즉시 반환합니다.
- main 함수:
- done 채널을 생성하여 파이프라인 종료를 제어합니다.
- in 채널을 생성하여 입력 데이터를 전달합니다.
- tee 함수를 호출하여 두 개의 출력 채널을 생성합니다.
- wg를 사용하여 두 개의 출력 고루틴이 종료될 때까지 대기합니다.
- 두 개의 출력 고루틴에서 각각 출력 채널에서 데이터를 소비하고 출력합니다.
- 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.
주요 포인트
- 동시성: 입력 데이터를 동시에 여러 출력 채널로 복사하여 병렬로 처리할 수 있습니다.
- 채널 관리: 채널을 통해 데이터를 안전하게 전달하고, select 문을 사용하여 파이프라인을 종료할 수 있습니다.
- 리소스 해제: done 채널을 사용하여 파이프라인을 종료하면, 고루틴과 채널이 안전하게 종료됩니다.
추가 Tee 예시
func Tee(
done,
c chan interface{}) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, c) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
주요 부분 설명
1. Tee 함수 서명:
- done: 파이프라인을 중단할 수 있는 채널입니다.
- c: 입력 데이터를 제공하는 채널입니다.
- 반환 값으로 두 개의 출력 채널을 반환합니다.
2. 출력 채널 생성:
out1 := make(chan interface{})
out2 := make(chan interface{})
3. 고루틴 생성:
- 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 고루틴을 생성합니다.
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, c) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
4. orDone 함수 호출:
- orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다. 이 함수의 구현은 예제에 없지만, 일반적으로는 아래와 같이 구현됩니다:
func orDone(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
5. var out1, out2 = out1, out2:
- 이 부분은 각각의 출력 채널에 대한 복사본을 생성하는 것입니다. 이 복사본은 각 select 블록 내에서 특정 채널에 대한 쓰기를 중단하는 데 사용됩니다.
- out1이나 out2 중 하나에 데이터를 성공적으로 쓰면, 해당 채널을 nil로 설정하여 현재 값이 다른 채널로 중복되어 전송되는 것을 방지합니다.
코드의 핵심 부분 상세 설명
특히 헷갈리는 부분인 var out1, out2 = out1, out2와 관련된 로직을 설명드리겠습니다.
var out1, out2 = out1, out2:
for val := range orDone(done, c) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
// we set the shadowed copy to nil, to block further writes for current val
out1 = nil
case out2 <- val:
// we set the shadowed copy to nil, to block further writes for current val
out2 = nil
}
}
}
- var out1, out2 = out1, out2:
- 이 구문은 out1과 out2의 새 변수를 만듭니다. 이 변수들은 for 루프 안에서만 유효하며, 원래의 out1과 out2를 그림자(shadow)처럼 덮어씁니다.
- 각 select 문에서 성공적으로 쓰기가 이루어지면 해당 채널 변수를 nil로 설정하여 현재 값이 중복해서 다른 채널로 전송되지 않도록 합니다.
- select 문:
- 첫 번째 select 문에서 out1에 데이터를 성공적으로 쓰면 out1을 nil로 설정하여 두 번째 반복 시 out1에 다시 쓰는 것을 방지합니다.
- 두 번째 select 문에서 out2에 데이터를 성공적으로 쓰면 out2를 nil로 설정하여 두 번째 반복 시 out2에 다시 쓰는 것을 방지합니다.
예제 실행 흐름
- 입력 채널 c에서 데이터를 읽습니다:
- orDone 함수는 done 채널을 감시하면서 c에서 데이터를 읽어 반환합니다.
- 데이터를 출력 채널로 복사합니다:
- 각 val에 대해 두 번의 select 문이 실행됩니다.
- 첫 번째 select 문에서 out1에 성공적으로 데이터를 쓰면 out1을 nil로 설정합니다.
- 두 번째 select 문에서 out2에 성공적으로 데이터를 쓰면 out2을 nil로 설정합니다.
이렇게 하면 각 값이 정확히 한 번씩만 각 출력 채널로 전송됩니다. 이 과정에서 done 채널을 통해 파이프라인을 중단할 수 있습니다.
이해를 돕기 위해 요약하자면, 이 코드는 done 채널을 통해 파이프라인을 중단할 수 있도록 하면서 입력 채널의 데이터를 두 개의 출력 채널로 복사합니다. 각 값은 정확히 한 번씩만 출력 채널로 전송됩니다.
다중 출력 채널을 위한 Tee 함수
out1, out2, out3, ... out100과 같이 여러 개의 출력 채널을 사용하는 것도 가능합니다. 하지만 이 경우에는 코드를 조금 더 복잡하게 작성해야 합니다. 각 출력 채널로 데이터를 복사하는 로직을 동적으로 처리할 수 있도록 일반화할 필요가 있습니다.
다중 출력 채널을 위한 일반화된 Tee 함수
아래는 out 채널이 동적으로 여러 개 생성되도록 수정한 예제입니다.
package main
import (
"fmt"
"sync"
)
// tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다.
func tee(done <-chan struct{}, in <-chan interface{}, n int) []<-chan interface{} {
outs := make([]chan interface{}, n)
for i := range outs {
outs[i] = make(chan interface{})
}
go func() {
defer func() {
for _, out := range outs {
close(out)
}
}()
for val := range orDone(done, in) {
var wg sync.WaitGroup
wg.Add(len(outs))
for _, out := range outs {
go func(c chan interface{}) {
defer wg.Done()
select {
case c <- val:
case <-done:
}
}(out)
}
wg.Wait()
}
}()
outChans := make([]<-chan interface{}, len(outs))
for i := range outs {
outChans[i] = outs[i]
}
return outChans
}
// orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다.
func orDone(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
func main() {
done := make(chan struct{})
defer close(done)
in := make(chan interface{})
// tee 함수 호출하여 3개의 출력 채널 생성
outChans := tee(done, in, 3)
var wg sync.WaitGroup
wg.Add(len(outChans))
// 각 출력 채널에서 데이터 소비
for i, out := range outChans {
go func(out <-chan interface{}, idx int) {
defer wg.Done()
for val := range out {
fmt.Printf("out%d: %v\n", idx+1, val)
}
}(out, i)
}
// 입력 채널에 데이터 전송
go func() {
defer close(in)
for i := 1; i <= 5; i++ {
in <- i
}
}()
wg.Wait()
}
코드 설명
- tee 함수:
- done: 파이프라인을 중단하기 위한 채널입니다.
- in: 입력 데이터를 제공하는 채널입니다.
- n: 생성할 출력 채널의 수입니다.
- outs: n개의 출력 채널을 생성합니다.
- 고루틴을 생성하여 입력 채널에서 데이터를 읽고, 각 출력 채널로 데이터를 복사합니다.
- 각 출력 채널에 데이터를 전송할 때 sync.WaitGroup을 사용하여 모든 채널로 데이터가 전송될 때까지 대기합니다.
- orDone 함수:
- done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다.
- main 함수:
- done 채널을 생성하여 파이프라인 종료를 제어합니다.
- 입력 채널 in을 생성하여 입력 데이터를 전달합니다.
- tee 함수를 호출하여 n개의 출력 채널을 생성합니다.
- 각 출력 고루틴에서 각각의 출력 채널에서 데이터를 소비하고 출력합니다.
- 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.
요약
이 예제에서는 tee 함수가 n개의 출력 채널을 생성하도록 일반화되었습니다. 입력 채널에서 데이터를 읽어 각 출력 채널로 데이터를 전송하는 고루틴을 동적으로 생성하여 처리합니다. 이 방식은 여러 개의 출력 채널을 동적으로 생성하고 관리할 수 있어, 다중 출력 채널을 사용하는 다양한 시나리오에 유연하게 대응할 수 있습니다.
Tee 패턴의 활용
- 로깅 및 모니터링: 실시간으로 들어오는 데이터를 여러 곳에서 동시에 처리하여 로깅하거나 모니터링할 수 있습니다.
- 데이터 분배: 하나의 데이터 스트림을 여러 처리기로 분배하여 병렬로 처리할 수 있습니다.
- 백업 및 복제: 데이터를 동시에 여러 저장소에 백업하거나 복제할 수 있습니다.
Tee 패턴은 데이터의 병렬 처리 및 분배를 쉽게 구현할 수 있는 유용한 동시성 패턴입니다. 이 패턴을 이해하고 활용하면 Go 언어의 동시성 프로그래밍에서 더욱 효율적이고 유연한 설계를 할 수 있습니다.