Prelude

Amstrad CPC 6128 cpc code

Google data center Linq

F# Streams

A lightweight F#/C# library for efficient functional-style pipelines on streams of data.

About Me

About Nessos

  • ISV based in Athens, Greece
  • .NET experts
  • Open source F# projects
    • MBrace
    • FsPickler, Vagrant, LinqOptimizer, GpuLinq and of course Streams
  • @anirothan, @krontogiannis, @eiriktsarpalis

https://github.com/nessos

Motivation

Make functional data query pipelines FAST

LinqOptimizer

An automatic query optimizer-compiler for Sequential and Parallel LINQ.

https://github.com/nessos/LinqOptimizer

LinqOptimizer

  • compiles LINQ queries into fast loop-based imperative code
  • speedups of up to 15x

Example

The query

var query = (from num in nums.AsQueryExpr()
             where num % 2 == 0
             select num * num).Sum();

compiles to

int sum = 0;
for (int index = 0; index < nums.Length; index++)
{
   int num = nums[index];
   if (num % 2 == 0)
      sum += num * num;
}

Disadvantages

  • Runtime compilation
    • Overhead (mitigated by caching)
    • Emitting IL not cross-platform (e.g. security restrictions in cloud, mobile)
    • Access to private fields/methods?
  • Problematic F# support
  • New operations => compiler changes

Should become a Roslyn compile-time plugin in the future

Clash of the Lambdas

Clash of the Lambdas, ICOOOLPS'14 (arXiv)

Performance Benchmarks

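Sum (windows)

[Benchmark chart: Sum]

Sum (linux)

[Benchmark chart: Sum]

Sum of squares (windows)

[Benchmark chart: Sum of squares]

Sum of squares (linux)

[Benchmark chart: Sum of squares]

Sum of even squares (windows)

[Benchmark chart: Sum of even squares]

Sum of even squares (linux)

[Benchmark chart: Sum of even squares]

Cartesian product (windows)

[Benchmark chart: Cartesian product]

Cartesian product (linux)

[Benchmark chart: Cartesian product]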

Java 8 very fast

LinqOptimizer improving F#/C# performance

What makes Java 8 faster?

Streams!

Typical Pipeline Pattern

source |> inter |> inter |> inter |> terminal
  • inter : intermediate (lazy) operations, e.g. map, filter
  • terminal : produces result or side-effects, e.g. reduce, iter

Seq example

let data = [| 1..10000000 |] |> Array.map int64
data
|> Seq.filter (fun i -> i % 2L = 0L) //lazy
|> Seq.map (fun i -> i + 1L) //lazy
|> Seq.sum //eager, forcing evaluation

Seq is pulling

let data = [| 1..10000000 |] |> Array.map int64
data
|> Seq.filter (fun i -> i % 2L = 0L) //lazy inter
|> Seq.map (fun i -> i + 1L) //lazy inter
|> Seq.sum //eager terminal, forcing evaluation

The terminal is pulling data from the pipeline via IEnumerator.Current and IEnumerator.MoveNext()
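
To make the pulling concrete, here is a rough sketch of what a terminal does when it drives a `seq` pipeline by hand. `pullSum` is a hypothetical helper written for illustration; the real `Seq.sum` may be implemented differently, but it consumes its source through the same `IEnumerator` protocol.

```fsharp
let data = [| 1L .. 10L |]

// Lazy pipeline: nothing is evaluated until someone pulls.
let pipeline =
    data
    |> Seq.filter (fun i -> i % 2L = 0L)
    |> Seq.map (fun i -> i + 1L)

// Hypothetical terminal: pulls one element at a time
// via MoveNext() and Current until the source is exhausted.
let pullSum (source : seq<int64>) : int64 =
    use e = source.GetEnumerator()
    let mutable total = 0L
    while e.MoveNext() do
        total <- total + e.Current
    total

let result = pullSum pipeline
```

Every element costs a `MoveNext()` and a `Current` call on each stage of the pipeline.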

With Streams

let data = [| 1..10000000 |] |> Array.map int64
Stream.ofArray data //source
|> Stream.filter (fun i -> i % 2L = 0L) //lazy
|> Stream.map (fun i -> i + 1L) //lazy
|> Stream.sum //eager, forcing evaluation

Streams are pushing!

Streams are pushing

Stream.ofArray data //source
|> Stream.filter (fun i -> i % 2L = 0L) //lazy
|> Stream.map (fun i -> i + 1L) //lazy
|> Stream.sum //eager, forcing evaluation

The source is pushing data down the pipeline.

How does it work?

Starting from Seq.iter

// iter : ('T -> unit) -> seq<'T> -> unit
let iter f values = 
    for value in values do
        f value

Flip the args

seq<'T> -> ('T -> unit) -> unit

Stream!

type Stream<'T> = ('T -> unit) -> unit

// iter : seq<'T> -> Stream<'T>
let iter values f = 
    for value in values do
        f value

Continuation passing style!
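
A small usage sketch of the flipped `iter`, assuming the type abbreviation from the slide. The `seen` buffer is only there to observe what gets pushed.

```fsharp
type Stream<'T> = ('T -> unit) -> unit

// iter : seq<'T> -> Stream<'T>
let iter (values : seq<'T>) : Stream<'T> =
    fun f ->
        for value in values do
            f value

// Partially applying iter yields a stream; nothing runs yet.
let stream = iter [ 1; 2; 3 ]

// Supplying the continuation runs the loop and pushes each value into it.
let seen = ResizeArray<int>()
stream (fun v -> seen.Add(v * 10))
```

The stream is just a function waiting for its continuation, which is why it is lazy by construction.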

Let's make us some (simple) Streams!

Simple Streams

type Stream<'T> = ('T -> unit) -> unit

// map : ('T -> 'R) -> Stream<'T> -> Stream<'R> 
let map f stream = 
    fun k -> stream (fun v -> k (f v))

// filter : ('T -> bool) -> Stream<'T> -> Stream<'T> 
let filter f stream = 
    fun k -> stream (fun v -> if f v then k v else ()) 

// length : Stream<'T> -> int
let length stream = 
    let counter = ref 0
    stream (fun _ -> counter := counter.Value + 1)
    counter.Value
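
A minimal end-to-end sketch of these simple streams. `ofArray` and `sum` are not defined on the slide; they are assumed helpers written here for illustration and are not the library's implementation.

```fsharp
type Stream<'T> = ('T -> unit) -> unit

// Assumed source: pushes every array element into the continuation k.
let ofArray (values : 'T []) : Stream<'T> =
    fun k ->
        for v in values do
            k v

let map (f : 'T -> 'R) (stream : Stream<'T>) : Stream<'R> =
    fun k -> stream (fun v -> k (f v))

let filter (f : 'T -> bool) (stream : Stream<'T>) : Stream<'T> =
    fun k -> stream (fun v -> if f v then k v)

// Assumed terminal: accumulates the pushed values in a ref cell.
let sum (stream : Stream<int64>) : int64 =
    let total = ref 0L
    stream (fun v -> total.Value <- total.Value + v)
    total.Value

let result =
    ofArray [| 1L .. 10L |]
    |> filter (fun i -> i % 2L = 0L)
    |> map (fun i -> i + 1L)
    |> sum
```

Only the terminal calls the composed stream, so the whole pipeline runs as a single loop inside `ofArray`.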

When to stop pushing?

type Stream<'T> = ('T -> unit) -> unit

Stopping push required for e.g.

Stream.takeWhile : ('T -> bool) -> Stream<'T> -> Stream<'T>

Stopping push

Change

type Stream<'T> = ('T -> unit) -> unit

to

type Stream<'T> = ('T -> bool) -> unit

// takeWhile : ('T -> bool) -> Stream<'T> -> Stream<'T>
let takeWhile f stream =
    fun k -> stream (fun v -> if f v then k v else false)
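
For the `bool` result to have any effect, the source has to check it. The sketch below assumes a hypothetical `ofArray` that stops looping once the continuation returns `false`, plus a hypothetical `toList` terminal; the `pushed` counter only shows how many elements actually left the source.

```fsharp
type Stream<'T> = ('T -> bool) -> unit

// Assumed source: stops pushing as soon as the continuation returns false.
let ofArray (values : 'T []) : Stream<'T> =
    fun k ->
        let mutable i = 0
        let mutable next = true
        while next && i < values.Length do
            next <- k values.[i]
            i <- i + 1

let takeWhile (f : 'T -> bool) (stream : Stream<'T>) : Stream<'T> =
    fun k -> stream (fun v -> if f v then k v else false)

// Assumed terminal: collects values and always asks for more.
let toList (stream : Stream<'T>) : 'T list =
    let acc = ResizeArray<'T>()
    stream (fun v ->
        acc.Add v
        true)
    List.ofSeq acc

// Wrap the source to count how many elements it pushes.
let pushed = ref 0
let counting : Stream<int> =
    fun k ->
        ofArray [| 1; 2; 3; 10; 4; 5 |] (fun v ->
            pushed.Value <- pushed.Value + 1
            k v)

let taken = counting |> takeWhile (fun v -> v < 5) |> toList
```

The source pushes four elements (the fourth fails the predicate) and never touches the remaining two.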

What about zip?

Stream.zip : Stream<'T> -> Stream<'S> -> Stream<'T * 'S>

Zip needs to synchronise the flow of values.

Zip needs to pull!

Streams can push and pull

/// Provides functions for iteration
type Iterable<'T> = {
    Bulk : unit -> unit 
    TryAdvance : unit -> bool 
}

/// Represents a Stream of values.
type Stream<'T> = Stream of (('T -> bool) -> Iterable<'T>)
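
One possible reading of the two fields, sketched under assumptions: `Bulk` pushes all remaining elements into the continuation, while `TryAdvance` pushes at most one and reports whether it did, which is the pull-style step an operation like zip needs. The `ofArray` below is hypothetical and is not taken from the library.

```fsharp
type Iterable<'T> = {
    Bulk : unit -> unit
    TryAdvance : unit -> bool
}

type Stream<'T> = Stream of (('T -> bool) -> Iterable<'T>)

// Hypothetical source supporting both push (Bulk) and pull (TryAdvance).
let ofArray (values : 'T []) : Stream<'T> =
    Stream (fun k ->
        let index = ref 0
        // Push everything that is left, honouring the stop signal.
        let bulk () =
            let mutable next = true
            while next && index.Value < values.Length do
                next <- k values.[index.Value]
                index.Value <- index.Value + 1
        // Push a single element; return false when the source is exhausted.
        let tryAdvance () =
            if index.Value < values.Length then
                k values.[index.Value] |> ignore
                index.Value <- index.Value + 1
                true
            else
                false
        { Bulk = bulk; TryAdvance = tryAdvance })

let seen = ResizeArray<int>()
let (Stream run) = ofArray [| 1; 2; 3 |]
let it =
    run (fun v ->
        seen.Add v
        true)

let first = it.TryAdvance()          // pull: exactly one element is pushed
let afterPull = List.ofSeq seen
it.Bulk()                            // push: the remaining elements
let afterBulk = List.ofSeq seen
let exhausted = not (it.TryAdvance())
```

A zip built on this shape could call `TryAdvance` on each input in lockstep and pair up the values that arrive.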

The Streams library

Implements a rich set of operations

More examples

Parallel Streams

let data = [| 1..10000000 |] |> Array.map int64
data
|> ParStream.ofArray
|> ParStream.filter (fun x -> x % 2L = 0L)
|> ParStream.map (fun x -> x + 1L)
|> ParStream.sum

Some Benchmarks

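i7, 8 x 3.7 GHz (4 physical cores), 6 GB RAM

Sum

[Benchmark chart: Sum]

Sum of Squares

[Benchmark chart: Sum of Squares]

Sum of Even Squares

[Benchmark chart: Sum of Even Squares]

Cartesian Product

[Benchmark chart: Cartesian Product]

Parallel Sum of Squares

[Benchmark chart: Parallel Sum of Squares]

Parallel Sum of Even Squares

[Benchmark chart: Parallel Sum of Even Squares]

Parallel Cartesian

[Benchmark chart: Parallel Cartesian]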

Cloud Streams!

Example: a word count

cfiles
|> CloudStream.ofCloudFiles CloudFile.ReadLines
|> CloudStream.collect Stream.ofSeq 
|> CloudStream.collect (fun line -> splitWords line |> Stream.ofArray |> Stream.map wordTransform)
|> CloudStream.filter wordFilter
|> CloudStream.countBy id
|> CloudStream.sortBy (fun (_,c) -> -c) count
|> CloudStream.toCloudArray

Streams are lightweight and powerful

In sequential, parallel and distributed flavors

The Holy Grail is in reach

Write functional pipelines with the performance of imperative code.

Stream fusion in Haskell

Stream fusion http://code.haskell.org/~dons/papers/icfp088-coutts.pdf

Almost

Depends on the compiler's ability to inline.

Inlining continuations = stream fusion
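
To illustrate what fusion would buy, the sketch below writes out by hand the loop that a filter/map/sum pipeline collapses into once every continuation is inlined into its caller. `fusedSum` is a hypothetical name; whether a given compiler actually produces this code depends on its inliner.

```fsharp
// The pipeline:  source |> filter even |> map (+1) |> sum
// After inlining, map's continuation sits inside filter's,
// and filter's sits inside the source loop:
let fusedSum (data : int64 []) : int64 =
    let mutable total = 0L
    for v in data do
        if v % 2L = 0L then            // filter
            total <- total + (v + 1L)  // map, then sum
    total

let data = [| 1L .. 10L |]

// Reference result from the ordinary Seq pipeline.
let viaSeq =
    data
    |> Seq.filter (fun i -> i % 2L = 0L)
    |> Seq.map (fun i -> i + 1L)
    |> Seq.sum

let viaLoop = fusedSum data
```

No closures or intermediate collections remain in the fused version.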

Stream operations are non-recursive

In principle, they can always be fused (inlined).

Not always done by F# compiler.

Experiments with MLton

by @biboudis

https://github.com/biboudis/sml-streams

MLton appears to always be fusing.

Can we make the F# compiler smarter?

GitHub

https://github.com/nessos/Streams

NuGet

https://www.nuget.org/packages/Streams/

Slides and samples

https://github.com/palladin/StreamsPresentation

Thank you!

Questions?