func (m *Map[T, R]) doStream() { sem := make(chan struct{}, m.parallelism) for elem := range m.in { sem <- struct{}{} go func(element T) { defer func() { <-sem }() result := m.mapFunction(element) m.out <- result }(elem.(T)) } for i := 0; i < int(m.parallelism); i++ { sem <- struct{}{} } close(m.out) } 优化
时间: 2024-03-03 21:49:12 浏览: 12
这段代码是一个Map操作的实现,使用了并发处理输入数据。其中,使用了一个channel(sem)来控制并发度,控制goroutine的数量。在处理完数据后,还需要等待所有goroutine执行完成,关闭输出channel。为了优化这段代码,可以使用sync.WaitGroup来等待所有goroutine执行完成,避免使用额外的channel。下面是代码优化的示例:
```
func (m *Map[T, R]) doStream() {
var wg sync.WaitGroup
for elem := range m.in {
wg.Add(1)
go func(element T) {
defer wg.Done()
result := m.mapFunction(element)
m.out <- result
}(elem.(T))
}
wg.Wait()
close(m.out)
}
```
在这个优化后的代码中,我们使用一个WaitGroup来等待所有goroutine执行完成。在处理输入数据时,我们调用WaitGroup的Add方法增加goroutine的计数器,表示有新的goroutine开始执行。在goroutine执行完成后,我们调用WaitGroup的Done方法减少计数器,表示有一个goroutine执行完成。最后,我们调用Wait方法等待所有goroutine执行完成,并关闭输出channel。这样可以避免使用额外的channel,代码更加简洁。