val data : int []
Full name: Document.data
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val incr : cell:int ref -> unit
Full name: Microsoft.FSharp.Core.Operators.incr
module Array
from Microsoft.FSharp.Collections
val length : array:'T [] -> int
Full name: Microsoft.FSharp.Collections.Array.length
val raise : exn:System.Exception -> 'T
Full name: Microsoft.FSharp.Core.Operators.raise
type bool = System.Boolean
Full name: Microsoft.FSharp.Core.bool
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
union case Option.None: Option<'T>
union case Option.Some: Value: 'T -> Option<'T>
type ResizeArray<'T> = System.Collections.Generic.List<'T>
Full name: Microsoft.FSharp.Collections.ResizeArray<_>
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []
Full name: Microsoft.FSharp.Collections.Array.map
type 'T array = 'T []
Full name: Microsoft.FSharp.Core.array<_>
val sum : array:'T [] -> 'T (requires member ( + ) and member get_Zero)
Full name: Microsoft.FSharp.Collections.Array.sum
val id : x:'T -> 'T
Full name: Microsoft.FSharp.Core.Operators.id
Java 8 Streams
through the lens of OCaml/F#
Clash of the Lambdas
ICOOOLPS'14
arxiv
Performance Benchmarks
Sum (windows)
Sum (linux)
Sum of squares (windows)
Sum of squares (linux)
Sum of even squares (windows)
Sum of even squares (linux)
Cartesian product (windows)
Cartesian product (linux)
What makes Java 8 faster?
Typical Pipeline Pattern
1:
|
source |> inter |> inter |> inter |> terminal
|
- inter : intermediate operations, e.g. map, filter
- terminal : produces result or side-effects, e.g. reduce, iter
OCaml Streams example
1:
2:
3:
4:
5:
6:
7:
|
// Example pipeline: sums the squares of the even numbers in `data`.
// For reference, the pipe operator used below behaves like: let (|>) a f = f a
let data = [| 1..10000000 |]
data
|> Stream.ofArray                      // wrap the array as a stream
|> Stream.filter (fun i -> i % 2 = 0)  // keep even values only
|> Stream.map (fun i -> i * i)         // square each remaining value
|> Stream.sum                          // terminal operation: add everything up
|
OCaml Streams
Stream is pulling
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
|
type Stream<'T> = unit -> (unit -> 'T)
val ofArray : 'T[] -> Stream<'T>
/// Builds a pull-based stream over `values`.
/// Invoking the stream yields a fresh iterator with its own cursor; invoking
/// the iterator returns the next element, and raises StreamEnd once every
/// element has been returned.
let ofArray values =
    fun () ->
        // The cursor starts one slot before the first element because it is
        // advanced before every read.
        let index = ref -1
        (fun () ->
            incr index
            if !index < Array.length values then
                values.[!index]
            else
                raise StreamEnd)
|
Simple functions
1:
2:
3:
4:
5:
|
val map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
/// Pull-based map: every element pulled from `stream` is passed through `f`
/// before being handed to the caller.
let map f stream =
    fun () ->
        let pull = stream ()
        // Pull one upstream element, then transform it.
        pull >> f
|
Filter is recursive!
1:
2:
3:
4:
5:
6:
7:
|
val filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
/// Pull-based filter: each pull keeps drawing upstream elements until one
/// satisfies `p`, and returns that element.
let filter p stream =
    fun () ->
        let pull = stream ()
        // Recurse until a candidate passes the predicate.
        let rec firstMatch candidate =
            if p candidate then candidate
            else firstMatch (pull ())
        fun () -> firstMatch (pull ())
|
Simple functions
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
val iter : ('T -> unit) -> Stream<'T> -> unit
/// Terminal operation for pull streams: applies `f` to every element of
/// `stream`, in order, until the iterator raises StreamEnd.
let iter f stream =
    let rec loop v next =
        f v
        loop (next ()) next
    let next = stream ()
    try
        loop (next ()) next
    with StreamEnd -> ()   // end of stream is the normal way out of the loop
val sum : Stream<int> -> int
/// Adds up every element of a pull stream by folding into a mutable cell
/// through `iter`.
let sum stream =
    let total = ref 0
    stream |> iter (fun v -> total := !total + v)
    !total
|
Simple functions
1:
2:
3:
4:
5:
|
val zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
/// Pull-based zip: each pull takes one element from `stream` and then one
/// from `stream'`, and returns them as a pair.
let zip stream stream' =
    fun () ->
        let left = stream ()
        let right = stream' ()
        fun () ->
            let a = left ()
            let b = right ()
            (a, b)
|
flatMap is tricky!
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
|
val flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
/// Pull-based flatMap: maps every outer element to an inner stream with `f`
/// and yields the inner streams' elements one after another.
/// The end of the outer stream surfaces as StreamEnd raised by `next ()`.
let flatMap f stream =
    fun () ->
        let next = stream ()
        // Iterator of the inner stream currently being drained. It is created
        // per iteration so that independent iterations do not share state.
        let current = ref None
        fun () ->
            let rec loop () =
                match !current with
                | None ->
                    // `f` returns a stream; invoke it to obtain its iterator.
                    current := Some (f (next ()) ())
                    loop ()
                | Some next' ->
                    try
                        next' ()
                    with StreamEnd ->
                        // Inner stream exhausted: move on to the next outer element.
                        current := None
                        loop ()
            loop ()
|
Java 8 Streams are pushing
1:
2:
3:
4:
|
Stream.OfArray(data)
.filter(i -> i % 2 == 0)
.map(i -> i * i)
.sum();
|
The source is pushing data down the pipeline.
Starting from foreach
1:
2:
3:
4:
|
val iter : ('T -> unit) -> 'T[] -> unit
/// Applies `f` to each element of `values`, in order.
let iter f values =
    Seq.iter f values
|
Flip the args
1:
|
'T[] -> ('T -> unit) -> unit
|
Stream!
1:
2:
3:
4:
5:
6:
|
type Stream<'T> = ('T -> unit) -> unit
val ofArray : 'T[] -> Stream<'T>
/// Push-based source: feeds every element of `values`, in order, to the
/// consumer `k`.
let ofArray values k =
    values |> Seq.iter k
|
Let's make us some (simple) Streams!
Simple functions
1:
2:
3:
4:
5:
|
/// A push-based stream: given a consumer for its elements, it pushes every
/// element into that consumer.
type Stream<'T> = ('T -> unit) -> unit
val map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
/// Push-based map: each value pushed by `stream` is transformed with `f`
/// and then forwarded to the downstream consumer.
let map f stream =
    fun k -> stream (f >> k)
|
Simple functions
1:
2:
3:
|
val filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
/// Push-based filter: forwards a value downstream only when `f` accepts it.
let filter f stream =
    fun k ->
        let onValue v =
            if f v then k v
        stream onValue
|
Simple functions
1:
2:
3:
4:
5:
|
val sum : Stream<int> -> int
/// Terminal operation for push streams: runs the stream with a consumer that
/// accumulates into a mutable cell, then returns the accumulated total.
let sum stream =
    let total = ref 0
    let add v = total := !total + v
    stream add
    !total
|
flatMap is easy
1:
2:
3:
|
val flatMap : ('T -> Stream<'R>) -> Stream<'T> -> Stream<'R>
/// Push-based flatMap: for each value pushed by `stream`, builds the inner
/// stream `f v` and lets it push straight into the downstream consumer.
let flatMap f stream =
    fun k -> stream (fun v -> f v k)
|
What about zip?
1:
|
Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>
|
Zip needs to synchronise the flow of values.
Zip needs to pull!
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
type Stream<'T> = ('T -> unit) -> unit
type StreamPull<'T> = unit -> (unit -> 'T)
val toPull : Stream<'T> -> StreamPull<'T>
let toPull stream = ???
val zip : Stream<'T> -> Stream<'R> -> Stream<'T * 'R>
/// Push-based zip built on pulling: both inputs are converted to pull streams,
/// and pairs are pushed to the consumer until one side raises StreamEnd.
let zip stream stream' =
    let pullLeft = toPull stream
    let pullRight = toPull stream'
    let nextLeft = pullLeft ()
    let nextRight = pullRight ()
    fun k ->
        try
            while true do
                let pair = (nextLeft (), nextRight ())
                k pair
        with StreamEnd -> ()
|
Streams can push and pull
1:
2:
3:
4:
5:
6:
7:
8:
|
/// Provides functions for iteration
/// The two fields are the two ways of driving a stream: all at once (push)
/// or one step at a time (pull).
type Iterable = {
/// Runs the whole iteration in one go. In the array-backed `ofArray` shown
/// in this deck it pushes every element to the consumer.
Bulk : unit -> unit
/// Performs a single step. In the array-backed `ofArray` it pushes the next
/// element and returns true, or returns false once the array is exhausted.
TryAdvance : unit -> bool
}
/// Represents a Stream of values.
/// A stream takes a consumer for its values and returns the Iterable that
/// drives the iteration.
/// NOTE(review): the later snippets apply a stream directly as a function
/// (`stream (fun v -> ...)`) without unwrapping the `Stream` case — confirm
/// the intended representation.
type Stream<'T> = Stream of ('T -> unit) -> Iterable
|
ofArray
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
|
val ofArray : 'T[] -> Stream<'T>
/// Array-backed stream that can be driven either way:
/// `Bulk` pushes every element to the consumer `k` in one loop, while
/// `TryAdvance` pushes at most one element per call and reports whether an
/// element was available.
let ofArray (values : 'T []) =
    fun k ->
        let bulk () =
            for value in values do
                k value
        // Cursor for step-wise iteration; advanced before each read.
        let index = ref -1
        let tryAdvance () =
            incr index
            if !index < Array.length values then
                k values.[!index]
                true
            else
                false
        { Bulk = bulk; TryAdvance = tryAdvance }
|
toPull
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
|
val toPull : Stream<'T> -> StreamPull<'T>
/// Converts a push/pull stream into a pull stream by capturing the value the
/// stream pushes on each `TryAdvance` step in a one-slot cell.
/// Raises StreamEnd when `TryAdvance` reports that the stream is exhausted.
/// Note that the cell holds a single value: if one step pushes several
/// values, each push overwrites the previous one and only the last survives.
let toPull stream =
    fun () ->
        let current = ref None
        let { Bulk = _; TryAdvance = next } = stream (fun v -> current := Some v)
        fun () ->
            let rec loop () =
                if next () then
                    match !current with
                    | Some v ->
                        // Empty the slot so a step that pushes nothing is not
                        // mistaken for a repeat of this value.
                        current := None
                        v
                    | None -> loop ()   // the step pushed nothing; advance again
                else raise StreamEnd
            loop ()
|
toPull - Revised
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
|
val toPull : Stream<'T> -> StreamPull<'T>
/// Converts a push/pull stream into a pull stream, buffering everything that a
/// single `TryAdvance` step pushes so that no value is lost.
/// Raises StreamEnd when the buffer is empty and the stream cannot advance.
let toPull stream =
    fun () ->
        let pending = new ResizeArray<'T>()
        let { TryAdvance = advance } = stream (fun v -> pending.Add(v))
        let cursor = ref -1
        let rec pullNext () =
            cursor := !cursor + 1
            if !cursor >= pending.Count then
                // Buffer drained: reset it and ask the stream for more.
                pending.Clear()
                cursor := -1
                if advance () then pullNext ()
                else raise StreamEnd
            else
                pending.[!cursor]
        fun () -> pullNext ()
|
Gotcha!
1:
2:
3:
4:
5:
6:
7:
8:
|
// Pitfall demo: an infinite inner stream behind flatMap, consumed by pulling.
// Presumably a single advance step pushes the whole inner stream into the
// buffer used by toPull, which never finishes — verify against the library's
// flatMap and toPull implementations.
let pull =
[|1..10|]
|> Stream.ofArray
|> Stream.flatMap (fun _ -> Stream.infinite)
|> Stream.toPull
let next = pull ()
next () // OutOfMemory Exception
|
The Streams library
Implements a rich set of operations
Parallel Streams
1:
2:
3:
4:
5:
6:
|
// Same sum-of-even-squares pipeline as the sequential example, expressed with
// the ParStream module instead of Stream.
let data = [| 1..10000000 |]
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2 = 0)  // keep even values only
|> ParStream.map (fun x -> x * x)         // square each remaining value
|> ParStream.sum                          // terminal operation
|
ParStream
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
|
type ParStream<'T> = (unit -> ('T -> unit)) -> unit
val ofArray : 'T[] -> ParStream<'T>
/// Parallel source: splits `values` into partitions, obtains one consumer per
/// partition from `thunk`, forks a task for each (partition, consumer) pair
/// and then joins the forked tasks.
let ofArray values =
    fun thunk ->
        let parts = partitions values
        // Request every consumer first, then fork, as two separate passes.
        let paired = parts |> Array.map (fun p -> (p, thunk ()))
        let forks = paired |> Array.map (fun (p, k) -> fork p k)
        join forks
|
ParStream
1:
2:
3:
4:
5:
6:
7:
8:
|
type ParStream<'T> = (unit -> ('T -> unit)) -> unit
val map : ('T -> 'R) -> ParStream<'T> -> ParStream<'R>
/// Parallel map: each time upstream requests a consumer, a fresh downstream
/// consumer is obtained from `thunk` and wrapped so that values pass through
/// `f` first.
let map f stream =
    fun thunk ->
        let perRequest () =
            let downstream = thunk ()
            f >> downstream
        stream perRequest
|
ParStream
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
|
type ParStream<'T> = (unit -> ('T -> unit)) -> unit
val sum : ParStream<int> -> int
/// Parallel sum: every consumer requested by the stream gets its own
/// accumulator cell, and the cells are added together once the stream has run.
let sum stream =
    let array = new ResizeArray<int ref>()
    stream (fun () ->
        // One private accumulator per requested consumer.
        let sum = ref 0
        array.Add(sum)
        (fun v -> sum := !sum + v))
    // Combine the partial sums.
    array |> Seq.sumBy (fun cell -> !cell)
|
Some Benchmarks
i7 8 x 3.7 GHz (4 physical), 6 GB RAM
Sum of Squares
Sum of Even Squares
Cartesian Product
Parallel Sum of Squares
Parallel Sum of Squares Even
Parallel Cartesian
Cloud Streams!
Example: a word count
1:
2:
3:
4:
5:
6:
7:
8:
|
// Word count over cloud files: read lines, split them into words, filter,
// count occurrences per word, sort by descending count, and store the result
// as a cloud array.
// NOTE(review): `cfiles`, `splitWords`, `wordFilter` and `count` are defined
// outside this snippet; `count` is presumably the number of results to keep —
// confirm against the CloudStream.sortBy signature.
cfiles
|> CloudStream.ofCloudFiles CloudFile.ReadLines
|> CloudStream.collect Stream.ofSeq
|> CloudStream.collect (fun line -> splitWords line |> Stream.ofArray)
|> CloudStream.filter wordFilter
|> CloudStream.countBy id
|> CloudStream.sortBy (fun (_,c) -> -c) count
|> CloudStream.toCloudArray
|
https://github.com/mbraceproject/MBrace.Streams
Streams are lightweight and powerful
In sequential, parallel and distributed flavors
Beauty and the Beast
Write beautiful functional code with the performance of imperative code.
Stream operations are non-recursive
In principle, can always be fused (inlined).
Not always done by F#/OCaml compilers.