Back-End/Golang

golang: Tee 패턴이란?

슝슝이입니다 2024. 5. 15. 21:12
반응형

 Tee 패턴은 하나의 입력 채널에서 읽은 데이터를 여러 출력 채널로 복사하여 동시에 여러 곳에서 사용할 수 있게 하는 패턴입니다. 이 패턴은 이름 그대로 "티" 모양처럼 데이터를 여러 갈래로 나누는 기능을 합니다. 이 패턴은 데이터를 여러 곳에서 병렬로 처리할 때 유용합니다.

Tee 패턴의 구조

Tee 패턴 구조

  1. 입력 채널: 데이터를 제공하는 채널입니다.
  2. 출력 채널들: 입력 채널에서 읽은 데이터를 복사하여 전달할 여러 개의 출력 채널입니다.

예제 코드

아래 예제에서는 tee 함수가 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 기능을 수행합니다.

package main

import (
    "fmt"
    "sync"
)

// tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다.
func tee(done <-chan struct{}, in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)

    go func() {
        defer close(out1)
        defer close(out2)
        for val := range in {
            select {
            case out1 <- val:
            case <-done:
                return
            }

            select {
            case out2 <- val:
            case <-done:
                return
            }
        }
    }()
    return out1, out2
}

func main() {
    done := make(chan struct{})
    defer close(done)

    // 입력 채널 생성
    in := make(chan int)

    // tee 함수 호출하여 두 개의 출력 채널 생성
    out1, out2 := tee(done, in)

    var wg sync.WaitGroup
    wg.Add(2)

    // 첫 번째 출력 채널에서 데이터 소비
    go func() {
        defer wg.Done()
        for val := range out1 {
            fmt.Println("out1:", val)
        }
    }()

    // 두 번째 출력 채널에서 데이터 소비
    go func() {
        defer wg.Done()
        for val := range out2 {
            fmt.Println("out2:", val)
        }
    }()

    // 입력 채널에 데이터 전송
    go func() {
        defer close(in)
        for i := 1; i <= 5; i++ {
            in <- i
        }
    }()

    wg.Wait()
}

코드 설명

  1. tee 함수:
    • done: 파이프라인을 중단하기 위한 채널입니다.
    • in: 입력 채널입니다.
    • out1, out2: 두 개의 출력 채널입니다.
    • 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사합니다.
    • select 문을 사용하여 done 채널이 닫히면 즉시 반환합니다.
  2. main 함수:
    • done 채널을 생성하여 파이프라인 종료를 제어합니다.
    • in 채널을 생성하여 입력 데이터를 전달합니다.
    • tee 함수를 호출하여 두 개의 출력 채널을 생성합니다.
    • wg를 사용하여 두 개의 출력 고루틴이 종료될 때까지 대기합니다.
    • 두 개의 출력 고루틴에서 각각 출력 채널에서 데이터를 소비하고 출력합니다.
    • 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.

주요 포인트

  • 동시성: 입력 데이터를 동시에 여러 출력 채널로 복사하여 병렬로 처리할 수 있습니다.
  • 채널 관리: 채널을 통해 데이터를 안전하게 전달하고, select 문을 사용하여 파이프라인을 종료할 수 있습니다.
  • 리소스 해제: done 채널을 사용하여 파이프라인을 종료하면, 고루틴과 채널이 안전하게 종료됩니다.

 

추가 Tee 예시

func Tee(
	done,
	c chan interface{}) (<-chan interface{}, <-chan interface{}) {
	out1 := make(chan interface{})
	out2 := make(chan interface{})

	go func() {
		defer close(out1)
		defer close(out2)
		for val := range orDone(done, c) {
			var out1, out2 = out1, out2
			for i := 0; i < 2; i++ {
				select {
				case <-done:
				case out1 <- val:
					out1 = nil
				case out2 <- val:
					out2 = nil
				}
			}
		}
	}()

	return out1, out2
}

주요 부분 설명

1. Tee 함수 서명:

  • done: 파이프라인을 중단할 수 있는 채널입니다.
  • c: 입력 데이터를 제공하는 채널입니다.
  • 반환 값으로 두 개의 출력 채널을 반환합니다.

2. 출력 채널 생성:

out1 := make(chan interface{})
out2 := make(chan interface{})

 

3. 고루틴 생성:

  • 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 고루틴을 생성합니다.
go func() {
    defer close(out1)
    defer close(out2)
    for val := range orDone(done, c) {
        var out1, out2 = out1, out2
        for i := 0; i < 2; i++ {
            select {
            case <-done:
            case out1 <- val:
                out1 = nil
            case out2 <- val:
                out2 = nil
            }
        }
    }
}()

 

4. orDone 함수 호출:

  • orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다. 이 함수의 구현은 예제에 없지만, 일반적으로는 아래와 같이 구현됩니다:
func orDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

 

5. var out1, out2 = out1, out2:

  • 이 부분은 각각의 출력 채널에 대한 복사본을 생성하는 것입니다. 이 복사본은 각 select 블록 내에서 특정 채널에 대한 쓰기를 중단하는 데 사용됩니다.
  • out1이나 out2 중 하나에 데이터를 성공적으로 쓰면, 해당 채널을 nil로 설정하여 현재 값이 다른 채널로 중복되어 전송되는 것을 방지합니다.

코드의 핵심 부분 상세 설명

특히 헷갈리는 부분인 var out1, out2 = out1, out2와 관련된 로직을 설명드리겠습니다.

var out1, out2 = out1, out2:

for val := range orDone(done, c) {
    var out1, out2 = out1, out2
    for i := 0; i < 2; i++ {
        select {
        case <-done:
        case out1 <- val:
            // we set the shadowed copy to nil, to block further writes for current val
            out1 = nil
        case out2 <- val:
            // we set the shadowed copy to nil, to block further writes for current val
            out2 = nil
        }
    }
}
  • var out1, out2 = out1, out2:
    • 이 구문은 out1과 out2의 새 변수를 만듭니다. 이 변수들은 for 루프 안에서만 유효하며, 원래의 out1과 out2를 그림자(shadow)처럼 덮어씁니다.
    • 각 select 문에서 성공적으로 쓰기가 이루어지면 해당 채널 변수를 nil로 설정하여 현재 값이 중복해서 다른 채널로 전송되지 않도록 합니다.
  • select 문:
    • 첫 번째 select 문에서 out1에 데이터를 성공적으로 쓰면 out1을 nil로 설정하여 두 번째 반복 시 out1에 다시 쓰는 것을 방지합니다.
    • 두 번째 select 문에서 out2에 데이터를 성공적으로 쓰면 out2를 nil로 설정하여 두 번째 반복 시 out2에 다시 쓰는 것을 방지합니다.

예제 실행 흐름

  1. 입력 채널 c에서 데이터를 읽습니다:
    • orDone 함수는 done 채널을 감시하면서 c에서 데이터를 읽어 반환합니다.
  2. 데이터를 출력 채널로 복사합니다:
    • 각 val에 대해 두 번의 select 문이 실행됩니다.
    • 첫 번째 select 문에서 out1에 성공적으로 데이터를 쓰면 out1을 nil로 설정합니다.
    • 두 번째 select 문에서 out2에 성공적으로 데이터를 쓰면 out2을 nil로 설정합니다.

이렇게 하면 각 값이 정확히 한 번씩만 각 출력 채널로 전송됩니다. 이 과정에서 done 채널을 통해 파이프라인을 중단할 수 있습니다.

이해를 돕기 위해 요약하자면, 이 코드는 done 채널을 통해 파이프라인을 중단할 수 있도록 하면서 입력 채널의 데이터를 두 개의 출력 채널로 복사합니다. 각 값은 정확히 한 번씩만 출력 채널로 전송됩니다.

 

다중 출력 채널을 위한 Tee 함수

 out1, out2, out3, ... out100과 같이 여러 개의 출력 채널을 사용하는 것도 가능합니다. 하지만 이 경우에는 코드를 조금 더 복잡하게 작성해야 합니다. 각 출력 채널로 데이터를 복사하는 로직을 동적으로 처리할 수 있도록 일반화할 필요가 있습니다.

다중 출력 채널을 위한 일반화된 Tee 함수

아래는 out 채널이 동적으로 여러 개 생성되도록 수정한 예제입니다.

package main

import (
    "fmt"
    "sync"
)

// tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다.
func tee(done <-chan struct{}, in <-chan interface{}, n int) []<-chan interface{} {
    outs := make([]chan interface{}, n)
    for i := range outs {
        outs[i] = make(chan interface{})
    }

    go func() {
        defer func() {
            for _, out := range outs {
                close(out)
            }
        }()

        for val := range orDone(done, in) {
            var wg sync.WaitGroup
            wg.Add(len(outs))
            for _, out := range outs {
                go func(c chan interface{}) {
                    defer wg.Done()
                    select {
                    case c <- val:
                    case <-done:
                    }
                }(out)
            }
            wg.Wait()
        }
    }()

    outChans := make([]<-chan interface{}, len(outs))
    for i := range outs {
        outChans[i] = outs[i]
    }
    return outChans
}

// orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다.
func orDone(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

func main() {
    done := make(chan struct{})
    defer close(done)

    in := make(chan interface{})

    // tee 함수 호출하여 3개의 출력 채널 생성
    outChans := tee(done, in, 3)

    var wg sync.WaitGroup
    wg.Add(len(outChans))

    // 각 출력 채널에서 데이터 소비
    for i, out := range outChans {
        go func(out <-chan interface{}, idx int) {
            defer wg.Done()
            for val := range out {
                fmt.Printf("out%d: %v\n", idx+1, val)
            }
        }(out, i)
    }

    // 입력 채널에 데이터 전송
    go func() {
        defer close(in)
        for i := 1; i <= 5; i++ {
            in <- i
        }
    }()

    wg.Wait()
}

코드 설명

  1. tee 함수:
    • done: 파이프라인을 중단하기 위한 채널입니다.
    • in: 입력 데이터를 제공하는 채널입니다.
    • n: 생성할 출력 채널의 수입니다.
    • outs: n개의 출력 채널을 생성합니다.
    • 고루틴을 생성하여 입력 채널에서 데이터를 읽고, 각 출력 채널로 데이터를 복사합니다.
    • 각 출력 채널에 데이터를 전송할 때 sync.WaitGroup을 사용하여 모든 채널로 데이터가 전송될 때까지 대기합니다.
  2. orDone 함수:
    • done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다.
  3. main 함수:
    • done 채널을 생성하여 파이프라인 종료를 제어합니다.
    • 입력 채널 in을 생성하여 입력 데이터를 전달합니다.
    • tee 함수를 호출하여 n개의 출력 채널을 생성합니다.
    • 각 출력 고루틴에서 각각의 출력 채널에서 데이터를 소비하고 출력합니다.
    • 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.

요약

이 예제에서는 tee 함수가 n개의 출력 채널을 생성하도록 일반화되었습니다. 입력 채널에서 데이터를 읽어 각 출력 채널로 데이터를 전송하는 고루틴을 동적으로 생성하여 처리합니다. 이 방식은 여러 개의 출력 채널을 동적으로 생성하고 관리할 수 있어, 다중 출력 채널을 사용하는 다양한 시나리오에 유연하게 대응할 수 있습니다.

 

Tee 패턴의 활용

  • 로깅 및 모니터링: 실시간으로 들어오는 데이터를 여러 곳에서 동시에 처리하여 로깅하거나 모니터링할 수 있습니다.
  • 데이터 분배: 하나의 데이터 스트림을 여러 처리기로 분배하여 병렬로 처리할 수 있습니다.
  • 백업 및 복제: 데이터를 동시에 여러 저장소에 백업하거나 복제할 수 있습니다.

Tee 패턴은 데이터의 병렬 처리 및 분배를 쉽게 구현할 수 있는 유용한 동시성 패턴입니다. 이 패턴을 이해하고 활용하면 Go 언어의 동시성 프로그래밍에서 더욱 효율적이고 유연한 설계를 할 수 있습니다.

반응형