Some Haskell Concurrent Programming Tests

Table of Contents

1 Abstract

Haskell supports nearly all concurrent programming paradigms (e.g. mutable variables, software transactional memory, futures, compilation to CUDA/OpenCL), but some of them have minor problems that become important when used in production.

2 Failed tests

2.1 Default channels are wasting memory

If a producer is faster than the consumer, it can fill the channel (and the memory) with data to process, instead of synchronizing with the speed of the slower consumer. The problem can manifest itself when the producer has to reprocess large batches of data.

The solution: use bounded channels, so when the maximum capacity of the channel is reached, the producer waits until the consumer has consumed some data.

This code simulates the problem, comparing different types of channels:

data OneChan a
  = ChanDefault (Chan.Chan (Maybe a))
  | ChanBounded (BChan.BoundedChan (Maybe a))
  | ChanMVar (MVar (Maybe a))
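
The `writeToOneChan` and `readFromOneChan` helpers used below are not shown in this post; a minimal sketch, dispatching on the concrete channel type, could be:

-- Hypothetical helpers: write/read a value, dispatching on the channel type.
writeToOneChan :: OneChan a -> Maybe a -> IO ()
writeToOneChan (ChanDefault ch) v = Chan.writeChan ch v
writeToOneChan (ChanBounded ch) v = BChan.writeChan ch v
writeToOneChan (ChanMVar mv)    v = putMVar mv v

readFromOneChan :: OneChan a -> IO (Maybe a)
readFromOneChan (ChanDefault ch) = Chan.readChan ch
readFromOneChan (ChanBounded ch) = BChan.readChan ch
readFromOneChan (ChanMVar mv)    = takeMVar mv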

This is the producer:

-- | Generate vectors of values.
fastProducer :: Int -> Int -> Int -> OneChan RandomBV -> IO ()
fastProducer seed vectorDimension channelDimension chan = do
  case channelDimension of
    0 -> do writeToOneChan chan Nothing
            return ()
    _ -> do let bv = randomBV seed vectorDimension
            writeToOneChan chan (Just bv)
            putStrLn $ "producer: " ++ show channelDimension ++ " left elements to write" ++ ", hash: " ++ show (hash bv)
            fastProducer (seed + 1) vectorDimension (channelDimension - 1) chan

Note that we compute and use the `hash` of the produced vector, to force its real computation at run-time; otherwise the Haskell run-time would send only a thunk on the channel, instead of the real vector.

This is the consumer:

-- | Slowly consume values.
slowConsumer :: Int -> OneChan RandomBV -> IO ()
slowConsumer fuzzyNormalizer chan = do
  mbv <- readFromOneChan chan
  case mbv of
    Nothing
      -> do return ()
    Just bv
      -> do let r = slowComputationOnVector bv fuzzyNormalizer
            putStrLn $ "consumer: " ++ show r
            performMinorGC
            performMajorGC
            slowConsumer fuzzyNormalizer chan

Finally, this is the coordinator launching the two processes:

-- | Test the channels.
--   NOTE: a seed is passed as argument, so the compiler can not optimize the code in advance.
testChannels :: OneChanType -> Int -> Int -> Int -> Int -> IO ()
testChannels chanType seed fuzzyNormalizer vectorDimension channelDimension = do
  chan :: OneChan RandomBV
    <- case chanType of
          OneDefaultChan
            -> ChanDefault <$> Chan.newChan
          OneBoundedChan
            -> ChanBounded <$> BChan.newBoundedChan 2
          OneMVarChan
            -> ChanMVar <$> newEmptyMVar

  _ <- concurrently
         (fastProducer seed vectorDimension channelDimension chan)
         (slowConsumer fuzzyNormalizer chan)
  return ()

This is the RAM used, simulating 10 vectors (not many) of 8MB each: the unbounded channel uses about 3x more memory, despite only 10 elements being simulated. If we put more elements in the channel the difference can become arbitrarily higher.

Using unbounded (default) channels:
              75 MB total memory in use (1 MB lost due to fragmentation)
Using bounded channels: 
              27 MB total memory in use (0 MB lost due to fragmentation)
Using a single MVar: 
              18 MB total memory in use (0 MB lost due to fragmentation)

The unbounded channel is filled quickly by the producer, and consumed slowly by the consumer:

[Heap profile of `prof test channels default 123456 1000000 10 +RTS -N2 -s -p -h` (Thu Aug 31 12:27 2017): bytes over seconds, y axis up to 65M]

The shown peak depends on the number of elements, in this case only 10. In the case of the bounded channel, instead, the peak is constant, and it depends only on the capacity of the channel.

[Heap profile of `prof test channels bounded 123456 1000000 10 +RTS -N2 -s -p -h` (Thu Aug 31 12:32 2017): bytes over seconds, y axis up to 22M]

In the case of the MVar the RAM usage behaviour is the most regular, because it can contain only one element:

[Heap profile of `prof test channels mvar 123456 1000000 10 +RTS -N2 -s -p -h` (Thu Aug 31 12:38 2017): bytes over seconds, y axis up to 15M]

Note that the bounded channel degenerates to the MVar case after it reaches its bound, because from that point the producer will generate only one value at a time, i.e. when the consumer frees a slot in the channel. This may not be optimal for caching and context switches. Probably an ideal solution is a channel with a guaranteed max bound, but that is refilled by the producer only when it decreases to 50% of its capacity, so it is filled in batched mode (see the sketch below).
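
A rough sketch of this batched idea, built on top of the already used `BChan.BoundedChan` (the `BatchChan` type and its functions are hypothetical, not part of the tested code): the producer writes whole batches of values, so after the initial fill it is woken up only when an entire batch has been drained, instead of once per element.

-- Hypothetical batched channel: values travel in whole batches,
-- so producer and consumer synchronize once per batch, not once per element.
newtype BatchChan a = BatchChan (BChan.BoundedChan (Maybe [a]))

-- | Create a channel holding at most `pendingBatches` batches.
newBatchChan :: Int -> IO (BatchChan a)
newBatchChan pendingBatches = BatchChan <$> BChan.newBoundedChan pendingBatches

-- | Write a whole batch, blocking if too many batches are already pending.
writeBatch :: BatchChan a -> [a] -> IO ()
writeBatch (BatchChan ch) batch = BChan.writeChan ch (Just batch)

-- | Signal that no more batches will be produced.
writeBatchEOF :: BatchChan a -> IO ()
writeBatchEOF (BatchChan ch) = BChan.writeChan ch Nothing

-- | Read the next batch, or Nothing at end of stream.
readBatch :: BatchChan a -> IO (Maybe [a])
readBatch (BatchChan ch) = BChan.readChan ch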

2.1.1 Derived pattern

  1. Intent

You have a producer and a consumer process, working concurrently/in parallel.

  2. Solution

Make them communicate using a shared channel, making sure that the channel is bounded.

2.2 Lazy evaluation breaks resource management

Resource management is vital for robust production code. If too many file descriptors, database connections, or network sockets are left open, the system becomes unreliable. So resources must be acquired, used, and released in a predictable way. But lazy evaluation makes the run-time behaviour more obscure and unpredictable, because there can be pending thunks instead of fully evaluated values.

Haskell uses `bracket`-like functions for acquiring and releasing resources in a safe way. But the currently tested versions of `bracket` are buggy when not fully evaluated thunks are returned. In this example:

readFileContent1 :: FilePath -> IO LText.Text
readFileContent1 fileName
  = bracket
      (openFile fileName ReadMode)
      hClose
      (\handle -> do content <- LText.hGetContents handle
                     return $ toUpperCase content)

printFileContent1 :: FilePath -> IO ()
printFileContent1 fileName = do
  c <- readFileContent1 fileName
  putStrLn $ LText.unpack c

  • the acquired resource is the file descriptor
  • the computation done using the resource is the reading of the file content
  • the result is a not fully evaluated thunk, because the lazy `LText.hGetContents` reads the file content chunk by chunk, and only on demand when really needed

What is the behaviour of Haskell run-time in this case?

  • if the resource is left open until the thunk is fully evaluated, we have a potential resource leak, because we have no guarantee at run-time about when (or if) the thunk will be fully evaluated
  • if the resource is closed before the thunk is fully evaluated, there will be an exception during thunk evaluation, because the resource is no longer acquired

The actual behaviour, as of the tested versions, is the worst possible: the resource is closed before thunk evaluation, but no exception is raised during thunk evaluation; instead the file is considered empty. So it is clearly a bug (already reported).

Obviously this is a rather artificial example, but it can also appear in real world code, because parts of a result can be managed by the run-time as not fully evaluated thunks.

2.2.1 The solution

I created a safe version of bracket, returning a fully evaluated value:

-- | Fully evaluate the returned result, intercepting and rethrowing all exceptions.
--   If during exception management the resource `release` produces errors,
--   then only the parent exception of the `compute` function is rethrown.
--   NOTE: returning a fully evaluated value assures that the resource can be
--   released correctly, because it is no longer needed.
safeBracket
  ::  forall m a b c . (MonadMask m, NFData c)
  => m a
  -- ^ acquire the resource
  -> (Maybe SomeException -> a -> m b)
  -- ^ release the resource at the end of normal result computation, or in case of an exception.
  -> (a -> m c)
  -- ^ compute the result
  -> m c
safeBracket acquire release compute = do
  !resource <- acquire
  withException
    (do !r <- force <$!> compute resource
        release Nothing resource
        return r)
    (\exc -> do release (Just exc) resource
                throw exc)
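
For example, the problematic `readFileContent1` shown above could be rewritten on top of `safeBracket` (a sketch; it relies on the `NFData` instance of lazy `Text` for forcing the result):

-- Sketch: same computation as `readFileContent1`, but the result is forced
-- to normal form by `safeBracket` before the handle is closed.
readFileContent2 :: FilePath -> IO LText.Text
readFileContent2 fileName
  = safeBracket
      (openFile fileName ReadMode)
      (\_ handle -> hClose handle)
      (\handle -> do content <- LText.hGetContents handle
                     return $ toUpperCase content)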

The drawback is that computations in two distinct brackets can not be fused together and optimized aggressively by the compiler, because they are two distinct and atomic computations. But this behaviour is in the spirit of safe resource management, where parallel resource acquisition is seen as a resource leak and not as an optimization.

These are the differences executing the code using `bracket` and `safeBracket`:

File content using bad bracket: 


File content using safe bracket: 
THE MORNINNG HAS GOLD IN IT'S MOUTH
THE MORNINNG HAS GOLD IN IT'S MOUTH
THE MORNINNG HAS GOLD IN IT'S MOUTH

2.2.2 Derived pattern

  1. Intent

    You have a computation using resources.

  2. Solution

    Enclose the computation inside `safeBracket`. It will act like a predictable transaction: when the value is required the resource is acquired, the code is executed completely, and the resource is closed immediately.

2.3 Channels with unevaluated values

Haskell channels can contain unevaluated thunks, instead of fully evaluated values. But this is not nice because:

  • the receiving thread has to use its resources (CPU and memory) for evaluating a thunk, and this complicates the reasoning about the sharing of work between threads
  • the thunk can need resources like database connections, file handles, and so on, allocated on the producer side, so it can not be evaluated on the consumer side
  • if the application switches to remote services, the network channel can only transmit fully evaluated values

So the best approach is to always send fully evaluated values. Obviously a fully evaluated value can be boxed behind a pointer, so its transmission will be very cheap. The important thing is that the value does not require any further evaluation on the receiver side. This simple library ensures this:

-- | A channel of fully evaluated (NFData) values.
newtype Chan a = Chan (BChan.BoundedChan (Maybe a))

-- | A channel bounded to number of cores + 1.
--   The channel is bounded so a producer faster than consumers
--   does not waste the memory filling the channel with new values to process.
newChan :: IO (Chan a)
newChan = do
  dim <- getNumCapabilities
  Chan <$> BChan.newBoundedChan (dim + 1)

{-# INLINE writeChan #-}
-- | Fully evaluate the value, and write on the channel.
writeChan :: (NFData a) => Chan a -> a -> IO ()
writeChan (Chan ch) v = BChan.writeChan ch (Just $ force v)

{-# INLINE writeEOFChan #-}
-- | When the producer sends this command, it informs consumers that there are no more
--   interesting values.
writeEOFChan :: Chan a -> IO ()
writeEOFChan (Chan ch) = BChan.writeChan ch Nothing

{-# INLINE readChan #-}
-- | Read the next value.
--   Nothing implies that no new values should be put on the channel.
readChan :: Chan a -> IO (Maybe a)
readChan (Chan ch) = BChan.readChan ch

There are no tests, but probably they would be similar to the `safeBracket` case.
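
A minimal usage sketch of this library (the `producer`, `consumer` and `example` names are hypothetical, not part of the original code):

-- Sketch: a producer sending fully evaluated Ints, and a consumer summing them.
producer :: Chan Int -> IO ()
producer ch = do
  mapM_ (writeChan ch) [1 .. 1000]
  writeEOFChan ch

consumer :: Int -> Chan Int -> IO Int
consumer !acc ch = do
  mx <- readChan ch
  case mx of
    Nothing -> return acc
    Just x  -> consumer (acc + x) ch

example :: IO Int
example = do
  ch <- newChan
  (_, total) <- concurrently (producer ch) (consumer 0 ch)
  return total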

2.4 Forcing strict evaluation

This function applies several rounds of random number generation, using the elements of a vector as parameters. It uses `foldl'`, which is a strict (and fast) version of `foldl`, and it works on boxed vectors, one of the fastest Haskell data structures. If converted to imperative code, it is a series of simple loops. So it should be fast code.

slowComputationOnVector2
    :: RandomBV
    -> Int
    -- ^ add some noise to the calcs, so the compiler can not optimize the code too much
    -- @require this > 1000
    -> Int
slowComputationOnVector2 bv fuzzyNormalizer
  = let g0 = mkStdGen (fuzzyNormalizer * 5)

        nextR :: Int -> (Int, StdGen) -> (Int, StdGen)
        nextR 0 (x1, g1) = (x1, g1)
        nextR n (x1, g1) = nextR (n - 1) (random g1)

        maxRandRepetition :: Int
        maxRandRepetition = 20

        advance :: (Int, StdGen) -> Int -> (Int, StdGen)
        advance (s1, g1) x1
          = let (x2, g2) = nextR (x1 `mod` maxRandRepetition) (x1, g1)
            in  ((s1 + x2) `mod` fuzzyNormalizer, g2)

    in  fst $ BV.foldl' advance (0, g0) bv

These are the benchmarks using a vector of 100,000 integers:

benchmarking processVectorsBV
time                 3.125 s    (2.994 s .. 3.221 s)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 3.143 s    (3.123 s .. 3.157 s)
std dev              21.78 ms   (0.0 s .. 24.85 ms)
variance introduced by outliers: 19% (moderately inflated)

processVectorsBV result: 3695476
46 MB total memory in use (0 MB lost due to fragmentation)

But by default Haskell code is lazy. So the internal loops of this function are managed as thunks, even if in our case we want direct/strict computation.

Many optimizations in Haskell are easy to write. In our case it suffices to add bang patterns, forcing values to be calculated strictly. These are the "new" internal expressions:

nextR :: Int -> (Int, StdGen) -> (Int, StdGen)
nextR 0 (!x1, !g1) = (x1, g1)
nextR n (!x1, !g1) = nextR (n - 1) (random g1)

maxRandRepetition :: Int
maxRandRepetition = 20

advance :: (Int, StdGen) -> Int -> (Int, StdGen)
advance (!s1, !g1) !x1
  = let (!x2, !g2) = nextR (x1 `mod` maxRandRepetition) (x1, g1)
    in  ((s1 + x2) `mod` fuzzyNormalizer, g2)

The effects at run-time are puzzling: the strict version uses 5x less RAM, but it is slightly slower.

benchmarking processVectorsBV
time                 3.665 s    (3.407 s .. 3.919 s)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 3.625 s    (3.571 s .. 3.663 s)
std dev              58.44 ms   (0.0 s .. 67.01 ms)
variance introduced by outliers: 19% (moderately inflated)

processVectorsBV result: 3695476
9 MB total memory in use (4 MB lost due to fragmentation)

It is not really a bug, but a warning about the fact that GHC is not always good at inferring whether internal code is strict or not.
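
For reference, the two strictness tools used in this post are bang patterns (which require the `BangPatterns` extension) and `force` from `Control.DeepSeq`; a minimal, self-contained sketch:

{-# LANGUAGE BangPatterns #-}

import Control.DeepSeq (NFData, force)

-- The bang forces the accumulator at every step,
-- so no chain of pending thunks is built.
sumStrict :: [Int] -> Int
sumStrict = go 0
  where
    go !acc []       = acc
    go !acc (x : xs) = go (acc + x) xs

-- `force` evaluates a value to normal form (no thunks left inside).
fullyEvaluated :: NFData a => a -> a
fullyEvaluated = force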

2.4.1 Derived pattern

  1. Intent

    Having elegant but efficient Haskell code.

  2. Solution

    First write elegant code. Then use faster data structures. Add strict annotations in internal expressions, if you are sure that you are not interested in the benefits of lazy evaluation. Profile the effect of each change.

    Usually these optimizations are easy to apply to an existing Haskell code-base, and the code remains readable and elegant.

2.5 Profiling and benchmarking are distinct operations

For optimizing Haskell code you need a profiler, because developers can not easily predict where and why their code is slow. This is even more true for a high-level language like Haskell, with many compilation transformation passes and a complex run-time.

The good news is that often the optimization of Haskell code involves very minimal changes: adding strict annotations, using alternative (but compatible) data structures, and so on. The static type system reduces the refactoring effort a lot, and often the new code is immediately correct.

Adding profiling instrumentation to the code changes its run-time behaviour, and micro benchmarks done with Criterion are no longer reliable. So profiling and benchmarking are two distinct operations, from an operational (not logical) point of view.

These are the build options of the profiling instrumented version of the application:

.PHONY : prg-prof
prg-prof:
  stack build threads-post:exe:prof --executable-profiling --library-profiling

executable prof
  ghc-options: -O2 -threaded -fprof-auto -rtsopts

This is a synthetic benchmark testing the effects of fusion on lists and streams:

benchmarking list initial warm-up evaluation - not consider
time                 257.0 ms   (223.5 ms .. 312.4 ms)
                     0.980 R²   (0.897 R² .. 1.000 R²)
mean                 251.3 ms   (240.8 ms .. 266.7 ms)
std dev              16.16 ms   (2.199 ms .. 20.72 ms)
variance introduced by outliers: 17% (moderately inflated)

benchmarking fusion list with foldl
time                 43.63 ms   (43.57 ms .. 43.68 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 43.64 ms   (43.61 ms .. 43.66 ms)
std dev              41.82 μs   (28.46 μs .. 61.55 μs)

benchmarking fused list with foldl
time                 15.93 ms   (15.92 ms .. 15.94 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 15.93 ms   (15.92 ms .. 15.94 ms)
std dev              26.19 μs   (18.35 μs .. 37.92 μs)

benchmarking fusion list with foldl'
time                 40.80 ms   (40.50 ms .. 41.36 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 40.66 ms   (40.51 ms .. 41.02 ms)
std dev              380.5 μs   (45.48 μs .. 603.7 μs)

benchmarking fused list with foldl'
time                 16.18 ms   (16.14 ms .. 16.25 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 16.17 ms   (16.15 ms .. 16.26 ms)
std dev              107.5 μs   (30.96 μs .. 204.7 μs)

benchmarking fusion list with foldr
time                 162.9 ms   (157.2 ms .. 174.0 ms)
                     0.997 R²   (0.992 R² .. 1.000 R²)
mean                 155.1 ms   (139.2 ms .. 159.4 ms)
std dev              9.849 ms   (960.6 μs .. 14.80 ms)
variance introduced by outliers: 13% (moderately inflated)

benchmarking fused list with foldr
time                 32.89 ms   (31.03 ms .. 34.33 ms)
                     0.987 R²   (0.968 R² .. 0.995 R²)
mean                 31.74 ms   (30.04 ms .. 33.08 ms)
std dev              3.280 ms   (2.424 ms .. 4.402 ms)
variance introduced by outliers: 45% (moderately inflated)

benchmarking io-stream
time                 90.16 ms   (86.94 ms .. 94.61 ms)
                     0.998 R²   (0.992 R² .. 1.000 R²)
mean                 92.29 ms   (91.05 ms .. 94.38 ms)
std dev              2.415 ms   (1.162 ms .. 3.061 ms)

benchmarking fused io-stream
time                 35.34 ms   (35.00 ms .. 35.65 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 34.60 ms   (34.38 ms .. 34.85 ms)
std dev              461.3 μs   (396.1 μs .. 510.9 μs)

fusion list with foldl: 8338764493050923690
fusion list with foldr: 8338764493050923690
fused list with foldl: 8338764493050923690
fused list with foldr: 8338764493050923690
io-stream: 8338764493050923690
fused io-stream: 8338764493050923690

Note: this benchmark is executed without activating the profiling options at run-time.

This benchmark seems to show that GHC is not applying automatic fusion on lists, because the hand-fused version is at least 3x faster. But this is a lie, introduced by the GHC profiler. Benchmarking the production code tells us that the compiler applies fusion correctly.

2.5.1 Derived pattern

  1. Intent

    Writing faster versions of an application.

  2. Solution

    Use GHC profiler for macro optimizations, and Criterion for micro optimizations, but not both at the same time.

    If you want to discover why/where an application is slow, use a version of the application compiled with the GHC profiling options. It influences the run-time behaviour, but its information is useful for discovering what your application is doing. Note: always compile and execute with the `-O` or `-O2` options, for profiling the real application.

    If you want to discover which version of a function is faster, benchmark using Criterion on code without profiling instrumentation, i.e. on the same code you want to use in production (see the sketch after this list).

    If you want to optimize concurrent and parallel code, consider also instrumenting the code for ThreadScope.
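
A minimal Criterion sketch (a hypothetical `main`, reusing the `fusionOnListFoldl'` and `fusedOnListFoldl'` functions defined in the next section):

import Criterion.Main (bench, defaultMain, nf)

-- Each `bench` runs the function many times on the same input and
-- reports time, R² and standard deviation, as in the outputs above.
main :: IO ()
main = defaultMain
  [ bench "fusion list with foldl'" (nf fusionOnListFoldl' [1 .. 500000 :: Int])
  , bench "fused list with foldl'"  (nf fusedOnListFoldl'  [1 .. 500000 :: Int])
  ]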

3 Passed tests

3.1 Using streams for local IO processing

Suppose you have data in the IO world (e.g. the content of a big file, the result of a DB query, network events) that you want to process in chunks, using a constant amount of memory. Stream oriented libraries like io-streams and pipes ensure that elements are generated and processed in a predictable way (usually one at a time). On the contrary, lists and other data structures such as lazy ByteStrings can exhibit space leak problems, due to unpredictable lazy evaluation behaviour at run-time.

There is a minimal overhead in accessing stream elements, so they are not good for processing big data or for number-crunching. In these cases you can still import the data using a stream, but the internal transformations can be done using other methods (vectors, parallel code, etc.), and then you can still use streams for exporting the transformed data to the IO world.

When transformations are not very complex, you can use streams also for internal intermediate transformation passages, like in the next example. Moreover, streams can be easily combined with parsers and other data transformation libraries, so they have a wide range of applications.

The majority of stream libraries do not support fusion of intermediate streams. This breaks some of the assumptions of elegant Haskell code: you can reuse and compose functions, being sure that the compiler will fuse/optimize intermediate passages. But in the case of streams, the lack of fusion is not as serious as in the case of lists or vectors, because streams are never fully materialized. So you are repeating operations on one element at a time, without full materialization of the intermediate streams. The next benchmark will confirm that the problem is negligible for a normal chain of streams, and that it can become important only in the case of a very long chain of streams (unlikely) or of heavy number-crunching/parallel code (but this is not the domain of streams in any case). By the way: it seems that there is some work on supporting fusion also on streams: see https://twanvl.nl/blog/haskell/streaming-vector and http://www.yesodweb.com/blog/2016/02/first-class-stream-fusion.

This is the code transforming a list into the sum of its odd elements, multiplied by 2.

fusionOnListFoldl :: [Int] -> Int
fusionOnListFoldl l = L.foldl (+) 0 $ map (* 2) $ filter odd l

fusionOnListFoldl' :: [Int] -> Int
fusionOnListFoldl' l = L.foldl' (+) 0 $ map (* 2) $ filter odd l

fusionOnListFoldr :: [Int] -> Int
fusionOnListFoldr l = L.foldr (+) 0 $ map (* 2) $ filter odd l

fusedCalcOnFoldl :: Int -> Int -> Int
fusedCalcOnFoldl s x
  = case odd x of
      True -> s + (x * 2)
      False -> s
{-# INLINE fusedCalcOnFoldl #-}

fusedCalcOnFoldr :: Int -> Int -> Int
fusedCalcOnFoldr x s = fusedCalcOnFoldl s x
{-# INLINE fusedCalcOnFoldr #-}

fusedOnListFoldl' :: [Int] -> Int
fusedOnListFoldl' l = L.foldl' fusedCalcOnFoldl 0 l

fusedOnListFoldl :: [Int] -> Int
fusedOnListFoldl l = L.foldl fusedCalcOnFoldl 0 l

fusedOnListFoldr :: [Int] -> Int
fusedOnListFoldr l = L.foldr fusedCalcOnFoldr 0 l

This is the equivalent code using io-streams:

fusionOnStream :: [Int] -> IO Int
fusionOnStream l = do
  inS1 <- S.fromList l
  inS2 <- S.filter odd inS1
  inS3 <- S.map (* 2) inS2
  S.fold (+) 0 inS3

fusedOnStream :: [Int] -> IO Int
fusedOnStream l = do
  inS1 <- S.fromList l
  S.fold fusedCalcOnFoldl 0 inS1

These are the benchmarks:

benchmarking list initial warm-up evaluation - not consider
time                 192.4 ms   (190.3 ms .. 193.7 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 195.3 ms   (194.1 ms .. 196.6 ms)
std dev              1.654 ms   (1.391 ms .. 1.883 ms)
variance introduced by outliers: 14% (moderately inflated)

benchmarking fusion list with foldl
time                 14.31 ms   (14.29 ms .. 14.32 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 14.30 ms   (14.29 ms .. 14.34 ms)
std dev              34.73 μs   (11.53 μs .. 67.81 μs)

benchmarking fused list with foldl
time                 13.87 ms   (13.84 ms .. 13.95 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 13.86 ms   (13.85 ms .. 13.91 ms)
std dev              63.44 μs   (13.77 μs .. 126.0 μs)

benchmarking fusion list with foldl'
time                 13.94 ms   (13.92 ms .. 13.95 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 13.94 ms   (13.93 ms .. 13.94 ms)
std dev              15.54 μs   (9.801 μs .. 21.46 μs)

benchmarking fused list with foldl'
time                 14.34 ms   (14.27 ms .. 14.46 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 14.34 ms   (14.32 ms .. 14.41 ms)
std dev              97.36 μs   (27.98 μs .. 186.7 μs)

benchmarking fusion list with foldr
time                 28.30 ms   (26.25 ms .. 30.18 ms)
                     0.984 R²   (0.974 R² .. 0.994 R²)
mean                 26.51 ms   (24.98 ms .. 27.56 ms)
std dev              2.595 ms   (1.880 ms .. 3.348 ms)
variance introduced by outliers: 43% (moderately inflated)

benchmarking fused list with foldr
time                 26.08 ms   (25.13 ms .. 26.85 ms)
                     0.995 R²   (0.990 R² .. 0.998 R²)
mean                 25.56 ms   (24.52 ms .. 26.55 ms)
std dev              2.143 ms   (1.469 ms .. 3.125 ms)
variance introduced by outliers: 35% (moderately inflated)

benchmarking io-stream
time                 20.66 ms   (20.63 ms .. 20.68 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 20.67 ms   (20.64 ms .. 20.70 ms)
std dev              65.90 μs   (47.04 μs .. 100.5 μs)

benchmarking fused io-stream
time                 17.86 ms   (17.76 ms .. 17.94 ms)
                     1.000 R²   (0.999 R² .. 1.000 R²)
mean                 17.79 ms   (17.73 ms .. 17.86 ms)
std dev              156.2 μs   (90.87 μs .. 252.9 μs)

fusion list with foldl: 8338764493050923690
fusion list with foldr: 8338764493050923690
fused list with foldl: 8338764493050923690
fused list with foldr: 8338764493050923690
io-stream: 8338764493050923690
fused io-stream: 8338764493050923690

Observe that:

  • the difference between the lists fused by the compiler and the hand-coded version is near 0, so GHC is effectively transforming elegant code (reusing already defined functions) into efficient code (like the hand-coded version)
  • the chain of io-streams is only 25% slower than the hand-fused io-stream version, so it is not very important in practice, also because usually io-streams process data to/from the IO monad, and so the external actions will dominate the small io-stream overhead
  • in this example io-streams are from 25% to 50% slower than lists, so they have a more than acceptable speed for their usage scenario
  • curiously, in this example `foldl` and `foldl'` have nearly the same speed, while `foldr` is 2x slower, but these are only micro benchmarks, and everything can change in other examples

3.1.1 Derived pattern

  1. Intent

    Writing robust code for processing data from/to the IO world.

  2. Solution

    Streams are fast, and their overhead is negligible with respect to the involved IO operations. Stream processing is robust because it guarantees that only one element at a time is generated and processed. So there is no waste of memory and no space leaks. On the contrary, lists are prone to these types of problems.

3.2 Using channels for concurrent interaction between services

In Haskell you can map a local Service or Actor to a thread. The thread can receive/send events using a shared channel.

Channels are not very fast because they must support sharing and synchronization between different threads. We will test their speed against io-streams and vectors.

This code applies the already known function `fusedCalcOnFoldl` to all the elements of a vector of `Int`. It is one of the fastest possible operations in Haskell, because it accesses the memory directly, without any overhead. An operation like this can only be executed locally by a thread, for processing internal data:

sumOnVector :: BV.Vector Int -> Int
sumOnVector l = BV.foldl' fusedCalcOnFoldl 0 l

This code performs the same operation, but using an io-stream instead of a vector. Every time we receive data from the external IO world (e.g. reading a file, receiving the answer to a query from a DBMS), we should use a stream, because it guarantees nice chunk-by-chunk processing of the data, with constant memory usage. Note that many io-stream based libraries send data grouped in vector-like chunks, so in practice the processing of this data is not far from vector processing.

sumOnStream :: BV.Vector Int -> IO Int
sumOnStream l = do
  inS1 <- S.fromVector l
  S.fold fusedCalcOnFoldl 0 inS1

This code performs the same operation, but using a channel instead of an io-stream. The channel is more robust in case of concurrent access, because it guarantees fair hold/resumption of threads. So it is also a mechanism for scheduling threads. A channel can support two or more concurrent threads. On the contrary, an io-stream is only a unidirectional connection.

sumOnChan :: BV.Vector Int -> IO Int
sumOnChan l = do
  ch <- newChan

  (_, r) <- concurrently
              (do BV.mapM_ (writeChan ch) l
                  writeEOFChan ch)
              (sumOnChan' ch 0)
  return r
 where

   sumOnChan' ch !s = do
     mx <- readChan ch
     case mx of
       Nothing -> return s
       Just x -> sumOnChan' ch (fusedCalcOnFoldl s x)

This code performs the same operation, but using an Unagi-chan, a channel implemented using lock-free x86 instructions (so it is not portable to other CPUs). It can connect different threads (unlike io-streams), but I didn't test whether its scheduling policies are as fair as those of default channels.

sumOnUnagi :: BV.Vector Int -> IO Int
sumOnUnagi l = do
  (inCh, outCh) <- UStream.newChan

  (_, r) <- concurrently
              (do BV.mapM_ (\i -> UStream.writeChan inCh (Just i)) l
                  UStream.writeChan inCh Nothing)
              (sumOnChan' outCh 0)
  return r
 where

   sumOnChan' ch !s = do
     mx <- UStream.readChan ch
     case mx of
       Nothing -> return s
       Just x -> sumOnChan' ch (fusedCalcOnFoldl s x)

These are the benchmarks:

benchmarking list initial warm-up evaluation - not consider
time                 1.014 ms   (1.013 ms .. 1.014 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 1.014 ms   (1.014 ms .. 1.015 ms)
std dev              2.073 μs   (934.7 ns .. 4.031 μs)

benchmarking sumOnVector
time                 11.86 ms   (11.85 ms .. 11.87 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 11.85 ms   (11.84 ms .. 11.85 ms)
std dev              18.02 μs   (13.67 μs .. 23.35 μs)

benchmarking sumOnStream
time                 90.26 ms   (87.45 ms .. 91.44 ms)
                     0.999 R²   (0.996 R² .. 1.000 R²)
mean                 90.61 ms   (89.30 ms .. 91.07 ms)
std dev              1.260 ms   (136.8 μs .. 2.054 ms)

benchmarking sumOnChan
time                 1.194 s    (1.149 s .. 1.284 s)
                     0.999 R²   (0.998 R² .. 1.000 R²)
mean                 1.160 s    (1.149 s .. 1.183 s)
std dev              19.61 ms   (0.0 s .. 19.62 ms)
variance introduced by outliers: 19% (moderately inflated)

benchmarking sumOnUnagi
time                 57.81 ms   (57.49 ms .. 58.14 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 57.37 ms   (57.19 ms .. 57.52 ms)
std dev              292.7 μs   (218.2 μs .. 390.3 μs)

sumOnVector: 8338764493050923690
sumOnStream: 8338764493050923690
sumOnChan: 8338764493050923690
sumOnUnagi: 8338764493050923690

The io-stream is 10x slower than the vector. But for processing IO data it makes sense, especially if the data is received chunked inside vectors. And considering that we are comparing against the fastest possible vector code, it is still rather fast.

The channel is 10x slower than the io-stream. It is a big difference, but we must remember that a channel also offers fair scheduling services between threads. So in reality here we are also testing the hold and resume of two threads 500K times. So the time is not so bad.

The Unagi channel is 2x faster than the io-stream, and 20x faster than default channels. Probably for IO bound operations this does not matter. It can be useful for threads exchanging calculations on big data, but in these cases it is also useful to transfer the data in batches, because accessing values batched inside a vector is still 60x faster than accessing an Unagi channel, and many big-data computations occur naturally in batches (a sketch of the batched approach follows).
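
A sketch of this batched transfer (the `sumOnChanOfBatches` name and its helpers are hypothetical): the vector is sent on the already defined strict channel as slices, and every received slice is folded locally with the fast vector code, so the channel synchronization cost is paid once per batch.

-- Sketch: send slices of the vector on the channel, and fold each received
-- slice locally, so the per-element channel overhead is paid once per batch.
sumOnChanOfBatches :: Int -> BV.Vector Int -> IO Int
sumOnChanOfBatches batchSize l = do
  ch <- newChan
  (_, r) <- concurrently
              (do mapM_ (writeChan ch) (slices l)
                  writeEOFChan ch)
              (consume ch 0)
  return r
 where
   slices v
     | BV.null v = []
     | otherwise = let (b, rest) = BV.splitAt batchSize v
                   in  b : slices rest

   consume ch !s = do
     mb <- readChan ch
     case mb of
       Nothing -> return s
       Just b  -> consume ch (BV.foldl' fusedCalcOnFoldl s b)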

3.2.1 Derived pattern

  1. Intent

    Process data in an efficient and robust way.

  2. Solution

    If the data is exchanged between threads, use channels because they guarantee fair scheduling between threads.

    If the data is received from a resource in the IO world (or is sent to the IO world), use streams, because they guarantee fair/constant usage of memory.

    If the data is internal to a thread, and the computation is local (it is done by the thread for answering service requests), process it using vector-like operations.

3.3 Using threads for concurrent Tasks

A concurrent Task is a sequence of operations that can interact with the IO world and/or other threads, and that can be executed concurrently with other tasks.

3.3.1 Task mapped to Operating System threads

In traditional operating systems, tasks are implemented by mapping them to threads. But context switches between threads are extremely slow on current CPUs. Threads also have some memory overhead.

Using OS threads for managing tasks does not scale well.

3.3.2 Tasks managed from services, using a queue of events

A task can be managed directly by a Service, but IO interactions can not block the Service, because it must also serve other clients. So the IO requests are managed as events put in an event queue, and they are resumed when the answer arrives.

Code structured in this way is a lot faster (e.g. the NGINX event based model vs the Apache thread model), but it is not readable, because we no longer have the explicit sequence of actions of the Task. They are spread between the Service and the events in the queue.

3.3.3 Tasks mapped to Haskell internal threads

Haskell threads are very cheap: they use very little memory, and a context switch from one thread to another has a cost comparable to a function call. So there can be thousands of active threads in a Haskell application.

When a service receives a complex request, it can create a thread for the task to accomplish. The code is comprehensible because it is a sequence of commands. The code is also fast because Haskell threads are cheap. The code is concurrent because the thread can interact with the IO world using MVars and channels, and it will be put on hold and resumed in an efficient and transparent way.

So Haskell threads join the benefits of events (fast concurrent code) and of threads (a readable sequence of instructions).

3.3.4 Benchmarks

We will process the elements of a vector, using a thread for each element, and a status MVar containing the current partial sum and the number of remaining elements to sum. When the count of elements to sum reaches 0, the last thread will write the final result on the MVar dedicated to the final result. The main thread will then be resumed and it will return the result:

sumUsingTasks :: BV.Vector Int -> IO Int
sumUsingTasks l = do

  finalResult :: MVar Int <- newEmptyMVar

  currentState :: MVar (Int, Int) <- newEmptyMVar

  nrOfElements <- BV.foldM (\i x -> do forkIO $ sumTask x finalResult currentState
                                       return $ i + 1) 0 l

  putMVar currentState (0, nrOfElements)

  takeMVar finalResult

 where

   sumTask :: Int -> MVar Int -> MVar (Int, Int) -> IO ()
   sumTask x finalResult currentState = do
     (s1, leftSums) <- takeMVar currentState
     let !currentState2@(!s2, !leftSums2) = (fusedCalcOnFoldl s1 x, leftSums - 1)
     case leftSums2 of
       0 -> putMVar finalResult s2
       _ -> putMVar currentState currentState2 

This is a rather stressful algorithm:

  • there is a thread for each element of the vector (500K in our case)
  • they are all on hold on the status MVar
  • they will be resumed once, update the sum, and then terminate

These are the benchmarks:

benchmarking list initial warm-up evaluation - not consider
time                 1.010 ms   (1.009 ms .. 1.011 ms)
                     1.000 R²   (1.000 R² .. 1.000 R²)
mean                 1.011 ms   (1.011 ms .. 1.012 ms)
std dev              2.687 μs   (2.224 μs .. 3.333 μs)

benchmarking sumOnChan
time                 1.198 s    (1.156 s .. 1.237 s)
                     1.000 R²   (NaN R² .. 1.000 R²)
mean                 1.223 s    (1.212 s .. 1.229 s)
std dev              9.798 ms   (0.0 s .. 10.65 ms)
variance introduced by outliers: 19% (moderately inflated)

benchmarking sumUsingTasks
time                 4.002 s    (3.405 s .. 4.490 s)
                     0.997 R²   (0.991 R² .. 1.000 R²)
mean                 3.937 s    (3.827 s .. 4.007 s)
std dev              105.7 ms   (0.0 s .. 121.7 ms)
variance introduced by outliers: 19% (moderately inflated)

sumOnChan: 8338764493050923690
sumUsingTasks: 8338764493050923690
1289 MB total memory in use (0 MB lost due to fragmentation)

The `sumOnChan` version, using only two threads (a producer and a consumer) and a shared channel receiving 500K elements, required a little more than 1s. The version using 500K concurrent threads required 4s, and 1.2GB of RAM.

The benchmark shows that Haskell can scale to a high number of threads, and so we can map each active task to a thread.

3.3.5 Derived pattern

  1. Intent

    Process complex requests (Tasks) using readable but efficient code.

  2. Solution

    Map each active request to a Task, that is a sequence of (concurrent) actions, and implement the task using a Haskell thread.

4 General guidelines

These are words without code… :-)

4.1 The mandatory book

"Parallel and Concurrent Programming in Haskell" by Simon Marlow is a very comprehensive and readable book on the subject. IMHO is a mandatory starting point, before writing any concurrent code.

4.2 Sane libraries

Over the years Haskell accumulated different ways of managing exceptions and threads.

The safe-exceptions and async libraries simplify things a lot, and they are designed and tested for production environments.
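
For example, a small sketch combining the two libraries (`fetchA` and `fetchB` are hypothetical IO actions):

import Control.Concurrent.Async (concurrently)
import Control.Exception.Safe (SomeException, try)

-- Run two IO actions concurrently: if either throws, `concurrently`
-- cancels the other one, and `try` returns the exception as a value
-- instead of letting it escape.
fetchBoth :: IO a -> IO b -> IO (Either SomeException (a, b))
fetchBoth fetchA fetchB = try (concurrently fetchA fetchB)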

4.3 The right concurrent programming paradigm

The concurrent paradigm to use must be chosen during the initial design phase, according to the characteristics of the problem domain.

For example in a bank application the optimistic approach (STM) wins, because the majority of concurrent transactions will be on distinct bank accounts, and you must only manage the rare conflicts. On the contrary, for a work queue the pessimistic approach (MVar, non-blocking updates, etc.) is better, because the concurrent threads will try to access the shared data structure mainly at the same time, and conflicts will be the norm, not the exception. In case of distributed processes acting like services, an approach like the Erlang Actor Model is good, but not for parallel code working on nested parallel numeric data structures. And so on…
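
A minimal STM sketch of the bank-account case (a hypothetical `transfer` function):

import Control.Concurrent.STM (TVar, atomically, modifyTVar')

-- Optimistic, composable transfer between two accounts: transfers touching
-- distinct accounts do not interfere; a rare conflict on the same account
-- makes the transaction retry automatically.
transfer :: TVar Int -> TVar Int -> Int -> IO ()
transfer from to amount = atomically $ do
  modifyTVar' from (subtract amount)
  modifyTVar' to   (+ amount)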

4.4 External tools

Some concurrent problems are already solved by external services/tools:

  • a DBMS manages concurrent transactions "for free"
  • a big-data engine compiles data processing code into efficient parallel code

4.5 Prototype

Haskell supports many (all) concurrent paradigms, but maybe not at the same quality level. For example it supports light threads and Software Transactional Memory in a wonderful way, but the actor model is probably better supported in Erlang. Moreover some libraries are initially written for being included in scientific papers, and they can lack some of the refinements necessary for being used hassle-free in real world production environments.

So take time to test them before committing blindly. In any case, tools without any problems are the exception and not the norm in IT, so sorry for the FUD :-)

4.6 The Haskell run-time

For writing reasonable code from the beginning, and understanding the profiler hints, one must have a general idea of the Haskell run-time. The main concepts are:

  • thunk: instead of the result of an expression (a value), a thunk contains a call to the code calculating the expression result
  • lazy evaluation: an expression is evaluated only when/if it is really needed. So initially an expression is represented by a thunk. The first time its value is needed, the thunk is replaced with the result of its code. Subsequent evaluations of the same thunk return the stored result directly, without recalculating it.
  • strict evaluation: an expression is immediately calculated, without using a thunk, bypassing all the lazy evaluation machinery
  • not fully evaluated value: a value can be composed of parts (other values), and each part can be represented as a thunk pointing to the code calculating it. Note that all the lazy evaluation concepts can also be applied to values, because values are expressions.
  • pull model: if an IO action, or a strict expression, requires the value of a thunk, then the thunk is evaluated, calling the corresponding code. If the thunk contains other thunks, these are recursively evaluated. Think of a graph where each node is a thunk: you select a thunk, evaluate it, and all connected thunks are evaluated. It is a sort of evaluation on demand, starting from the consumer of values, and not in advance from the producer. If only a subset of the graph is (momentarily) required, then only that part of the graph will be evaluated. The pull model can also represent values: you can have part of a value not fully evaluated/expanded, and you expand the graph representing the value on demand. For many types of applications (e.g. reactive programming, nested data type analysis) the pull model is the right model, and it is given for free by the Haskell run-time. For other applications it is not the right model, and you have to fight to remove its inefficiencies.
  • NFData/Normal Form: a fully evaluated value, without sub parts expressed by thunks
  • Weak Head Normal Form (WHNF): a value with its first constructor (its initial part) evaluated, e.g. in `Just <some-unevaluated-thunk>` the `Just` part is evaluated (we know that it is not Nothing), but the sub parts can still be unevaluated (see the small sketch after this list).
  • unboxed value: a value represented directly, without any indirection. For example an unboxed vector of `Int` is a vector containing the `Int` values directly, and not pointers/references/thunks to `Int`. Pros: their access is fast. Cons: they can not be lazily evaluated (pull model) or shared.
  • boxed value: a value stored as a thunk/reference to it, instead of directly.
  • fusion: expressions generating intermediate data structures are transformed by the compiler into equivalent expressions that do not generate them. Haskell libraries can specify rewrite rules applying these high-level optimizations. This increases the reuse and usefulness of code, because you can reuse small functions for composing bigger expressions, knowing that the compiler will transform and optimize the code.
  • Haskell thread: calling/resuming a thread in Haskell has the cost of a function call, so Haskell threads can be used like events in other languages, because they have a very low overhead.
  • OS thread: a native thread of the operating system. Context switches are very heavy, and there can not be many threads of this type without saturating the system. Note that the performance of modern CPUs suffers a lot in case of context switches (10x/100x slower), so usually high-performance applications start a thread for each core, and then manage their operations as events (e.g. the Apache web server vs NGINX).
  • foreign function call: a call to a function in C. Often Haskell types have to be converted to C format and vice-versa, so it can be costly compared to native Haskell functions.
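
A tiny sketch of the WHNF vs normal form distinction (hypothetical `whnfVsNf` example; `evaluate` is from `Control.Exception`, `force` from `Control.DeepSeq`):

import Control.DeepSeq (force)
import Control.Exception (evaluate)

whnfVsNf :: IO ()
whnfVsNf = do
  let pair = (1 + 1 :: Int, error "never needed" :: Int)
  -- `evaluate` only reaches WHNF: the outer (,) constructor is enough,
  -- the second component stays an unevaluated thunk, so no error is raised.
  _ <- evaluate pair
  putStrLn "WHNF: ok, the error thunk was not touched"
  -- `force` demands normal form: every component is evaluated,
  -- so the error thunk is evaluated and the exception is raised here.
  _ <- evaluate (force pair)
  putStrLn "NF: never reached"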

5 Conclusions

Haskell simplifies concurrent programming a lot with respect to traditional imperative programming languages, but there can be minor problems in the libraries, requiring some care.

6 About

I'm using Haskell in production, for rating VoIP calls, but I'm not a true expert in concurrent programming, so feel free to fork and extend this document. The source code of this document is here, on GitHub. It is released under the BSD2 license. It is written in Babel, under Emacs Org mode, and it contains live code that is executed every time the document is generated.

The included code is tested with:

flags: {}
packages:
- .
extra-deps:
- unagi-chan-0.4.0.0
resolver: lts-9.2

and in particular

The Glorious Glasgow Haskell Compilation System, version 8.0.2

Anything can change/improve with new versions of Stackage and GHC.

Author: Massimo Zaniboni

Created: 2017-09-03 dom 17:37
