您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何使用频道广播消息

如何使用频道广播消息

您正在执行的是扇出模式,也就是说,多个端点正在侦听单个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是close渠道。close所有的听众都将认识到这一点,因此是“广播”。

但是您想要做的是广播从连接读取的消息,因此我们可以执行以下操作:

让每个工作人员收听专用广播频道,并将消息从主频道分发到每个专用广播频道。

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

然后我们可能会有很多工人:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后启动我们的监听器:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

和调度员:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当需要新工作人员时,都需要创建一个新工作人员,将其启动,然后将其推入workers切片。但是此方法需要一个线程安全的切片,该切片周围需要一个锁。一种实现可能如下所示:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

每当您想开始工作时:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

您的调度员将更改为:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

好的做法之一是:永远不要离开悬空的goroutine。因此,当您听完后,需要关闭所有触发的goroutine。这将通过以下quit渠道进行worker

首先,我们需要创建一个全局quit信令通道:

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们都会为其分配globalQuit通道作为其退出信号:

worker.quit = globalQuit

然后,当我们要关闭所有工作程序时,我们只需执行以下操作:

close(globalQuit)

由于close所有侦听的goroutine都可以识别(这是您理解的重点),因此将返回所有goroutine。记住也要关闭调度程序,但我会留给您:)

其他 2022/1/1 18:14:31 有448人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶