F# Streams
A lightweight F#/C# library for efficient functional-style pipelines on streams of data.
About Nessos
- ISV based in Athens, Greece
- .NET experts
- Open source F# projects
- MBrace
- FsPickler, Vagrant, LinqOptimizer, GpuLinq and of course Streams
- @anirothan, @krontogiannis, @eiriktsarpalis
https://github.com/nessos
Motivation
Make functional data query pipelines FAST
LinqOptimizer
- compiles LINQ queries into fast loop-based imperative code
- speedups of up to 15x
Example
The query
var query = (from num in nums.AsQueryExpr()
             where num % 2 == 0
             select num * num).Sum();
compiles to
int sum = 0;
for (int index = 0; index < nums.Length; index++)
{
    int num = nums[index];
    if (num % 2 == 0)
        sum += num * num;
}
Disadvantages
- Runtime compilation
- Overhead (mitigated by caching)
- Emitting IL not cross-platform (e.g. security restrictions in cloud, mobile)
- Access to private fields/methods?
- Problematic F# support
- New operations => compiler changes
Should become a Roslyn compile-time plugin in the future
Clash of the Lambdas
ICOOOLPS'14
arXiv
Performance Benchmarks
Sum (Windows)
Sum (Linux)
Sum of squares (Windows)
Sum of squares (Linux)
Sum of even squares (Windows)
Sum of even squares (Linux)
Cartesian product (Windows)
Cartesian product (Linux)
LinqOptimizer improving F#/C# performance
What makes Java 8 faster?
Typical Pipeline Pattern
source |> inter |> inter |> inter |> terminal
- inter : intermediate (lazy) operations, e.g. map, filter
- terminal : produces result or side-effects, e.g. reduce, iter
Seq example
let data = [| 1..10000000 |] |> Array.map int64
data
|> Seq.filter (fun i -> i % 2L = 0L) // lazy
|> Seq.map (fun i -> i + 1L)         // lazy
|> Seq.sum                           // eager, forcing evaluation
Seq is pulling
let data = [| 1..10000000 |] |> Array.map int64
data
|> Seq.filter (fun i -> i % 2L = 0L) // lazy inter
|> Seq.map (fun i -> i + 1L)         // lazy inter
|> Seq.sum                           // eager terminal, forcing evaluation
The terminal is pulling data from the pipeline via IEnumerator.Current and IEnumerator.MoveNext()
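To make the pulling concrete, here is a minimal sketch of a terminal written by hand against the enumerator. The name pullSum is hypothetical and only illustrates the pattern; it is not how Seq.sum is actually implemented.

```fsharp
// Hand-written terminal that pulls values one at a time.
// `pullSum` is a hypothetical name used only for this illustration.
let pullSum (source : seq<int64>) : int64 =
    use e = source.GetEnumerator()
    let mutable acc = 0L
    // Each MoveNext() asks the upstream pipeline for one more value.
    while e.MoveNext() do
        acc <- acc + e.Current
    acc

let result =
    [| 1L .. 10L |]
    |> Seq.filter (fun i -> i % 2L = 0L) // lazy inter
    |> Seq.map (fun i -> i + 1L)         // lazy inter
    |> pullSum                           // eager terminal, pulls through both stages
```

Every value that reaches the terminal costs a MoveNext()/Current round trip through each intermediate stage.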
With Streams
let data = [| 1..10000000 |] |> Array.map int64
Stream.ofArray data                     // source
|> Stream.filter (fun i -> i % 2L = 0L) // lazy
|> Stream.map (fun i -> i + 1L)         // lazy
|> Stream.sum                           // eager, forcing evaluation
Streams are pushing
Stream.ofArray data                     // source
|> Stream.filter (fun i -> i % 2L = 0L) // lazy
|> Stream.map (fun i -> i + 1L)         // lazy
|> Stream.sum                           // eager, forcing evaluation
The source is pushing data down the pipeline.
Starting from Seq.iter
// iter : ('T -> unit) -> seq<'T> -> unit
let iter f values =
    for value in values do
        f value
Flip the args
seq<'T> -> ('T -> unit) -> unit
Stream!
type Stream<'T> = ('T -> unit) -> unit

// iter : seq<'T> -> Stream<'T>
let iter values f =
    for value in values do
        f value
Continuation passing style!
Let's make us some (simple) Streams!
Simple Streams
type Stream<'T> = ('T -> unit) -> unit

// map : ('T -> 'R) -> Stream<'T> -> Stream<'R>
let map f stream =
    fun k -> stream (fun v -> k (f v))

// filter : ('T -> bool) -> Stream<'T> -> Stream<'T>
let filter f stream =
    fun k -> stream (fun v -> if f v then k v else ())

// length : Stream<'T> -> int
let length stream =
    let counter = ref 0
    stream (fun _ -> counter.Value <- counter.Value + 1)
    counter.Value
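As a usage sketch, the same filter/map/sum pipeline can be run on these push-only streams. ofArray and sum are hypothetical helpers added here for illustration, written in the same style as map, filter and length.

```fsharp
// Push-only stream: a function that feeds every element to a continuation.
type Stream<'T> = ('T -> unit) -> unit

// Hypothetical source combinator: pushes each array element downstream.
let ofArray (values : 'T []) : Stream<'T> =
    fun k -> for v in values do k v

let map (f : 'T -> 'R) (stream : Stream<'T>) : Stream<'R> =
    fun k -> stream (fun v -> k (f v))

let filter (f : 'T -> bool) (stream : Stream<'T>) : Stream<'T> =
    fun k -> stream (fun v -> if f v then k v)

// Hypothetical terminal in the same style as length.
let sum (stream : Stream<int64>) : int64 =
    let acc = ref 0L
    stream (fun v -> acc.Value <- acc.Value + v)
    acc.Value

let result =
    ofArray [| 1L .. 10L |]          // source
    |> filter (fun i -> i % 2L = 0L) // lazy
    |> map (fun i -> i + 1L)         // lazy
    |> sum                           // eager, starts the push
```

Nothing runs until the terminal hands its continuation to the source; from then on the source loop drives the whole pipeline.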
When to stop pushing?
type Stream<'T> = ('T -> unit) -> unit
Stopping the push is required for, e.g.,
Stream.takeWhile : ('T -> bool) -> Stream<'T> -> Stream<'T>
Stopping push
Change
type Stream<'T> = ('T -> unit) -> unit
to
type Stream<'T> = ('T -> bool) -> unit

// takeWhile : ('T -> bool) -> Stream<'T> -> Stream<'T>
let takeWhile f stream =
    fun k -> stream (fun v -> if f v then k v else false)
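A short sketch of how a source might honour the stop signal. ofArray, toList and the push counter are hypothetical additions for illustration; only takeWhile comes from the slide above.

```fsharp
// The continuation returns false to ask the source to stop pushing.
type Stream<'T> = ('T -> bool) -> unit

// Hypothetical source that stops as soon as the continuation returns false.
let ofArray (values : 'T []) : Stream<'T> =
    fun k ->
        let mutable i = 0
        let mutable next = true
        while next && i < values.Length do
            next <- k values.[i]
            i <- i + 1

let takeWhile (f : 'T -> bool) (stream : Stream<'T>) : Stream<'T> =
    fun k -> stream (fun v -> if f v then k v else false)

// Hypothetical terminal that collects every value it receives.
let toList (stream : Stream<'T>) : 'T list =
    let acc = ResizeArray<'T>()
    stream (fun v ->
        acc.Add v
        true)
    List.ofSeq acc

// Count how many values the source actually pushes.
let pushed = ref 0
let counted : Stream<int> =
    fun k ->
        ofArray [| 1 .. 1000 |] (fun v ->
            pushed.Value <- pushed.Value + 1
            k v)

let firstFew = counted |> takeWhile (fun v -> v < 4) |> toList
// firstFew is [1; 2; 3]; the source pushed 4 values, not 1000.
```

The fourth value is still pushed because the predicate has to see it before it can return false.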
What about zip?
Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>
Zip needs to synchronise the flow of values.
Zip needs to pull!
Streams can push and pull
/// Provides functions for iteration
type Iterable<'T> = {
    Bulk : unit -> unit
    TryAdvance : unit -> bool
}

/// Represents a Stream of values.
type Stream<'T> = Stream of (('T -> bool) -> Iterable<'T>)
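The sketch below shows one way zip could be built on these two types: the first stream pushes in bulk, and for each of its values one value is pulled from the second with TryAdvance. ofArray, zip and toList here are illustrative assumptions, not the library's actual implementation.

```fsharp
type Iterable<'T> = {
    Bulk : unit -> unit          // push everything that is left
    TryAdvance : unit -> bool    // push at most one value; false when exhausted
}

type Stream<'T> = Stream of (('T -> bool) -> Iterable<'T>)

// Hypothetical source supporting both bulk push and single-step pull.
let ofArray (values : 'T []) : Stream<'T> =
    Stream (fun k ->
        let index = ref 0
        let tryAdvance () =
            if index.Value < values.Length then
                k values.[index.Value] |> ignore
                index.Value <- index.Value + 1
                true
            else false
        let bulk () =
            let mutable next = true
            while next && index.Value < values.Length do
                next <- k values.[index.Value]
                index.Value <- index.Value + 1
        { Bulk = bulk; TryAdvance = tryAdvance })

// Hypothetical zip: the first stream pushes, the second is pulled in step.
let zip (first : Stream<'T>) (second : Stream<'S>) : Stream<'T * 'S> =
    let (Stream f) = first
    let (Stream g) = second
    Stream (fun k ->
        // Slot filled each time the second stream advances by one value.
        let slot = ref (Unchecked.defaultof<'S>)
        let pull =
            g (fun s ->
                slot.Value <- s
                true)
        let it = f (fun t -> if pull.TryAdvance () then k (t, slot.Value) else false)
        { Bulk = it.Bulk; TryAdvance = it.TryAdvance })

// Hypothetical terminal: runs the stream in bulk mode and collects the values.
let toList (stream : Stream<'T>) : 'T list =
    let (Stream run) = stream
    let acc = ResizeArray<'T>()
    let it =
        run (fun v ->
            acc.Add v
            true)
    it.Bulk ()
    List.ofSeq acc

let zipped = zip (ofArray [| 1; 2; 3 |]) (ofArray [| "a"; "b" |]) |> toList
// zipped is [(1, "a"); (2, "b")]: pushing stops once the second stream runs out.
```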
The Streams library
Implements a rich set of operations
Parallel Streams
let data = [| 1..10000000 |] |> Array.map int64
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2L = 0L)
|> ParStream.map (fun x -> x + 1L)
|> ParStream.sum
Some Benchmarks
i7, 8 x 3.7 GHz (4 physical cores), 6 GB RAM
Sum
Sum of Squares
Sum of Even Squares
Cartesian Product
Parallel Sum of Squares
Parallel Sum of Even Squares
Parallel Cartesian Product
Cloud Streams!
Example: a word count
cfiles
|> CloudStream.ofCloudFiles CloudFile.ReadLines
|> CloudStream.collect Stream.ofSeq
|> CloudStream.collect (fun line -> splitWords line |> Stream.ofArray |> Stream.map wordTransform)
|> CloudStream.filter wordFilter
|> CloudStream.countBy id
|> CloudStream.sortBy (fun (_, c) -> -c) count
|> CloudStream.toCloudArray
Streams are lightweight and powerful
In sequential, parallel and distributed flavors
The Holy Grail is in reach
Write functional pipelines with the performance of imperative code.
Almost
Depends on the compiler's ability to inline.
Inlining continuations = stream fusion
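As an illustration, this is roughly what the filter/map/sum pipeline from the earlier slides looks like once every continuation has been inlined into the source loop. The fused version is written by hand here; it is not output produced by the compiler, and fusedSum is a hypothetical name.

```fsharp
let data = [| 1L .. 10L |]

// Reference result using the ordinary Seq pipeline.
let pipelineSum =
    data
    |> Seq.filter (fun i -> i % 2L = 0L)
    |> Seq.map (fun i -> i + 1L)
    |> Seq.sum

// Hand-fused equivalent: one loop, no closures, no intermediate sequences.
let fusedSum (values : int64 []) : int64 =
    let mutable acc = 0L
    for i in 0 .. values.Length - 1 do
        let v = values.[i]
        if v % 2L = 0L then          // inlined filter
            acc <- acc + (v + 1L)    // inlined map, then inlined sum
    acc

let fused = fusedSum data
```

Both versions compute the same value; the fused one is the imperative loop that the pipeline should ideally compile down to.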
Stream operations are non-recursive
In principle, they can always be fused (inlined).
The F# compiler does not always do this.
Can we make the F# compiler smarter?