Concurrent worker pools in Go

What are worker pools?

Worker pools are a group (pool) of concurrently running processes (workers) that perform similar tasks. They are commonly used to process an unknown number of tasks (or tasks added dynamically at an unknown rate) while setting a limit on how many tasks can be processed at the same time.

Creating a worker pool

Go provides the sync package in its standard library, which contains several utilities for dealing with concurrency. In our case, we are interested in the sync.WaitGroup type, which allows us to coordinate multiple goroutines from a single location.
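
Before we build the full worker pool, a minimal sketch of just the WaitGroup calls we will rely on (Add, Done and Wait) might look like this:

package main

import (
   "fmt"
   "sync"
)

func main() {
   var wg sync.WaitGroup
   wg.Add(2) // we are going to wait for two goroutines
   for i := 0; i < 2; i++ {
      go func(n int) {
         defer wg.Done() // signal completion when this goroutine returns
         fmt.Println("goroutine", n, "finished")
      }(i)
   }
   wg.Wait() // block until both goroutines have called Done
}

With those pieces in place, the full worker pool example looks like this: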

package main

import (
   "fmt"
   "sync"
   "time"
)

var numWorkers int = 3
var queue chan string = make(chan string)

func main() {
   wg := sync.WaitGroup{}
   // Tell the WaitGroup how many workers we will wait for.
   wg.Add(numWorkers)
   for i := 0; i < numWorkers; i++ {
      go func(n int) {
         fmt.Printf("[Worker %d] started\n", n)
         // Receive tasks until the queue chan is closed.
         for task := range queue {
            fmt.Printf("[Worker %d] processing task %q\n", n, task)
            time.Sleep(3 * time.Second)
            fmt.Printf("[Worker %d] finished task %q\n", n, task)
         }
         fmt.Printf("[Worker %d] stopped\n", n)
         wg.Done()
      }(i)
   }
   // Feed the tasks to the workers, then close the queue so they can stop.
   for _, task := range []string{"one", "two", "three", "four", "five"} {
      queue <- task
   }
   close(queue)
   wg.Wait()
}

First, we declare the number of workers we want to run simultaneously, a queue for all our tasks and a WaitGroup to coordinate our workers. In this example the type in our task queue is string, but it could be any other type, such as a struct or a func. Before starting any workers, we call wg.Add() to tell the WaitGroup how many workers we are waiting for - only then do we launch our goroutines from the for loop.
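
As a brief illustration of using a different element type, a queue carrying a struct instead of a string could look like the following sketch (the Task type here is purely illustrative and not part of the example above):

package main

import (
   "fmt"
   "sync"
)

// Task is an illustrative struct used as the channel's element type;
// any type works here, including funcs.
type Task struct {
   ID   int
   Name string
}

var queue = make(chan Task)

func main() {
   var wg sync.WaitGroup
   wg.Add(1)
   go func() {
      defer wg.Done()
      for task := range queue {
         fmt.Printf("processing task %d (%s)\n", task.ID, task.Name)
      }
   }()
   queue <- Task{ID: 1, Name: "one"}
   queue <- Task{ID: 2, Name: "two"}
   close(queue)
   wg.Wait()
}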

The workers in our pool range over the queue chan, which gives us three benefits (the explicit form of this loop is sketched after the list):

  1. The worker blocks until a task becomes available from the queue.
  2. The worker processes any number of tasks in sequence.
  3. If the queue chan is closed, the for loop terminates and the goroutine can return, calling wg.Done() to signal that it has completed.
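
Written out explicitly, the worker's range loop from the example above is roughly equivalent to this sketch:

for {
   // The second return value reports whether the queue chan is still open.
   task, ok := <-queue
   if !ok {
      // The queue has been closed and drained, so the worker can stop.
      break
   }
   fmt.Printf("[Worker %d] processing task %q\n", n, task)
   time.Sleep(3 * time.Second)
   fmt.Printf("[Worker %d] finished task %q\n", n, task)
}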

Once the goroutine workers are all running, they block while waiting for tasks to be fed into the queue chan. In our example, we queue five tasks, then close the queue chan and wait for all worker goroutines to complete. As you will notice, even though each task causes a delay of 3 seconds, the program only needs about 6 seconds to complete, because it can process up to 3 tasks at once (one per worker): the first three tasks run in parallel for 3 seconds, and the remaining two take another 3 seconds.
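
If you want to verify that figure, one option is to wrap the body of main in a simple time.Now()/time.Since() measurement, roughly like this sketch against the example above:

func main() {
   start := time.Now()

   // ... start the workers, queue the five tasks, close the queue and
   // call wg.Wait() exactly as in the example above ...

   // With 3 workers, 5 tasks and a 3-second sleep per task, this prints
   // an elapsed time of roughly 6 seconds.
   fmt.Printf("processed all tasks in %s\n", time.Since(start))
}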

Task sequence and worker scheduling

It is important to understand that while this approach can be used to distribute any number of tasks onto a fixed number of workers, the order in which these tasks complete may differ from the order in which they were added to the queue.

Running the program above may produce output similar to this:

[Worker 2] started
[Worker 2] processing task "one"
[Worker 0] started
[Worker 0] processing task "two"
[Worker 1] started
[Worker 1] processing task "three"
[Worker 1] finished task "three"
[Worker 0] finished task "two"
[Worker 0] processing task "five"
[Worker 2] finished task "one"
[Worker 1] processing task "four"
[Worker 2] stopped
[Worker 1] finished task "four"
[Worker 1] stopped
[Worker 0] finished task "five"
[Worker 0] stopped

As you can see, the tasks are processed almost in the order they were given, with only the fourth and fifth swapping positions - but the order they completed in is totally different (3 -> 2 -> 1 -> 4 -> 5). Although tasks are taken out of the queue chan in sequence, some goroutines may complete earlier than others or temporarily receive more CPU time, allowing them to process faster. If you need to maintain the order of tasks between queuing and completion, a worker pool may not be a good fit for your problem.
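
That said, if you only need the results (rather than the processing) to come out in the original order, one common workaround is to tag each task with its index and reassemble the results after all workers have finished. The sketch below is illustrative and not part of the example above; the indexedTask type and the results slice are assumptions made for the illustration:

package main

import (
   "fmt"
   "strings"
   "sync"
)

func main() {
   tasks := []string{"one", "two", "three", "four", "five"}
   results := make([]string, len(tasks)) // slot i will hold the result of task i

   type indexedTask struct {
      index int
      value string
   }
   queue := make(chan indexedTask)

   numWorkers := 3
   var wg sync.WaitGroup
   wg.Add(numWorkers)
   for i := 0; i < numWorkers; i++ {
      go func() {
         defer wg.Done()
         for t := range queue {
            // Each worker writes only to its own task's slot, so the
            // slice needs no further locking.
            results[t.index] = strings.ToUpper(t.value)
         }
      }()
   }
   for i, task := range tasks {
      queue <- indexedTask{index: i, value: task}
   }
   close(queue)
   wg.Wait()
   fmt.Println(results) // [ONE TWO THREE FOUR FIVE], regardless of completion order
}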
