ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • golang: Tee 패턴이란?
    Back-End/Golang 2024. 5. 15. 21:12
    반응형

     Tee 패턴은 하나의 입력 채널에서 읽은 데이터를 여러 출력 채널로 복사하여 동시에 여러 곳에서 사용할 수 있게 하는 패턴입니다. 이 패턴은 이름 그대로 "티" 모양처럼 데이터를 여러 갈래로 나누는 기능을 합니다. 이 패턴은 데이터를 여러 곳에서 병렬로 처리할 때 유용합니다.

    Tee 패턴의 구조

    Tee 패턴 구조

    1. 입력 채널: 데이터를 제공하는 채널입니다.
    2. 출력 채널들: 입력 채널에서 읽은 데이터를 복사하여 전달할 여러 개의 출력 채널입니다.

    예제 코드

    아래 예제에서는 tee 함수가 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 기능을 수행합니다.

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다.
    func tee(done <-chan struct{}, in <-chan int) (<-chan int, <-chan int) {
        out1 := make(chan int)
        out2 := make(chan int)
    
        go func() {
            defer close(out1)
            defer close(out2)
            for val := range in {
                select {
                case out1 <- val:
                case <-done:
                    return
                }
    
                select {
                case out2 <- val:
                case <-done:
                    return
                }
            }
        }()
        return out1, out2
    }
    
    func main() {
        done := make(chan struct{})
        defer close(done)
    
        // 입력 채널 생성
        in := make(chan int)
    
        // tee 함수 호출하여 두 개의 출력 채널 생성
        out1, out2 := tee(done, in)
    
        var wg sync.WaitGroup
        wg.Add(2)
    
        // 첫 번째 출력 채널에서 데이터 소비
        go func() {
            defer wg.Done()
            for val := range out1 {
                fmt.Println("out1:", val)
            }
        }()
    
        // 두 번째 출력 채널에서 데이터 소비
        go func() {
            defer wg.Done()
            for val := range out2 {
                fmt.Println("out2:", val)
            }
        }()
    
        // 입력 채널에 데이터 전송
        go func() {
            defer close(in)
            for i := 1; i <= 5; i++ {
                in <- i
            }
        }()
    
        wg.Wait()
    }

    코드 설명

    1. tee 함수:
      • done: 파이프라인을 중단하기 위한 채널입니다.
      • in: 입력 채널입니다.
      • out1, out2: 두 개의 출력 채널입니다.
      • 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사합니다.
      • select 문을 사용하여 done 채널이 닫히면 즉시 반환합니다.
    2. main 함수:
      • done 채널을 생성하여 파이프라인 종료를 제어합니다.
      • in 채널을 생성하여 입력 데이터를 전달합니다.
      • tee 함수를 호출하여 두 개의 출력 채널을 생성합니다.
      • wg를 사용하여 두 개의 출력 고루틴이 종료될 때까지 대기합니다.
      • 두 개의 출력 고루틴에서 각각 출력 채널에서 데이터를 소비하고 출력합니다.
      • 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.

    주요 포인트

    • 동시성: 입력 데이터를 동시에 여러 출력 채널로 복사하여 병렬로 처리할 수 있습니다.
    • 채널 관리: 채널을 통해 데이터를 안전하게 전달하고, select 문을 사용하여 파이프라인을 종료할 수 있습니다.
    • 리소스 해제: done 채널을 사용하여 파이프라인을 종료하면, 고루틴과 채널이 안전하게 종료됩니다.

     

    추가 Tee 예시

    func Tee(
    	done,
    	c chan interface{}) (<-chan interface{}, <-chan interface{}) {
    	out1 := make(chan interface{})
    	out2 := make(chan interface{})
    
    	go func() {
    		defer close(out1)
    		defer close(out2)
    		for val := range orDone(done, c) {
    			var out1, out2 = out1, out2
    			for i := 0; i < 2; i++ {
    				select {
    				case <-done:
    				case out1 <- val:
    					out1 = nil
    				case out2 <- val:
    					out2 = nil
    				}
    			}
    		}
    	}()
    
    	return out1, out2
    }

    주요 부분 설명

    1. Tee 함수 서명:

    • done: 파이프라인을 중단할 수 있는 채널입니다.
    • c: 입력 데이터를 제공하는 채널입니다.
    • 반환 값으로 두 개의 출력 채널을 반환합니다.

    2. 출력 채널 생성:

    out1 := make(chan interface{})
    out2 := make(chan interface{})

     

    3. 고루틴 생성:

    • 입력 채널에서 데이터를 읽어 두 개의 출력 채널로 복사하는 고루틴을 생성합니다.
    go func() {
        defer close(out1)
        defer close(out2)
        for val := range orDone(done, c) {
            var out1, out2 = out1, out2
            for i := 0; i < 2; i++ {
                select {
                case <-done:
                case out1 <- val:
                    out1 = nil
                case out2 <- val:
                    out2 = nil
                }
            }
        }
    }()

     

    4. orDone 함수 호출:

    • orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다. 이 함수의 구현은 예제에 없지만, 일반적으로는 아래와 같이 구현됩니다:
    func orDone(done, c <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case valStream <- v:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }

     

    5. var out1, out2 = out1, out2:

    • 이 부분은 각각의 출력 채널에 대한 복사본을 생성하는 것입니다. 이 복사본은 각 select 블록 내에서 특정 채널에 대한 쓰기를 중단하는 데 사용됩니다.
    • out1이나 out2 중 하나에 데이터를 성공적으로 쓰면, 해당 채널을 nil로 설정하여 현재 값이 다른 채널로 중복되어 전송되는 것을 방지합니다.

    코드의 핵심 부분 상세 설명

    특히 헷갈리는 부분인 var out1, out2 = out1, out2와 관련된 로직을 설명드리겠습니다.

    var out1, out2 = out1, out2:

    for val := range orDone(done, c) {
        var out1, out2 = out1, out2
        for i := 0; i < 2; i++ {
            select {
            case <-done:
            case out1 <- val:
                // we set the shadowed copy to nil, to block further writes for current val
                out1 = nil
            case out2 <- val:
                // we set the shadowed copy to nil, to block further writes for current val
                out2 = nil
            }
        }
    }
    • var out1, out2 = out1, out2:
      • 이 구문은 out1과 out2의 새 변수를 만듭니다. 이 변수들은 for 루프 안에서만 유효하며, 원래의 out1과 out2를 그림자(shadow)처럼 덮어씁니다.
      • 각 select 문에서 성공적으로 쓰기가 이루어지면 해당 채널 변수를 nil로 설정하여 현재 값이 중복해서 다른 채널로 전송되지 않도록 합니다.
    • select 문:
      • 첫 번째 select 문에서 out1에 데이터를 성공적으로 쓰면 out1을 nil로 설정하여 두 번째 반복 시 out1에 다시 쓰는 것을 방지합니다.
      • 두 번째 select 문에서 out2에 데이터를 성공적으로 쓰면 out2를 nil로 설정하여 두 번째 반복 시 out2에 다시 쓰는 것을 방지합니다.

    예제 실행 흐름

    1. 입력 채널 c에서 데이터를 읽습니다:
      • orDone 함수는 done 채널을 감시하면서 c에서 데이터를 읽어 반환합니다.
    2. 데이터를 출력 채널로 복사합니다:
      • 각 val에 대해 두 번의 select 문이 실행됩니다.
      • 첫 번째 select 문에서 out1에 성공적으로 데이터를 쓰면 out1을 nil로 설정합니다.
      • 두 번째 select 문에서 out2에 성공적으로 데이터를 쓰면 out2을 nil로 설정합니다.

    이렇게 하면 각 값이 정확히 한 번씩만 각 출력 채널로 전송됩니다. 이 과정에서 done 채널을 통해 파이프라인을 중단할 수 있습니다.

    이해를 돕기 위해 요약하자면, 이 코드는 done 채널을 통해 파이프라인을 중단할 수 있도록 하면서 입력 채널의 데이터를 두 개의 출력 채널로 복사합니다. 각 값은 정확히 한 번씩만 출력 채널로 전송됩니다.

     

    다중 출력 채널을 위한 Tee 함수

     out1, out2, out3, ... out100과 같이 여러 개의 출력 채널을 사용하는 것도 가능합니다. 하지만 이 경우에는 코드를 조금 더 복잡하게 작성해야 합니다. 각 출력 채널로 데이터를 복사하는 로직을 동적으로 처리할 수 있도록 일반화할 필요가 있습니다.

    다중 출력 채널을 위한 일반화된 Tee 함수

    아래는 out 채널이 동적으로 여러 개 생성되도록 수정한 예제입니다.

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    // tee 함수는 입력 채널에서 데이터를 읽어 여러 출력 채널로 복사합니다.
    func tee(done <-chan struct{}, in <-chan interface{}, n int) []<-chan interface{} {
        outs := make([]chan interface{}, n)
        for i := range outs {
            outs[i] = make(chan interface{})
        }
    
        go func() {
            defer func() {
                for _, out := range outs {
                    close(out)
                }
            }()
    
            for val := range orDone(done, in) {
                var wg sync.WaitGroup
                wg.Add(len(outs))
                for _, out := range outs {
                    go func(c chan interface{}) {
                        defer wg.Done()
                        select {
                        case c <- val:
                        case <-done:
                        }
                    }(out)
                }
                wg.Wait()
            }
        }()
    
        outChans := make([]<-chan interface{}, len(outs))
        for i := range outs {
            outChans[i] = outs[i]
        }
        return outChans
    }
    
    // orDone 함수는 done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다.
    func orDone(done, c <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case valStream <- v:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }
    
    func main() {
        done := make(chan struct{})
        defer close(done)
    
        in := make(chan interface{})
    
        // tee 함수 호출하여 3개의 출력 채널 생성
        outChans := tee(done, in, 3)
    
        var wg sync.WaitGroup
        wg.Add(len(outChans))
    
        // 각 출력 채널에서 데이터 소비
        for i, out := range outChans {
            go func(out <-chan interface{}, idx int) {
                defer wg.Done()
                for val := range out {
                    fmt.Printf("out%d: %v\n", idx+1, val)
                }
            }(out, i)
        }
    
        // 입력 채널에 데이터 전송
        go func() {
            defer close(in)
            for i := 1; i <= 5; i++ {
                in <- i
            }
        }()
    
        wg.Wait()
    }

    코드 설명

    1. tee 함수:
      • done: 파이프라인을 중단하기 위한 채널입니다.
      • in: 입력 데이터를 제공하는 채널입니다.
      • n: 생성할 출력 채널의 수입니다.
      • outs: n개의 출력 채널을 생성합니다.
      • 고루틴을 생성하여 입력 채널에서 데이터를 읽고, 각 출력 채널로 데이터를 복사합니다.
      • 각 출력 채널에 데이터를 전송할 때 sync.WaitGroup을 사용하여 모든 채널로 데이터가 전송될 때까지 대기합니다.
    2. orDone 함수:
      • done 채널을 감시하면서 입력 채널에서 데이터를 읽습니다. done 채널이 닫히면 읽기를 중단합니다.
    3. main 함수:
      • done 채널을 생성하여 파이프라인 종료를 제어합니다.
      • 입력 채널 in을 생성하여 입력 데이터를 전달합니다.
      • tee 함수를 호출하여 n개의 출력 채널을 생성합니다.
      • 각 출력 고루틴에서 각각의 출력 채널에서 데이터를 소비하고 출력합니다.
      • 입력 채널에 데이터를 전송하는 고루틴을 생성합니다.

    요약

    이 예제에서는 tee 함수가 n개의 출력 채널을 생성하도록 일반화되었습니다. 입력 채널에서 데이터를 읽어 각 출력 채널로 데이터를 전송하는 고루틴을 동적으로 생성하여 처리합니다. 이 방식은 여러 개의 출력 채널을 동적으로 생성하고 관리할 수 있어, 다중 출력 채널을 사용하는 다양한 시나리오에 유연하게 대응할 수 있습니다.

     

    Tee 패턴의 활용

    • 로깅 및 모니터링: 실시간으로 들어오는 데이터를 여러 곳에서 동시에 처리하여 로깅하거나 모니터링할 수 있습니다.
    • 데이터 분배: 하나의 데이터 스트림을 여러 처리기로 분배하여 병렬로 처리할 수 있습니다.
    • 백업 및 복제: 데이터를 동시에 여러 저장소에 백업하거나 복제할 수 있습니다.

    Tee 패턴은 데이터의 병렬 처리 및 분배를 쉽게 구현할 수 있는 유용한 동시성 패턴입니다. 이 패턴을 이해하고 활용하면 Go 언어의 동시성 프로그래밍에서 더욱 효율적이고 유연한 설계를 할 수 있습니다.

    반응형
Designed by Tistory.