-
golang: Tee 패턴이란?Back-End/Golang 2024. 5. 15. 21:12반응형
Tee 패턴은 하나의 입력 채널에서 읽은 데이터를 여러 출력 채널로 복사하여 동시에 여러 곳에서 사용할 수 있게 하는 패턴입니다. 이 패턴은 이름 그대로 "티" 모양처럼 데이터를 여러 갈래로 나누는 기능을 합니다. 이 패턴은 데이터를 여러 곳에서 병렬로 처리할 때 유용합니다.
Tee 패턴의 구조
- 입력 채널: 데이터를 제공하는 채널입니다.
- 출력 채널들: 입력 채널에서 읽은 데이터를 복사하여 전달할 여러 개의 출력 채널입니다.
예제 코드
아래 예제에서는 tee 함수가 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 기능을 수행합니다.
package main import ( "fmt" "sync" ) // tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다. func tee(done <-chan struct{}, in <-chan int) (<-chan int, <-chan int) { out1 := make(chan int) out2 := make(chan int) go func() { defer close(out1) defer close(out2) for val := range in { select { case out1 <- val: case <-done: return } select { case out2 <- val: case <-done: return } } }() return out1, out2 } func main() { done := make(chan struct{}) defer close(done) // 입력 채널 생성 in := make(chan int) // tee 함수 호출하여 두 개의 출력 채널 생성 out1, out2 := tee(done, in) var wg sync.WaitGroup wg.Add(2) // 첫 번째 출력 채널에서 데이터 소비 go func() { defer wg.Done() for val := range out1 { fmt.Println("out1:", val) } }() // 두 번째 출력 채널에서 데이터 소비 go func() { defer wg.Done() for val := range out2 { fmt.Println("out2:", val) } }() // 입력 채널에 데이터 전송 go func() { defer close(in) for i := 1; i <= 5; i++ { in <- i } }() wg.Wait() }
코드 설명
- tee 함수:
- done: 파이프라인을 중단하기 위한 채널입니다.
- in: 입력 채널입니다.
- out1, out2: 두 개의 출력 채널입니다.
- 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사합니다.
- select 문을 사용하여 done 채널이 닫히면 즉시 반환합니다.
- main 함수:
- done 채널을 생성하여 파이프라인 종료를 제어합니다.
- in 채널을 생성하여 입력 데이터를 전달합니다.
- tee 함수를 호출하여 두 개의 출력 채널을 생성합니다.
- wg를 사용하여 두 개의 출력 고루틴이 종료될 때까지 대기합니다.
- 두 개의 출력 고루틴에서 각각 출력 채널에서 데이터를 소비하고 출력합니다.
- 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.
주요 포인트
- 동시성: 입력 데이터를 동시에 여러 출력 채널로 복사하여 병렬로 처리할 수 있습니다.
- 채널 관리: 채널을 통해 데이터를 안전하게 전달하고, select 문을 사용하여 파이프라인을 종료할 수 있습니다.
- 리소스 해제: done 채널을 사용하여 파이프라인을 종료하면, 고루틴과 채널이 안전하게 종료됩니다.
추가 Tee 예시
func Tee( done, c chan interface{}) (<-chan interface{}, <-chan interface{}) { out1 := make(chan interface{}) out2 := make(chan interface{}) go func() { defer close(out1) defer close(out2) for val := range orDone(done, c) { var out1, out2 = out1, out2 for i := 0; i < 2; i++ { select { case <-done: case out1 <- val: out1 = nil case out2 <- val: out2 = nil } } } }() return out1, out2 }
주요 부분 설명
1. Tee 함수 서명:
- done: 파이프라인을 중단할 수 있는 채널입니다.
- c: 입력 데이터를 제공하는 채널입니다.
- 반환 값으로 두 개의 출력 채널을 반환합니다.
2. 출력 채널 생성:
out1 := make(chan interface{}) out2 := make(chan interface{})
3. 고루틴 생성:
- 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 고루틴을 생성합니다.
go func() { defer close(out1) defer close(out2) for val := range orDone(done, c) { var out1, out2 = out1, out2 for i := 0; i < 2; i++ { select { case <-done: case out1 <- val: out1 = nil case out2 <- val: out2 = nil } } } }()
4. orDone 함수 호출:
- orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다. 이 함수의 구현은 예제에 없지만, 일반적으로는 아래와 같이 구현됩니다:
func orDone(done, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if !ok { return } select { case valStream <- v: case <-done: } } } }() return valStream }
5. var out1, out2 = out1, out2:
- 이 부분은 각각의 출력 채널에 대한 복사본을 생성하는 것입니다. 이 복사본은 각 select 블록 내에서 특정 채널에 대한 쓰기를 중단하는 데 사용됩니다.
- out1이나 out2 중 하나에 데이터를 성공적으로 쓰면, 해당 채널을 nil로 설정하여 현재 값이 다른 채널로 중복되어 전송되는 것을 방지합니다.
코드의 핵심 부분 상세 설명
특히 헷갈리는 부분인 var out1, out2 = out1, out2와 관련된 로직을 설명드리겠습니다.
var out1, out2 = out1, out2:
for val := range orDone(done, c) { var out1, out2 = out1, out2 for i := 0; i < 2; i++ { select { case <-done: case out1 <- val: // we set the shadowed copy to nil, to block further writes for current val out1 = nil case out2 <- val: // we set the shadowed copy to nil, to block further writes for current val out2 = nil } } }
- var out1, out2 = out1, out2:
- 이 구문은 out1과 out2의 새 변수를 만듭니다. 이 변수들은 for 루프 안에서만 유효하며, 원래의 out1과 out2를 그림자(shadow)처럼 덮어씁니다.
- 각 select 문에서 성공적으로 쓰기가 이루어지면 해당 채널 변수를 nil로 설정하여 현재 값이 중복해서 다른 채널로 전송되지 않도록 합니다.
- select 문:
- 첫 번째 select 문에서 out1에 데이터를 성공적으로 쓰면 out1을 nil로 설정하여 두 번째 반복 시 out1에 다시 쓰는 것을 방지합니다.
- 두 번째 select 문에서 out2에 데이터를 성공적으로 쓰면 out2를 nil로 설정하여 두 번째 반복 시 out2에 다시 쓰는 것을 방지합니다.
예제 실행 흐름
- 입력 채널 c에서 데이터를 읽습니다:
- orDone 함수는 done 채널을 감시하면서 c에서 데이터를 읽어 반환합니다.
- 데이터를 출력 채널로 복사합니다:
- 각 val에 대해 두 번의 select 문이 실행됩니다.
- 첫 번째 select 문에서 out1에 성공적으로 데이터를 쓰면 out1을 nil로 설정합니다.
- 두 번째 select 문에서 out2에 성공적으로 데이터를 쓰면 out2을 nil로 설정합니다.
이렇게 하면 각 값이 정확히 한 번씩만 각 출력 채널로 전송됩니다. 이 과정에서 done 채널을 통해 파이프라인을 중단할 수 있습니다.
이해를 돕기 위해 요약하자면, 이 코드는 done 채널을 통해 파이프라인을 중단할 수 있도록 하면서 입력 채널의 데이터를 두 개의 출력 채널로 복사합니다. 각 값은 정확히 한 번씩만 출력 채널로 전송됩니다.
다중 출력 채널을 위한 Tee 함수
out1, out2, out3, ... out100과 같이 여러 개의 출력 채널을 사용하는 것도 가능합니다. 하지만 이 경우에는 코드를 조금 더 복잡하게 작성해야 합니다. 각 출력 채널로 데이터를 복사하는 로직을 동적으로 처리할 수 있도록 일반화할 필요가 있습니다.
다중 출력 채널을 위한 일반화된 Tee 함수
아래는 out 채널이 동적으로 여러 개 생성되도록 수정한 예제입니다.
package main import ( "fmt" "sync" ) // tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다. func tee(done <-chan struct{}, in <-chan interface{}, n int) []<-chan interface{} { outs := make([]chan interface{}, n) for i := range outs { outs[i] = make(chan interface{}) } go func() { defer func() { for _, out := range outs { close(out) } }() for val := range orDone(done, in) { var wg sync.WaitGroup wg.Add(len(outs)) for _, out := range outs { go func(c chan interface{}) { defer wg.Done() select { case c <- val: case <-done: } }(out) } wg.Wait() } }() outChans := make([]<-chan interface{}, len(outs)) for i := range outs { outChans[i] = outs[i] } return outChans } // orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. func orDone(done, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if !ok { return } select { case valStream <- v: case <-done: } } } }() return valStream } func main() { done := make(chan struct{}) defer close(done) in := make(chan interface{}) // tee 함수 호출하여 3개의 출력 채널 생성 outChans := tee(done, in, 3) var wg sync.WaitGroup wg.Add(len(outChans)) // 각 출력 채널에서 데이터 소비 for i, out := range outChans { go func(out <-chan interface{}, idx int) { defer wg.Done() for val := range out { fmt.Printf("out%d: %v\n", idx+1, val) } }(out, i) } // 입력 채널에 데이터 전송 go func() { defer close(in) for i := 1; i <= 5; i++ { in <- i } }() wg.Wait() }
코드 설명
- tee 함수:
- done: 파이프라인을 중단하기 위한 채널입니다.
- in: 입력 데이터를 제공하는 채널입니다.
- n: 생성할 출력 채널의 수입니다.
- outs: n개의 출력 채널을 생성합니다.
- 고루틴을 생성하여 입력 채널에서 데이터를 읽고, 각 출력 채널로 데이터를 복사합니다.
- 각 출력 채널에 데이터를 전송할 때 sync.WaitGroup을 사용하여 모든 채널로 데이터가 전송될 때까지 대기합니다.
- orDone 함수:
- done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다.
- main 함수:
- done 채널을 생성하여 파이프라인 종료를 제어합니다.
- 입력 채널 in을 생성하여 입력 데이터를 전달합니다.
- tee 함수를 호출하여 n개의 출력 채널을 생성합니다.
- 각 출력 고루틴에서 각각의 출력 채널에서 데이터를 소비하고 출력합니다.
- 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.
요약
이 예제에서는 tee 함수가 n개의 출력 채널을 생성하도록 일반화되었습니다. 입력 채널에서 데이터를 읽어 각 출력 채널로 데이터를 전송하는 고루틴을 동적으로 생성하여 처리합니다. 이 방식은 여러 개의 출력 채널을 동적으로 생성하고 관리할 수 있어, 다중 출력 채널을 사용하는 다양한 시나리오에 유연하게 대응할 수 있습니다.
Tee 패턴의 활용
- 로깅 및 모니터링: 실시간으로 들어오는 데이터를 여러 곳에서 동시에 처리하여 로깅하거나 모니터링할 수 있습니다.
- 데이터 분배: 하나의 데이터 스트림을 여러 처리기로 분배하여 병렬로 처리할 수 있습니다.
- 백업 및 복제: 데이터를 동시에 여러 저장소에 백업하거나 복제할 수 있습니다.
Tee 패턴은 데이터의 병렬 처리 및 분배를 쉽게 구현할 수 있는 유용한 동시성 패턴입니다. 이 패턴을 이해하고 활용하면 Go 언어의 동시성 프로그래밍에서 더욱 효율적이고 유연한 설계를 할 수 있습니다.
반응형'Back-End > Golang' 카테고리의 다른 글
golang: 루프 변수의 스코프 이슈(Fixing For Loops in Go 1.22) (0) 2024.08.05 golang: 제네릭(Generic)이란? (0) 2024.08.03 golang: 파이프라인(pipeline)이란? (0) 2024.05.15 golang: 배열(Array)과 슬라이스(Slice) (0) 2024.05.11 고루틴 누수(Goroutine Leak)란? (0) 2024.05.10