Java 8 Streams

through the lens of OCaml/F#

Clash of the Lambdas

Clash of the Lambdas, ICOOOLPS'14 (arXiv)

Performance Benchmarks

Sum (Windows) [chart]

Sum (Linux) [chart]

Sum of squares (Windows) [chart]

Sum of squares (Linux) [chart]

Sum of even squares (Windows) [chart]

Sum of even squares (Linux) [chart]

Cartesian product (Windows) [chart]

Cartesian product (Linux) [chart]

Java 8 is very fast

What makes Java 8 faster?

Streams!

Typical Pipeline Pattern

source |> inter |> inter |> inter |> terminal
  • inter: intermediate operations, e.g. map, filter
  • terminal: produces a result or side effects, e.g. reduce, iter
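F#'s built-in `Seq` module follows the same source / intermediate / terminal shape, with the intermediate steps evaluated lazily until the terminal operation runs. A small illustrative sketch (the input data is made up):

```fsharp
// source |> inter |> inter |> terminal, using F#'s lazy Seq module
let source = [| 1 .. 10 |]

let result =
    source
    |> Seq.filter (fun i -> i % 2 = 0) // intermediate: keep the even numbers
    |> Seq.map (fun i -> i * i)        // intermediate: square them
    |> Seq.sum                         // terminal: forces the pipeline and produces a value

printfn "%d" result // prints 220
```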

OCaml Streams example

// let (|>) a f = f a
let data = [| 1..10000000 |]
data
|> Stream.ofArray
|> Stream.filter (fun i -> i % 2 = 0)
|> Stream.map (fun i -> i * i)
|> Stream.sum

OCaml Streams

Stream is pulling

exception StreamEnd // raised by a pull stream when it has no more values

type Stream<'T> = unit -> (unit -> 'T)

// val ofArray : 'T[] -> Stream<'T>
let ofArray (values: 'T[]) : Stream<'T> =
    fun () ->
        let index = ref -1
        fun () ->
            incr index
            if !index < Array.length values then
                values.[!index]
            else
                raise StreamEnd
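A short usage sketch of the pull-based `ofArray` above: the consumer decides when each value is produced, the end of the data is signalled by `StreamEnd`, and each call of the stream with `()` creates an independent cursor. The names `next`, `first`, `restarted` etc. are illustrative only.

```fsharp
/// Signals that a pull stream has no more values.
exception StreamEnd

/// A pull stream: calling it with () creates a fresh cursor (a "next" function).
type Stream<'T> = unit -> (unit -> 'T)

let ofArray (values: 'T[]) : Stream<'T> =
    fun () ->
        let index = ref -1
        fun () ->
            index.Value <- index.Value + 1
            if index.Value < values.Length then values.[index.Value]
            else raise StreamEnd

let stream = ofArray [| 10; 20; 30 |]

// The consumer drives: every call to next () pulls exactly one value.
let next = stream ()
let first = next ()
let second = next ()
let third = next ()

// A fourth pull finds the array exhausted and raises StreamEnd.
let exhausted =
    try
        next () |> ignore
        false
    with StreamEnd -> true

// A new cursor on the same stream starts again from the beginning.
let restarted = (stream ()) ()
```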

Simple functions

// val map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
let map f stream =
    fun () ->
        let next = stream ()
        fun () -> f (next ())

Filter is recursive!

// val filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
let filter p stream =
    // keep pulling until a value satisfies p
    let rec loop v next =
        if p v then v else loop (next ()) next
    fun () ->
        let next = stream ()
        fun () -> loop (next ()) next

Simple functions

// val iter : ('T -> unit) -> Stream<'T> -> unit
let iter f stream =
    let rec loop v next =
        f v; loop (next ()) next
    let next = stream ()
    try
        loop (next ()) next
    with StreamEnd -> ()

// val sum : Stream<int> -> int
let sum stream =
    let sum = ref 0
    iter (fun v -> sum := !sum + v) stream
    !sum
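Putting the pull-based pieces together, here is a self-contained sketch that assembles `ofArray`, `map`, `filter`, `iter` and `sum` from the slides (with type annotations added, and `iter` written with a `while` loop instead of recursion) and runs the earlier example on a small array:

```fsharp
exception StreamEnd

type Stream<'T> = unit -> (unit -> 'T)

let ofArray (values: 'T[]) : Stream<'T> =
    fun () ->
        let index = ref -1
        fun () ->
            index.Value <- index.Value + 1
            if index.Value < values.Length then values.[index.Value]
            else raise StreamEnd

let map (f: 'T -> 'R) (stream: Stream<'T>) : Stream<'R> =
    fun () ->
        let next = stream ()
        fun () -> f (next ())

let filter (p: 'T -> bool) (stream: Stream<'T>) : Stream<'T> =
    // keep pulling until a value satisfies p; StreamEnd simply propagates
    let rec loop v next = if p v then v else loop (next ()) next
    fun () ->
        let next = stream ()
        fun () -> loop (next ()) next

let iter (f: 'T -> unit) (stream: Stream<'T>) : unit =
    let next = stream ()
    try
        while true do
            f (next ())
    with StreamEnd -> ()

let sum (stream: Stream<int>) : int =
    let acc = ref 0
    iter (fun v -> acc.Value <- acc.Value + v) stream
    acc.Value

// Sum of even squares of 1..10: 4 + 16 + 36 + 64 + 100
let result =
    [| 1 .. 10 |]
    |> ofArray
    |> filter (fun i -> i % 2 = 0)
    |> map (fun i -> i * i)
    |> sum
```

Note that every value travels through a chain of `next ()` calls, and the end of the stream is detected through an exception.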

Simple functions

// val zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
let zip stream stream' =
    fun () ->
        let next, next' = stream (), stream' ()
        fun () -> (next (), next' ())
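Because the consumer controls both cursors, zip is straightforward in the pull model. A small sketch (the `toList` helper is hypothetical, added only for this example) showing that the zipped stream ends as soon as either side raises `StreamEnd`:

```fsharp
exception StreamEnd

type Stream<'T> = unit -> (unit -> 'T)

let ofArray (values: 'T[]) : Stream<'T> =
    fun () ->
        let index = ref -1
        fun () ->
            index.Value <- index.Value + 1
            if index.Value < values.Length then values.[index.Value]
            else raise StreamEnd

let zip (stream: Stream<'T>) (stream': Stream<'R>) : Stream<'T * 'R> =
    fun () ->
        let next, next' = stream (), stream' ()
        fun () -> (next (), next' ())

// Hypothetical helper: drain a pull stream into a list.
let toList (stream: Stream<'T>) : 'T list =
    let next = stream ()
    let items = ResizeArray<'T>()
    try
        while true do
            items.Add(next ())
    with StreamEnd -> ()
    List.ofSeq items

// The shorter side decides the length of the result.
let pairs = zip (ofArray [| 1; 2; 3 |]) (ofArray [| "a"; "b" |]) |> toList
```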

flatMap is tricky!

// val flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
let flatMap (f: 'T -> Stream<'R>) (stream: Stream<'T>) : Stream<'R> =
    fun () ->
        let next = stream ()
        // pull function of the inner stream being drained, if any
        let current = ref None
        fun () ->
            let rec loop () =
                match !current with
                | None ->
                    // start the inner stream of the next outer value
                    current := Some (f (next ()) ())
                    loop ()
                | Some next' ->
                    try
                        next' ()
                    with StreamEnd ->
                        // inner stream exhausted: move on to the next outer value
                        current := None
                        loop ()
            loop ()
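A self-contained sketch exercising the pull-based `flatMap` above; `toList` is a hypothetical helper for the example. Each outer value is expanded into a two-element inner stream, and an outer stream whose inner streams are all empty yields nothing:

```fsharp
exception StreamEnd

type Stream<'T> = unit -> (unit -> 'T)

let ofArray (values: 'T[]) : Stream<'T> =
    fun () ->
        let index = ref -1
        fun () ->
            index.Value <- index.Value + 1
            if index.Value < values.Length then values.[index.Value]
            else raise StreamEnd

let flatMap (f: 'T -> Stream<'R>) (stream: Stream<'T>) : Stream<'R> =
    fun () ->
        let next = stream ()
        let current : (unit -> 'R) option ref = ref None
        fun () ->
            let rec loop () =
                match current.Value with
                | None ->
                    // StreamEnd from the outer stream ends the whole stream
                    current.Value <- Some (f (next ()) ())
                    loop ()
                | Some next' ->
                    try
                        next' ()
                    with StreamEnd ->
                        current.Value <- None
                        loop ()
            loop ()

// Hypothetical helper: drain a pull stream into a list.
let toList (stream: Stream<'T>) : 'T list =
    let next = stream ()
    let items = ResizeArray<'T>()
    try
        while true do
            items.Add(next ())
    with StreamEnd -> ()
    List.ofSeq items

let flattened =
    ofArray [| 1; 2; 3 |]
    |> flatMap (fun i -> ofArray [| i; i * 10 |])
    |> toList
```

The bookkeeping (the `current` cell, the exception handler, the recursive `loop`) is what makes flatMap awkward in the pull model.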

Java 8 Streams are pushing

Arrays.stream(data)
      .filter(i -> i % 2 == 0)
      .map(i -> i * i)
      .sum();

The source pushes data down the pipeline.

How does it work?

Starting from foreach

// val iter : ('T -> unit) -> 'T[] -> unit
let iter f values =
    for value in values do
        f value

Flip the args

'T[] -> ('T -> unit) -> unit

Stream!

type Stream<'T> = ('T -> unit) -> unit

// val ofArray : 'T[] -> Stream<'T>
let ofArray values k =
    for value in values do
        k value
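In the push model the source owns the loop and the consumer only supplies a callback. A minimal sketch (the `received` buffer is illustrative):

```fsharp
/// A push stream: give it a continuation k and it calls k once per value.
type Stream<'T> = ('T -> unit) -> unit

let ofArray (values: 'T[]) : Stream<'T> =
    fun k ->
        for value in values do
            k value

// The source drives iteration; the callback just records what it receives.
let received = ResizeArray<int>()
ofArray [| 1; 2; 3 |] (fun v -> received.Add v)
```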

Let's make us some (simple) Streams!

Simple functions

type Stream<'T> = ('T -> unit) -> unit

// val map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
let map f stream =
    fun k -> stream (fun v -> k (f v))

Simple functions

// val filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
let filter f stream =
    fun k -> stream (fun v -> if f v then k v else ())

Simple functions

// val sum : Stream<int> -> int
let sum stream =
    let sum = ref 0
    stream (fun v -> sum := !sum + v)
    !sum
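The push-based `ofArray`, `map`, `filter` and `sum` can be assembled into the same even-squares pipeline. In this sketch (type annotations added), no stage loops, recurses or throws: each one only wraps the continuation of the next.

```fsharp
type Stream<'T> = ('T -> unit) -> unit

let ofArray (values: 'T[]) : Stream<'T> =
    fun k -> for value in values do k value

let map (f: 'T -> 'R) (stream: Stream<'T>) : Stream<'R> =
    fun k -> stream (fun v -> k (f v))

let filter (p: 'T -> bool) (stream: Stream<'T>) : Stream<'T> =
    fun k -> stream (fun v -> if p v then k v)

let sum (stream: Stream<int>) : int =
    let acc = ref 0
    stream (fun v -> acc.Value <- acc.Value + v)
    acc.Value

// Sum of even squares of 1..10
let result =
    [| 1 .. 10 |]
    |> ofArray
    |> filter (fun i -> i % 2 = 0)
    |> map (fun i -> i * i)
    |> sum
```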

flatMap is easy

// val flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
let flatMap f stream =
    fun k -> stream (fun v -> let stream' = f v in stream' k)
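With push streams, flatMap just lets each inner stream push into the same continuation. A sketch computing a small Cartesian product (one of the benchmarked operations); `toList` is a hypothetical helper for the example:

```fsharp
type Stream<'T> = ('T -> unit) -> unit

let ofArray (values: 'T[]) : Stream<'T> =
    fun k -> for value in values do k value

let map (f: 'T -> 'R) (stream: Stream<'T>) : Stream<'R> =
    fun k -> stream (fun v -> k (f v))

let flatMap (f: 'T -> Stream<'R>) (stream: Stream<'T>) : Stream<'R> =
    // each outer value's inner stream pushes straight into k
    fun k -> stream (fun v -> f v k)

// Hypothetical helper: collect everything a push stream emits.
let toList (stream: Stream<'T>) : 'T list =
    let items = ResizeArray<'T>()
    stream (fun v -> items.Add v)
    List.ofSeq items

let cartesian =
    ofArray [| 1; 2 |]
    |> flatMap (fun x -> ofArray [| 10; 20 |] |> map (fun y -> (x, y)))
    |> toList
```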

What about zip?

Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>

Zip needs to synchronise the flow of values.

Zip needs to pull!

exception StreamEnd

type Stream<'T>     = ('T -> unit) -> unit
type StreamPull<'T> = unit -> (unit -> 'T)

// val toPull : Stream<'T> -> StreamPull<'T>
let toPull (stream: Stream<'T>) : StreamPull<'T> = failwith "???" // how?

// val zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
let zip stream stream' =
    fun k ->
        // create fresh pull cursors on every run of the zipped stream
        let next, next' = toPull stream (), toPull stream' ()
        try
            while true do
                k (next (), next' ())
        with StreamEnd -> ()
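The slide leaves `toPull` open. One naive answer, sketched here under the assumption that the push stream is finite, is to run the whole push loop into a buffer and then pull from the buffer. `toPullEager` is a hypothetical name. It makes zip work, but it materializes every value up front and never returns on an infinite stream:

```fsharp
exception StreamEnd

type Stream<'T>     = ('T -> unit) -> unit
type StreamPull<'T> = unit -> (unit -> 'T)

let ofArray (values: 'T[]) : Stream<'T> =
    fun k -> for value in values do k value

/// Naive toPull: only valid for finite streams.
let toPullEager (stream: Stream<'T>) : StreamPull<'T> =
    fun () ->
        let buffer = ResizeArray<'T>()
        stream (fun v -> buffer.Add v) // the whole push loop runs here
        let index = ref -1
        fun () ->
            index.Value <- index.Value + 1
            if index.Value < buffer.Count then buffer.[index.Value]
            else raise StreamEnd

let zip (stream: Stream<'T>) (stream': Stream<'R>) : Stream<'T * 'R> =
    fun k ->
        let next, next' = toPullEager stream (), toPullEager stream' ()
        try
            while true do
                k (next (), next' ())
        with StreamEnd -> ()

let pairs = ResizeArray<int * string>()
zip (ofArray [| 1; 2; 3 |]) (ofArray [| "a"; "b" |]) (fun p -> pairs.Add p)
```

A pure push stream offers no way to stop after one element, so anything better needs a stream that can also be advanced step by step.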

Streams can push and pull

/// Provides functions for iteration
type Iterable = {
    /// push all values, driven by the source
    Bulk : unit -> unit
    /// advance the source one step (pushing zero or more values); false once exhausted
    TryAdvance : unit -> bool
}

/// Represents a Stream of values.
type Stream<'T> = ('T -> unit) -> Iterable

ofArray

// val ofArray : 'T[] -> Stream<'T>
let ofArray (values: 'T[]) =
    fun k ->
        // push mode: the source loops over every value
        let bulk () =
            for value in values do
                k value

        // pull mode: each call pushes the next value, if there is one
        let index = ref -1
        let tryAdvance () =
            incr index
            if !index < Array.length values then
                k values.[!index]
                true
            else
                false
        { Bulk = bulk; TryAdvance = tryAdvance }
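A usage sketch of the two modes of the push/pull `ofArray`: `Bulk` pushes everything in one call, while `TryAdvance` lets the consumer decide when the next value is pushed. The buffers and names are illustrative.

```fsharp
type Iterable = {
    Bulk : unit -> unit        // push all values in one loop
    TryAdvance : unit -> bool  // push the next value; false once exhausted
}

type Stream<'T> = ('T -> unit) -> Iterable

let ofArray (values: 'T[]) : Stream<'T> =
    fun k ->
        let bulk () =
            for value in values do
                k value
        let index = ref -1
        let tryAdvance () =
            index.Value <- index.Value + 1
            if index.Value < values.Length then
                k values.[index.Value]
                true
            else
                false
        { Bulk = bulk; TryAdvance = tryAdvance }

// Push mode: one call drives the whole loop.
let pushed = ResizeArray<int>()
let pushAll = ofArray [| 1; 2; 3 |] (fun v -> pushed.Add v)
pushAll.Bulk ()

// Step mode: the consumer asks for one value at a time.
let stepped = ResizeArray<int>()
let iterable = ofArray [| 1; 2; 3 |] (fun v -> stepped.Add v)
let steps = [ for _ in 1 .. 4 -> iterable.TryAdvance () ]
```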

toPull

// val toPull : Stream<'T> -> StreamPull<'T>
let toPull stream =
    fun () ->
        let current = ref None
        let { Bulk = _; TryAdvance = next } = stream (fun v -> current := Some v)
        fun () ->
            let rec loop () =
                if next () then
                    match !current with
                    | Some v ->
                        current := None
                        v
                    | None -> loop () // this step pushed nothing (e.g. filtered out)
                else raise StreamEnd
            loop ()

toPull - Revised

// val toPull : Stream<'T> -> StreamPull<'T>
let toPull stream =
    fun () ->
        let buffer = new ResizeArray<'T>()
        let { Bulk = _; TryAdvance = next } = stream (fun v -> buffer.Add(v))
        let index = ref -1
        fun () ->
            let rec loop () =
                incr index
                if !index < buffer.Count then
                    buffer.[!index]
                else
                    buffer.Clear()
                    index := -1
                    if next () then
                        loop ()
                    else raise StreamEnd
            loop ()
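To see why the revised `toPull` buffers, here is a sketch with a flatMap for the push/pull representation. This flatMap is an assumption (the slides do not show one): advancing the outer stream once runs a whole inner stream in bulk, so a single `TryAdvance` can push several values, and all of them must be kept until pulled.

```fsharp
exception StreamEnd

type Iterable = { Bulk : unit -> unit; TryAdvance : unit -> bool }
type Stream<'T> = ('T -> unit) -> Iterable
type StreamPull<'T> = unit -> (unit -> 'T)

let ofArray (values: 'T[]) : Stream<'T> =
    fun k ->
        let index = ref -1
        let tryAdvance () =
            index.Value <- index.Value + 1
            if index.Value < values.Length then
                k values.[index.Value]
                true
            else false
        { Bulk = (fun () -> for v in values do k v); TryAdvance = tryAdvance }

// Hypothetical flatMap: each outer value runs its entire inner stream in bulk.
let flatMap (f: 'T -> Stream<'R>) (stream: Stream<'T>) : Stream<'R> =
    fun k -> stream (fun v -> (f v k).Bulk ())

// The revised toPull: buffer whatever one TryAdvance pushes.
let toPull (stream: Stream<'T>) : StreamPull<'T> =
    fun () ->
        let buffer = ResizeArray<'T>()
        let { TryAdvance = next } = stream (fun v -> buffer.Add v)
        let index = ref -1
        fun () ->
            let rec loop () =
                index.Value <- index.Value + 1
                if index.Value < buffer.Count then buffer.[index.Value]
                else
                    buffer.Clear()
                    index.Value <- -1
                    if next () then loop () else raise StreamEnd
            loop ()

let pull =
    ofArray [| 1; 2; 3 |]
    |> flatMap (fun i -> ofArray [| i; i * 10 |])
    |> toPull

let next = pull ()
let firstFour = [ for _ in 1 .. 4 -> next () ]
```

With the earlier single-cell `toPull`, the second value pushed by each step would overwrite the first.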

Gotcha!

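let pull =
    [|1..10|]
    |> Stream.ofArray
    |> Stream.flatMap (fun _ -> Stream.infinite)
    |> Stream.toPull

let next = pull ()
next () // OutOfMemoryException

Advancing the outer stream once pushes the entire inner stream into toPull's buffer; with an infinite inner stream, the buffer grows until memory runs out.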

The Streams library

Implements a rich set of operations

More examples

Parallel Streams

let data = [| 1..10000000 |]
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2 = 0)
|> ParStream.map (fun x -> x * x)
|> ParStream.sum

ParStream

type ParStream<'T> = (unit -> ('T -> unit)) -> unit

// val ofArray : 'T[] -> ParStream<'T>
// partitions, fork and join are helpers not shown here: split the array,
// run a continuation over one partition in parallel, and wait for all forks.
let ofArray values =
    fun thunk ->
        let forks =
            values
            |> partitions
            |> Array.map (fun p -> (p, thunk ()))
            |> Array.map (fun (p, k) -> fork p k)
        join forks

ParStream

type ParStream<'T> = (unit -> ('T -> unit)) -> unit

// val map : ('T -> 'R) -> ParStream<'T> -> ParStream<'R>
let map f stream =
    fun thunk ->
        stream (fun () ->
                    let k = thunk ()
                    (fun v -> k (f v)))

ParStream

type ParStream<'T> = (unit -> ('T -> unit)) -> unit

// val sum : ParStream<int> -> int
let sum stream =
    // one accumulator per partition, so parallel workers never share a cell
    let partials = new ResizeArray<int ref>()
    stream (fun () ->
                let sum = ref 0
                partials.Add(sum)
                (fun v -> sum := !sum + v)
            )
    partials |> Seq.map (fun sum -> !sum) |> Seq.sum
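The slides leave `partitions`, `fork` and `join` abstract. A self-contained sketch, assuming partitions are contiguous chunks of the array (one per processor) and that fork/join map onto .NET's `Task.Run` and `Task.WaitAll`; the helper names come from the slide, but these definitions are hypothetical:

```fsharp
open System
open System.Threading.Tasks

/// A parallel push stream: it asks the thunk for one continuation per partition.
type ParStream<'T> = (unit -> ('T -> unit)) -> unit

// Hypothetical helpers, not a library API.
let partitions (values: 'T[]) : 'T[][] =
    let n = Environment.ProcessorCount
    let size = max 1 ((values.Length + n - 1) / n)
    Array.chunkBySize size values

let fork (partition: 'T[]) (k: 'T -> unit) : Task =
    Task.Run(Action(fun () -> for v in partition do k v))

let join (forks: Task[]) : unit = Task.WaitAll(forks)

let ofArray (values: 'T[]) : ParStream<'T> =
    fun thunk ->
        let forks =
            values
            |> partitions
            |> Array.map (fun p -> (p, thunk ()))   // sequential: one continuation per partition
            |> Array.map (fun (p, k) -> fork p k)   // each partition runs on the thread pool
        join forks

let filter (p: 'T -> bool) (stream: ParStream<'T>) : ParStream<'T> =
    fun thunk -> stream (fun () -> let k = thunk () in fun v -> if p v then k v)

let map (f: 'T -> 'R) (stream: ParStream<'T>) : ParStream<'R> =
    fun thunk -> stream (fun () -> let k = thunk () in fun v -> k (f v))

let sum (stream: ParStream<int>) : int =
    let partials = ResizeArray<int ref>()
    stream (fun () ->
        let acc = ref 0   // private to one partition, so no locking is needed
        partials.Add acc
        fun v -> acc.Value <- acc.Value + v)
    partials |> Seq.sumBy (fun acc -> acc.Value)

let result =
    [| 1 .. 10 |]
    |> ofArray
    |> filter (fun x -> x % 2 = 0)
    |> map (fun x -> x * x)
    |> sum
```

Because the thunk is called once per partition before any fork starts, `partials` is only mutated on the calling thread; the workers only touch their own accumulator.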

Some Benchmarks

i7, 8 x 3.7 GHz (4 physical cores), 6 GB RAM

Sum of Squares [chart]

Sum of Even Squares [chart]

Cartesian Product [chart]

Parallel Sum of Squares [chart]

Parallel Sum of Even Squares [chart]

Parallel Cartesian [chart]

Cloud Streams!

Example: a word count

cfiles
|> CloudStream.ofCloudFiles CloudFile.ReadLines
|> CloudStream.collect Stream.ofSeq
|> CloudStream.collect (fun line -> splitWords line |> Stream.ofArray)
|> CloudStream.filter wordFilter
|> CloudStream.countBy id
|> CloudStream.sortBy (fun (_,c) -> -c) count
|> CloudStream.toCloudArray
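To make the pipeline's meaning concrete, here is a local, sequential analogue using F#'s `Seq` module instead of MBrace's CloudStream. `splitWords` and `wordFilter` are hypothetical stand-ins, and `Seq.truncate n` assumes the trailing `count` argument in the slide limits the output to the most frequent entries:

```fsharp
open System

// Hypothetical stand-ins for the slide's splitWords and wordFilter.
let splitWords (line: string) : string[] =
    line.Split([| ' '; ','; '.' |], StringSplitOptions.RemoveEmptyEntries)

let wordFilter (word: string) : bool = word.Length > 2

/// Top-n words by frequency, computed in memory on one machine.
let wordCount (n: int) (lines: seq<string>) : (string * int)[] =
    lines
    |> Seq.collect splitWords          // one line -> many words
    |> Seq.filter wordFilter
    |> Seq.countBy id                  // (word, occurrences)
    |> Seq.sortBy (fun (_, c) -> -c)   // most frequent first
    |> Seq.truncate n
    |> Seq.toArray

let top = wordCount 2 [ "the cat and a hat"; "the cat sat"; "the end" ]
```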

https://github.com/mbraceproject/MBrace.Streams

Streams are lightweight and powerful

In sequential, parallel and distributed flavors

Beauty and the Beast

Write beautiful functional code with the performance of imperative code.

Stream fusion in Haskell

Stream fusion http://code.haskell.org/~dons/papers/icfp088-coutts.pdf

Stream fusion in Haskell

Haskell beats C http://research.microsoft.com/en-us/um/people/simonpj/papers/ndp/haskell-beats-C.pdf

Stream fusion in Haskell

HERMIT http://www.ittc.ku.edu/~afarmer/concatmap-pepm14.pdf

Stream operations are non-recursive

In principle, they can always be fused (inlined).

The F#/OCaml compilers do not always do this.
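Because each push stage only wraps the next continuation, substituting the definitions into a pipeline and simplifying leaves a single loop. A sketch comparing the composed pipeline with a hand-fused loop of the kind an inlining compiler could, in principle, produce (the fused version is written by hand here, not taken from compiler output):

```fsharp
type Stream<'T> = ('T -> unit) -> unit

let ofArray (values: 'T[]) : Stream<'T> = fun k -> for v in values do k v
let filter (p: 'T -> bool) (s: Stream<'T>) : Stream<'T> = fun k -> s (fun v -> if p v then k v)
let map (f: 'T -> 'R) (s: Stream<'T>) : Stream<'R> = fun k -> s (fun v -> k (f v))

let sum (s: Stream<int>) : int =
    let acc = ref 0
    s (fun v -> acc.Value <- acc.Value + v)
    acc.Value

// Composed pipeline: a closure per stage and indirect calls between stages.
let sumEvenSquares (values: int[]) =
    values |> ofArray |> filter (fun i -> i % 2 = 0) |> map (fun i -> i * i) |> sum

// Fully inlined equivalent: one loop, no closures, no intermediate calls.
let sumEvenSquaresFused (values: int[]) =
    let mutable acc = 0
    for v in values do
        if v % 2 = 0 then acc <- acc + v * v
    acc
```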

Experiments with MLton

by @biboudis

https://github.com/biboudis/sml-streams

MLton appears to always fuse them.

GitHub

https://github.com/nessos/Streams

NuGet

https://www.nuget.org/packages/Streams/

Slides and samples

https://github.com/palladin/StreamsPLSeminar

Thank you!

Questions?