October 6, 2020 by Drew DeVault

In-process work queueing for Go

In the course of our API 2.0 efforts, it is necessary for us to implement some kind of mechanism for queueing and retrying work in our Go platform. The most obvious examples of this work are sending emails and delivering webhooks, but there are more subtle examples, too: just about anything which can be moved out of the request → response hot path would improve performance for the end-user. As such, some kind of task queue is called for.

Our Python codebase uses Celery, and we already have some experience building Celery workers in Go. However, on the whole we’re pretty dissatisfied with Celery. It’s too complicated and inflexible for our needs. Something lighter weight and more toolkit-oriented (as opposed to midlayer-oriented) would address our use case better.

So, I set out to design us a bespoke task queueing system for our Go daemons. Some of the goals were:

The solution came in the form of our new “dowork” Go library. Here’s the basic usage:

import work "git.sr.ht/~sircmpwn/dowork"

queue := work.NewQueue("example") // The queue name here is illustrative
queue.Start(context.Background()) // Does not block
queue.Submit(func(ctx context.Context) error {
    // Do work...
    return nil
})
queue.Shutdown() // Blocks until all pending tasks are complete
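
To make the Start/Submit/Shutdown lifecycle concrete, here is a minimal sketch of how an in-process queue with those semantics could be built from a buffered channel and a single worker goroutine. This is not dowork's implementation: miniQueue, newMiniQueue, and runDemo are hypothetical names invented for illustration, and the sketch leaves out retries and metrics entirely.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// TaskFunc mirrors the function signature used in the usage example.
type TaskFunc func(ctx context.Context) error

// miniQueue is a hypothetical stand-in for illustration only.
// It is NOT how dowork is implemented.
type miniQueue struct {
	tasks chan TaskFunc
	done  sync.WaitGroup
}

func newMiniQueue(capacity int) *miniQueue {
	return &miniQueue{tasks: make(chan TaskFunc, capacity)}
}

// Start launches one worker goroutine and returns immediately.
func (q *miniQueue) Start(ctx context.Context) {
	q.done.Add(1)
	go func() {
		defer q.done.Done()
		// The loop ends once the channel is closed and drained.
		for fn := range q.tasks {
			if err := fn(ctx); err != nil {
				fmt.Println("task failed:", err)
			}
		}
	}()
}

// Submit hands a task to the worker. It blocks only if the buffer is full.
func (q *miniQueue) Submit(fn TaskFunc) {
	q.tasks <- fn
}

// Shutdown stops accepting tasks and blocks until pending tasks have run.
func (q *miniQueue) Shutdown() {
	close(q.tasks)
	q.done.Wait()
}

// runDemo submits n tasks and reports how many had run by the time
// Shutdown returned.
func runDemo(n int) int {
	ran := 0
	q := newMiniQueue(n)
	q.Start(context.Background())
	for i := 0; i < n; i++ {
		q.Submit(func(ctx context.Context) error {
			ran++ // Only the single worker goroutine writes this.
			return nil
		})
	}
	q.Shutdown() // Waiting here makes the read of ran below safe.
	return ran
}

func main() {
	fmt.Println("tasks completed:", runDemo(3))
}
```

The point of the sketch is that the caller never waits for the work itself: Submit returns as soon as the task is buffered, and only Shutdown blocks.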

If you want to handle retries, create and enqueue your task in two steps:

task := work.NewTask(func(ctx context.Context) error {
    // Do work...
    return nil
}).Retries(5)
queue.Enqueue(task)

This will automatically retry your task with an exponential backoff if it returns an error.
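
For readers unfamiliar with exponential backoff, the following is a rough sketch of the general technique using only the standard library. It is an illustration under assumptions, not dowork's actual algorithm: the helper names (backoffDelay, retry, demo), the doubling factor, the delay cap, and the choice to count the first run as attempt one are all made up for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffDelay returns base * 2^attempt, capped at max.
// The doubling factor and the cap are assumptions for this sketch.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// retry runs fn until it succeeds, the retries are used up, or ctx is done.
// It returns the number of attempts made and the final error.
func retry(ctx context.Context, retries int, base, max time.Duration,
	fn func(context.Context) error) (int, error) {
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		if err = fn(ctx); err == nil {
			return attempt + 1, nil
		}
		if attempt == retries {
			break // No point sleeping after the last attempt.
		}
		select {
		case <-time.After(backoffDelay(attempt, base, max)):
		case <-ctx.Done():
			return attempt + 1, ctx.Err()
		}
	}
	return retries + 1, err
}

// demo runs a task that fails twice and then succeeds.
func demo() (int, error) {
	calls := 0
	return retry(context.Background(), 5, 10*time.Millisecond, time.Second,
		func(ctx context.Context) error {
			calls++
			if calls < 3 {
				return errors.New("temporary failure")
			}
			return nil
		})
}

func main() {
	attempts, err := demo()
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}
```

With these numbers the task waits roughly 10ms after the first failure and 20ms after the second, so transient errors are retried quickly while persistent ones are retried less and less often.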

Let’s take a look at this in action! Consider our email handling module.

// Returns a task which will send this email for the work queue. If the caller
// does not need to customize the task parameters, the Enqueue function may be
// more desirable.
func NewTask(ctx context.Context, m *gomail.Message) *work.Task {
	conf := config.ForContext(ctx)
	return work.NewTask(func(ctx context.Context) error {
		return Send(config.Context(ctx, conf), m)
	}).Retries(10).After(func(ctx context.Context, task *work.Task) {
		if task.Result() == nil {
			log.Printf("MAIL TO %s: '%s' sent after %d attempts",
				strings.Join(m.GetHeader("To"), ";"),
				strings.Join(m.GetHeader("Subject"), ";"),
				task.Attempts())
		} else {
			log.Printf("MAIL TO %s: '%s' failed after %d attempts: %v",
				strings.Join(m.GetHeader("To"), ";"),
				strings.Join(m.GetHeader("Subject"), ";"),
				task.Attempts(), task.Result())
		}
	})
}

// Enqueues an email for sending with the default parameters.
func Enqueue(ctx context.Context, m *gomail.Message) {
	ForContext(ctx).Enqueue(NewTask(ctx, m))
}

// Creates a new email processing queue.
func NewQueue() *work.Queue {
	return work.NewQueue("email")
}

Some code for handling contexts is omitted for brevity. Here we use a closure to enclose the message to be sent, and some extra details like the config file (which includes our SMTP details). We configure it for up to 10 retries, and log the result after the task is complete. Sending an email with this is pretty straightforward:

m := gomail.NewMessage()
m.SetAddressHeader("From", "jdoe@example.org", "Jane Doe")
m.SetAddressHeader("To", "jsmith@example.org", "John Smith")
m.SetHeader("Subject", "An email subject")
m.SetBody("text/plain", "An email body")
email.Enqueue(ctx, m) // Doesn't block!
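
The context handling that the email module leaves out usually follows Go's standard context-value pattern. The sketch below shows one plausible shape for a pair of helpers like Context and ForContext. It is a guess at the pattern rather than the actual SourceHut code: the Queue type here is a hypothetical stand-in for *work.Queue, and the decision to panic when no queue is present is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Queue is a hypothetical stand-in for *work.Queue so that this sketch
// compiles on its own.
type Queue struct {
	Name string
}

// ctxKey is an unexported type, so no other package can collide with it.
type ctxKey struct{}

// Context returns a copy of ctx which carries the given queue.
func Context(ctx context.Context, q *Queue) context.Context {
	return context.WithValue(ctx, ctxKey{}, q)
}

// ForContext returns the queue stored by Context. It panics if there is
// none, on the assumption that a missing queue is a programming error.
func ForContext(ctx context.Context) *Queue {
	q, ok := ctx.Value(ctxKey{}).(*Queue)
	if !ok {
		panic("no email queue for this context")
	}
	return q
}

// demoQueueName stores a queue in a context and reads it back.
func demoQueueName() string {
	ctx := Context(context.Background(), &Queue{Name: "email"})
	return ForContext(ctx).Name
}

func main() {
	fmt.Println("queue for this context:", demoQueueName())
}
```

A request middleware would typically call Context once per request, which is what lets a handler write email.Enqueue(ctx, m) without passing the queue around explicitly.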

The next interesting component comes when it’s time to terminate the process. We want to do the following things:

  1. Stop accepting new connections and free up the HTTP port for the new daemon
  2. Finish servicing existing requests, up to a timeout
  3. Finish running any already-queued tasks
  4. Terminate the process

So the process looks like this:

mail := email.NewQueue()
mail.Start(context.Background())

// ...

go qserver.Serve(qlistener) // Asynchronously start the main HTTP server

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
<-sig
signal.Reset(os.Interrupt)
log.Println("SIGINT caught, initiating warm shutdown")
log.Println("SIGINT again to terminate immediately and drop pending requests & tasks")

log.Println("Terminating server...")
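ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
if err := qserver.Shutdown(ctx); err != nil {
  // Reached if existing requests did not finish within the timeout
  log.Printf("HTTP server shutdown: %v", err)
}
cancel()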

log.Println("Terminating work queues...")
work.Join(mail)
qserver.Close()
log.Println("Terminating process.")

As a bonus for observability, we also set up a secondary HTTP server on a kernel-assigned TCP port, which we can use to monitor the shutdown process:

mux := &http.ServeMux{}
mux.Handle("/metrics", promhttp.Handler())
pserver := &http.Server{Handler: mux}
plistener, err := net.Listen("tcp", ":0")
if err != nil {
  panic(err)
}
log.Printf("Prometheus listening on :%d", plistener.Addr().(*net.TCPAddr).Port)
go pserver.Serve(plistener)

I also added an extra log line during the shutdown process:

log.Printf("Progress available via Prometheus stats on port %d",
  plistener.Addr().(*net.TCPAddr).Port)

This is just to print the Prometheus port closer to the shutdown event in the logs, for easy reference. Running curl http://[::1]:$port/metrics returns the Prometheus metrics, including the queue drain progress, for the sysadmin to monitor.

That’s it! Some future improvements along these lines will include:

Stay tuned.

View the “dowork” project on SourceHut →


  1. Without necessarily making it difficult to move executors to separate processes or machines later on

  2. Update 2020-10-07: We’ve successfully tested using SO_REUSEPORT to allow the replacement daemon to start up before the defunct daemon starts shutting down, which completely eliminates the window during which connections could be dropped. We still need to work on coaxing OpenRC into handling this service lifecycle; we’ll likely write a follow-up post about this.