This post explores how we can implement & mimic a simple streaming mechanism / functionality by just making use of lazy IO in Haskell.

More specifically, we explore the use of unsafeInterleaveIO can be used to implement stream manipulation code.

In computer science, a stream is a sequence of data elements made available over time. A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.

— Wikipedia on Streams

We make use of a typical data-pipeline scenario where we have a source, some transformations & a sink, where the data after transformations end up.

  • A Source produces data for the stream
  • Transformations work on that data produced by the source & transforms it in useful ways (there can be a series of transformations)
  • A Sink acts as an end point for our stream data, transforming it into a final form when required.

The streaming approach allows us to make data go through the entire pipe-line, incrementally, in small piecemeal parts, collecting it’s results as they happen in real-time. It works very well for distributed and large data-systems, where it’s neither feasible nor possible to load all data at once, and process it in a waterfall-model like fashion.

This is in contrast with the batch approach, where we load all the data or large batches of fixed size all at once, then process them, in a serial fashion - step 1 with all data, step 2 with all data… - and so on.

When Everything is pure

When everything is pure, we don’t need to do anything special.

When each piece of the pipeline & transformations looks at only a small portion of our stream we don’t have to hold the entire stream in memory all at once, the (pure) code executes in a streaming fashion by default here.

ghc/ghci seems to know how to optimize this

for example:

import Debug.Trace
import Data.Function ((&))

source :: [Int]
source = map (\x -> trace ("Source: " <> show x) x) [1..5]

odds :: [Int] -> [Int]
odds = filter (\x -> trace ("filter: " <> show x) $ odd x)

sink :: [Int] -> Int
sink [] = 0
sink (x:xs) = trace ("sink: " <> show x) (x + sink xs)

main :: IO ()
main = do
  let res = source & odds & sink -- our little pipeline
  print res

main
Source: 1
filter: 1
sink: 1
Source: 2
filter: 2
Source: 3
filter: 3
sink: 3
Source: 4
filter: 4
Source: 5
filter: 5
sink: 5
9

Tracing the above code, we can see it sort of happens in a stream-like fashion. First, only the element, integer 1, is produced by the source, which then goes to filter and then finally to the sink.

Note:- source did not produce the entire list at once, just the first element, which then went to transformations & then to sink, in a stream-like fashion

After that finishes, only then we start to work with the next data element, the integer 2. which again starts from the source, then to filter, which it doesn’t make through (since its not odd), thus it never reaches the sink, as can be see in the output below.

We didn’t need to do anything special there, but what happens when you throw impure code in the mix?

When stuff gets impure

When it gets impure, we need to take some extra steps.

The impure example

Our source, filter step & sink here impure functions, they perform some IO. It could be a network call, a server request, getting current time or file directory path it doesn’t matter, when some (or all) pieces of our pipeline perform some side-effects

-- The source is not pure now, as can be seen from it's the type
source :: IO [Int]
source = go 1
  where
    go 6 = pure []
    go x = do
      -- our source does some side-effect in producing data
      -- it could be a network call, a server request, getting current time etc.
      -- we just use a simple printing statement to elucidate the fact.
      putStrLn $ "Source: " <> show x
      xs <- go (x + 1)
      pure (x : xs)

-- The filter also isn't pure anymore
odds :: [Int] -> IO [Int]
odds [] = pure []
odds (x : xs) = do
  putStrLn $ "filter: " <> show x
  xs' <- odds xs
  pure $
    if odd x
      then x : xs'
      else xs'

-- The sink is also impure
sink :: [Int] -> IO Int
sink [] = pure 0
sink (x : xs) = do
  putStrLn $ "Sink: " <> show x
  (x +) <$> sink xs

main :: IO ()
main = do
  r <- source >>= odds >>= sink
  print r

main
Source: 1      -- completely serial results NO!!!! 🥵
Source: 2
Source: 3
Source: 4
Source: 5
filter: 1
filter: 2
filter: 3
filter: 4
filter: 5
Sink: 1
Sink: 3
Sink: 5
9

You can observe the results here, firs the entire input to source is processed first, then the entire input is filtered and then, as you could’ve guessed, they go to the sink, in a sequential fashion, which is stark opposite of our pure example above.

The result above is due to the semantics of the do block, which executes each statement line-by-line, completely before moving on to the next, as a result of which, for eg., our source function reads its entire input, before returning.

This is not something we want when we are processing streams - to process / hold the entire thing at once. and do that for all steps of the pipeline.

The impure example with fixed semnatics.

This, as it turns out, is pretty easy to correct.

we make use of unsafeInterleaveIO and guard our recursive IO calls with it. What it does it makes them lazy, we delay or defer those calls till they are needed, they are lazily evaluated when required, pretty much like the head and tail of a list, where the tail is evaluated only when required.

Fixed version: -

import System.IO.Unsafe

source :: IO [Int]
source = go 1
  where
    go 6 = pure []
    go x = do
      putStrLn $ "Source: " <> show x
      xs <- unsafeInterleaveIO $ go (x + 1) -- we guard our calls with unsafeInterleaveIO this time
      pure (x : xs)

odds :: [Int] -> IO [Int]
odds [] = pure []
odds (x : xs) = do
  putStrLn $ "filter: " <> show x
  xs' <- unsafeInterleaveIO $ odds xs -- likewise
  pure $
    if odd x
      then x : xs'
      else xs'

sink :: [Int] -> IO Int
sink [] = pure 0
sink (x : xs) = do
  putStrLn $ "Sink: " <> show x
  (x +) <$> unsafeInterleaveIO (sink xs) -- likewise

main :: IO ()
main = do
  r <- source >>= odds >>= sink
  print r

main
Source: 1     -- back to streaming again 😎
filter: 1
Sink: 1
Source: 2
filter: 2
Source: 3
filter: 3
Sink: 3
Source: 4
filter: 4
Source: 5
filter: 5
Sink: 5
9

Voilà, back to the streaming behavior with our pipeline again!

This happened because we understood how do-block semantics worked and therefore how making use of unsafeInterleaveIO gives us lazy IO, which we in-turn use to mimic pretty basic Streaming like functionality.

Conclusion

We saw how we can implement & mimic a basic streaming mechanism in haskell using nothing but simple lazy IO. The important take-aways here seem to be:

  • To use functions that process only a small part of the stream at a time,

so we don’t have to hold the entire thing in-memory

  • Nothing special needs to be done for pure functions.
  • To guard our recursive calls to IO (impure) functions with unsafeInterleaveIO

Write Soon, -Arjun