Exploring Concurrency

By Zach Bullough on 8/30/2021

At OMG, many of our backend microservices are written in the Go programming language. Among other things, Go is best known for its extremely efficient, first-class concurrency support.

Go’s primary form of concurrency is called a goroutine. Goroutines, like coroutines in other languages, allow for asynchronous programming. At a low level, Go’s runtime multiplexes these goroutines onto OS threads, providing parallelism in addition to concurrency. While many other languages have something similar, Go’s goroutines are primitives, built into the base language, and the Go runtime manages them extremely efficiently.
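As a minimal sketch of what starting goroutines looks like (the squareAll function and its inputs are made up for illustration, and sync.WaitGroup is used here only to wait for the goroutines to finish):

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll squares each input in its own goroutine and waits for all of them.
// Each goroutine writes only to its own slot of the result slice, so no two
// goroutines ever touch the same element.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, n := range inputs {
		wg.Add(1)
		go func(i, n int) { // the 'go' keyword starts this call in a new goroutine
			defer wg.Done()
			results[i] = n * n
		}(i, n)
	}
	wg.Wait() // block until every goroutine has called Done
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // prints [1 4 9 16]
}
```

The goroutines may run in any order and on any OS thread; the program only relies on all of them having finished by the time Wait returns.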

 

Shared Memory Safety

Parallel processing in mainstream computing boils down to splitting a task between either threads or processes. Threads are substantially more lightweight and are generally preferred over processes, especially when you need a lot of them. However, threads present a new problem: unlike separate processes, which each have their own isolated memory space, threads all share the same memory space, so two threads can read and write the same data at the same time. A number of best practices and models can be used to avoid the resulting race conditions, crashes, and unexpected behavior.

While Go supports numerous ways to safely share memory, the preferred way of sharing information in Go is to use the channel primitives that Go provides. Channels remove the need to mutate shared memory across goroutines at all: one goroutine sends a value and another receives it, safely. To demonstrate how channels work, we’re going to implement the Chudnovsky algorithm for calculating Pi.

If you would like to try this program for yourself and don’t have Go installed on your system, you can run a slightly modified version on the Go Playground.

package main

import (
   "fmt"
   "math"
   "strconv"
   "time"
)

type pool struct {
   task func(uint64) float64
   input chan uint64
   output chan float64
}

func newPool(numWorkers int, task func(uint64) float64) *pool {
   pool := &pool{
      task: task,
      input: make(chan uint64),
      output: make(chan float64),
   }
   for i := 0; i < numWorkers; i++ {
      go func() { // start this function (a worker) in a new goroutine
         for input := range pool.input {
            pool.output <- pool.task(input)
         }
      }()
   }
   return pool
}

// factorial returns n! as a uint64.
// Note: uint64 overflows (wraps around) for n > 20.
func factorial(n uint64) uint64 {
   if n == 0 {
      return 1
   }
   return n * factorial(n-1)
}

func calculateNumerator(i uint64) float64 {
   return math.Pow(-1, float64(i)) * float64(factorial(6*i)) *
   (545140134*float64(i) + 13591409)
}

func calculateDenominator(i uint64) float64 {
   return float64(factorial(3*i)) * math.Pow(float64(factorial(i)), 3) *
   math.Pow(262537412640768000, float64(i)+0.5)
}

func main() {
   pool := newPool(8, func(i uint64) float64 {
      return calculateNumerator(i) / calculateDenominator(i)
   })
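   var input string
   fmt.Println("Enter number of iterations: ")
   fmt.Scanln(&input)
   n, err := strconv.Atoi(input)
   if err != nil || n < 0 {
      // a negative value would wrap around to a huge uint64 below
      fmt.Println("please enter a non-negative whole number")
      return
   }
   numIterations := uint64(n)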
   sum := 0.0
   start := time.Now()
   go func() {
      for i := uint64(0); i < numIterations; i++ {
         pool.input <- i
      }
   }()
   for i := uint64(0); i < numIterations; i++ {
      sum += <-pool.output
   }
   elapsed := time.Since(start)
   fmt.Println(elapsed)
   close(pool.input) // no more work: lets the workers' range loops exit
   fmt.Printf("%f\n", 1/(12*sum))
}

 

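Executing this program with an input of 20 (or so) will yield an answer close to Pi: 3.141593. Because factorial uses uint64, which overflows for arguments above 20, only the first few terms are computed exactly. The later terms are far too small to matter at float64 precision, but considerably larger inputs eventually produce NaN instead of a better answer.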

This is pretty straightforward: we build a pool of workers, which can process 8 tasks at a time. We do this by spawning a goroutine 8 times. The goroutine is going to do nothing but take input from a channel, call the provided function, then send the return value of that function to the output channel.

Here is a snippet of the most critical components, with comments for added clarity:

// Our pool struct has three fields:
// 1. the task itself (which is a function that takes an integer
//     and returns a floating point number)
// 2. the input channel, which listens for an integer from the
//     "other end" of the channel
// 3. the output channel, which sends the result of the iteration
//     computation 
pool := &pool{
   task: task,
   input: make(chan uint64),
   output: make(chan float64),
}
// The 'go' keyword takes a function and executes it in a new goroutine
for i := 0; i < numWorkers; i++ {
   go func() { // start this function (a worker) in a new goroutine
      for input := range pool.input {
         pool.output <- pool.task(input)
      }
   }()
}

// Then, in our main function, we feed the worker pool via the input
// channel. We do this in another goroutine, which means we immediately
// continue to the next line in our program while the feeding function
// keeps running asynchronously. This matters because channels are
// unbuffered by default: a send blocks until a receiver takes the value.
// So, we will send a number, then wait until the other end reads it
// before we send another.
// You can think of an unbuffered channel as a direct hand-off: there is
// nowhere to leave a message, so the sender has to wait until the other
// side takes it.
go func() {
   for i := uint64(0); i < numIterations; i++ {
      pool.input <- i // "send the value of i over the input channel"
   }
}() 
// Here, in the main goroutine, we now read the output channel. This
// will free the workers to write to the channel again. Since our input is
// happening in another goroutine, we only have to iterate over the results
// until we've consumed them all.
// The workers could report the results in any order, but order doesn't
// matter for this operation.
for i := uint64(0); i < numIterations; i++ {
   sum += <- pool.output
}
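The blocking behavior described in the comments above can be observed directly. The following is a small sketch, not part of the original program: trySend is a hypothetical helper that uses select with a default case purely to report whether a send would have blocked.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
// A plain "ch <- v" would simply wait; select/default lets us see that it would.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // capacity 0, like the pool's channels
	buffered := make(chan int, 1) // capacity 1

	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting
	fmt.Println(trySend(buffered, 1))   // true: the value sits in the buffer
	fmt.Println(trySend(buffered, 2))   // false: the buffer is already full
	fmt.Println(<-buffered)             // 1
}
```

Giving the pool's channels a buffer would let the feeder and workers run a little further ahead of their readers, but it would not change the result.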

 

Strengths and Weaknesses

The huge plus to Go’s channel system is that there is actually no shared memory being mutated here. The “main” goroutine simply tells the workers what to do by writing to the input channel, and then reads from the output channel, waiting if necessary for the workers to send a value. This can be scaled to substantially more complicated implementations, such as selecting over multiple channels, sending complicated message bodies to implement an Actor model, and numerous other concurrency patterns.
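As one example of those patterns, here is a minimal sketch of selecting over multiple channels. The firstResult function, its channel names, and the timeout values are all made up for illustration; select and time.After are standard Go.

```go
package main

import (
	"fmt"
	"time"
)

// firstResult waits on two channels at once and returns whichever value
// arrives first, or "timed out" if neither arrives before the timeout.
func firstResult(a, b <-chan string, timeout time.Duration) string {
	select {
	case v := <-a:
		return "a: " + v
	case v := <-b:
		return "b: " + v
	case <-time.After(timeout):
		return "timed out"
	}
}

func main() {
	a := make(chan string)
	b := make(chan string)
	go func() { b <- "hello" }() // only b is ever sent a value

	fmt.Println(firstResult(a, b, time.Second))         // prints "b: hello"
	fmt.Println(firstResult(a, b, 10*time.Millisecond)) // prints "timed out"
}
```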

However, while Go provides powerful primitives in the form of goroutines and channels, it is still easy to write code that will fail spectacularly at runtime. As an example, if I miscount the number of executions and have my output loop iterate one too many times, it will wait forever for another message that will never arrive. Every goroutine is then blocked, so the runtime detects the deadlock and aborts the program with a fatal error:

for i := uint64(0); i <= numIterations; i++ {
   sum += <-pool.output
}
// Changing just this loop to a <= instead of a < will cause the program to
// crash: fatal error: all goroutines are asleep - deadlock!
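One way to sidestep this kind of miscount is to have the receiver stop when the channel is closed instead of counting messages. The following is a sketch of that idea rather than the program above: runPool is a hypothetical variant of the pool, and it uses sync.WaitGroup to know when every worker has finished so the output channel can be closed.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool feeds the inputs 0..n-1 to numWorkers workers and returns a channel
// that is closed once every result has been sent.
func runPool(numWorkers int, n uint64, task func(uint64) float64) <-chan float64 {
	input := make(chan uint64)
	output := make(chan float64)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range input { // exits when input is closed
				output <- task(i)
			}
		}()
	}

	go func() { // feeder: send every input, then signal there are no more
		for i := uint64(0); i < n; i++ {
			input <- i
		}
		close(input)
	}()

	go func() { // closer: once all workers have exited, close the output
		wg.Wait()
		close(output)
	}()

	return output
}

func main() {
	sum := 0.0
	for v := range runPool(8, 10, func(i uint64) float64 { return float64(i) }) {
		sum += v // the loop ends when output is closed, with no counting
	}
	fmt.Println(sum) // prints 45
}
```

The trade-off is a little more machinery in the pool in exchange for a receive loop that cannot run one iteration too long.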

 

In addition, while using channels is indeed perfectly safe, in most cases Go won’t know or care if you also mutate shared state using normal variables. The responsibility is on the user and their tools to catch those mistakes, to avoid sharing references across goroutines, and to use channels strictly for communication.
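To make that concrete, here is a small sketch of shared state that Go will happily compile either way. The countWithMutex function is made up for illustration; if the Lock and Unlock calls are removed, the program still builds, but the increments race with each other. Go's race detector (for example, go run -race) is one of the tools that can flag that mistake.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithMutex increments a shared counter from n goroutines.
// The mutex makes each increment exclusive; without it, "total++" from
// several goroutines is a data race and the final value is unpredictable.
func countWithMutex(n int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			total++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(countWithMutex(1000)) // prints 1000
}
```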

 

Alternatives

There are many different methods and patterns for sharing memory safely, but one that is particularly interesting is Software Transactional Memory. In short, Transactional Memory as a concept enforces database-like transactions for memory operations, effectively eliminating concerns about memory safety. Software Transactional Memory simply means that the transaction is implemented in software, as opposed to hardware (in the memory or memory controller itself).
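To give a rough feel for the transactional idea in Go terms, here is a sketch of the read, compute, commit, retry cycle on a single integer, using compare-and-swap from sync/atomic. This is only an analogy and not an STM implementation: the update function is hypothetical, and a real STM coordinates arbitrary groups of reads and writes rather than one word.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// update applies f to *addr as an optimistic "transaction": read a snapshot,
// compute the new value, and commit only if nobody changed *addr in between.
// On a conflict the whole attempt is retried.
func update(addr *int64, f func(int64) int64) {
	for {
		old := atomic.LoadInt64(addr)
		if atomic.CompareAndSwapInt64(addr, old, f(old)) {
			return // committed
		}
		// another goroutine committed first; retry with a fresh snapshot
	}
}

func main() {
	var sum int64
	var wg sync.WaitGroup
	for i := int64(1); i <= 100; i++ {
		wg.Add(1)
		go func(i int64) {
			defer wg.Done()
			update(&sum, func(v int64) int64 { return v + i })
		}(i)
	}
	wg.Wait()
	fmt.Println(sum) // prints 5050
}
```

Note that f may run more than once when there is contention, which is why work inside a transaction should be kept small and free of side effects.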

Possibly the coolest implementation of STM is in the Clojure language, which is a functional, JVM-based Lisp dialect. It’s unusual among mainstream languages in having STM built into the core language itself instead of requiring libraries or other external tools. Because Clojure is a functional language, mutating state is only permitted in strict, controlled situations, and Clojure can reject most unsafe mutation of shared memory outright: altering a ref outside of a transaction, for example, throws an exception instead of silently racing.

Here is a gist of both this source and the package configuration for running it locally. You will need to install Clojure and the Leiningen package manager on your system to run it.

Here is the Chudnovsky algorithm again, in Clojure:

(ns clojure-chudnovskys.core)

(require '[clojure.math.numeric-tower :as math :refer [expt]])
(import '(java.util.concurrent Executors))

(defn factorial [n]
   ; range excludes its upper bound, so go up to n + 1 to include n
   (reduce *' (range 1 (inc n)))) ; *' automatically handles bigints for us

(defn calc-denominator [i]
   (*' (factorial (*' 3 i)) (expt (factorial i) 3)
      (expt 262537412640768000 (+' 0.5 i))))

(defn calc-numerator [i]
   (*' (expt -1 i) (factorial (*' 6 i))
      (+' (*' 545140134 i) 13591409)))

(defn -main [& args]
   (println "Enter number of iterations: ")
   (let [iterations (Integer/parseInt (read-line))
         ; Concurrent calculation using STM
         sum (ref 0)
         pool (Executors/newFixedThreadPool 8)
         tasks (map
                  (fn [i] ; map a function to each value of i
                     (fn [] ; that returns a closure to calculate the itervalue
                        ; let keeps itervalue local to this task; a def here
                        ; would create a global var shared by every thread
                        (let [itervalue (/ (calc-numerator i) (calc-denominator i))]
                           (dosync ; Start our transaction
                              (alter sum + itervalue))))) ; End our transaction
                  (range iterations))] ; -> (0..iterations)
      (doseq [future (.invokeAll pool tasks)]
         (.get future))
      (.shutdown pool)
      (println (/ 1 (*' 12 (deref sum))))))

While Clojure can certainly be a bit more challenging to read unless you’re familiar with Lisp or functional patterns, the bulk of our parallelism logic is in these few lines:

; Create a ThreadPool Executor, with 8 threads
(def pool (Executors/newFixedThreadPool 8))
; doseq is essentially a for loop.
(doseq [future (.invokeAll pool tasks)] ; foreach pool.invokeAll(tasks)
   (.get future)) ; future.get()
(.shutdown pool) ; pool.shutdown()
; Each future is executing the following code in a closure
; for a given iteration number, i
; itervalue = calc-numerator(i) / calc-denominator(i)
; (let keeps itervalue local to the task; a def here would create a
; global var shared by every thread)
(let [itervalue (/ (calc-numerator i) (calc-denominator i))]
   (dosync ; Start our transaction
      (alter sum + itervalue))) ; mutate sum, sum += itervalue; commits when dosync exits

 

STM Strengths and Weaknesses

The main advantage of STM over Go’s channel system is that it is far more similar to regular, synchronous programming. If you have a working synchronous version, you can typically convert it to an asynchronous version with fewer changes to the code’s structure than with channels. Clojure’s functional nature combined with native STM support also makes it more difficult to accidentally mutate shared memory in an unsafe way.

However, STM is comparatively slow. Access to the shared object (in this case, sum) has to be coordinated on every transaction, and when transactions contend for it, all but one of them must wait or be retried before they can commit. In this case, the operation being performed inside the transaction is extremely fast, but if we had also calculated the itervalue inside the transaction instead of before it, that work would sit in the contended section, and our implementation would be much slower, possibly slower than a synchronous implementation. So, while there is essentially no way for us to unsafely manipulate shared memory here, we could very easily write a program that is worse off for being parallel.

 

Zach Bullough – OMG’er #76
Principal Engineer