What are worker pools?
Worker pools are a group (pool) of processes (workers) doing similar tasks in parallel (concurrently). Worker pools are commonly used to process an unknown number of tasks (or tasks dynamically being added at an unknown rate) while setting a limit on the maximum tasks that can be processed at the same time.
Creating a worker pool
Go provides the sync package in its standard library, which contains several utilities for dealing with concurrency. In our case, we are interested in the sync.WaitGroup type, which allows us to coordinate multiple goroutines from a single location.
package main

import (
	"fmt"
	"sync"
	"time"
)

var numWorkers int = 3
var queue chan string = make(chan string)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func(n int) {
			fmt.Printf("[Worker %d] started\n", n)
			for task := range queue {
				fmt.Printf("[Worker %d] processing task %q\n", n, task)
				time.Sleep(3 * time.Second)
				fmt.Printf("[Worker %d] finished task %q\n", n, task)
			}
			fmt.Printf("[Worker %d] stopped\n", n)
			wg.Done()
		}(i)
	}
	for _, task := range []string{"one", "two", "three", "four", "five"} {
		queue <- task
	}
	close(queue)
	wg.Wait()
}
First, we declare the number of workers we want to run simultaneously, a queue for all our tasks, and a WaitGroup to coordinate our workers. In this example the type in our task queue is string, but it could be any other type, such as a struct or func, as well. Before starting any workers, we call wg.Add() to tell the WaitGroup how many workers we are waiting for - only then do we launch our goroutines from the for loop.
The workers range over the queue chan, which gives us three benefits:
- The worker will block indefinitely until a task becomes available from the queue.
- The worker will process any number of tasks in sequence.
- If the queue chan is closed, the for loop terminates and the goroutine can return, calling wg.Done() to signal that it has completed.
Once the worker goroutines are all running, they block while waiting for tasks to be fed into the queue chan. In our example, we queue 5 tasks, then close the queue chan and wait for all worker goroutines to complete. As you will notice, even though each task causes a delay of 3 seconds, the program only needs about 6 seconds to complete, as it can process up to 3 tasks at once (one per worker).
Task sequence and worker scheduling
It is important to understand that while this approach can be used to distribute any number of tasks onto a fixed number of workers, the order in which these tasks complete may differ from the order in which they were added to the queue.
Running the program above may produce output similar to this:
[Worker 2] started
[Worker 2] processing task "one"
[Worker 0] started
[Worker 0] processing task "two"
[Worker 1] started
[Worker 1] processing task "three"
[Worker 1] finished task "three"
[Worker 0] finished task "two"
[Worker 0] processing task "five"
[Worker 2] finished task "one"
[Worker 1] processing task "four"
[Worker 2] stopped
[Worker 1] finished task "four"
[Worker 1] stopped
[Worker 0] finished task "five"
[Worker 0] stopped
As you can see, the tasks are picked up almost in the order they were given, with only the fourth and fifth swapping positions - but the order in which they completed is totally different (3 -> 2 -> 1 -> 4 -> 5). Although tasks are taken out of the queue chan in sequence, some goroutines may complete earlier than others or temporarily receive more CPU time, allowing them to process faster. If you need to maintain the order of tasks between queuing and completion, a plain worker pool may not be a good fit for your problem.