# D★Stream

[![Gem version](http://img.shields.io/gem/v/d-stream.svg)](http://rubygems.org/gems/d-stream)
[![Build status](http://img.shields.io/travis/ddfreyne/d-stream.svg)](https://travis-ci.org/ddfreyne/d-stream)
[![Code Climate](http://img.shields.io/codeclimate/github/ddfreyne/d-stream.svg)](https://codeclimate.com/github/ddfreyne/d-stream)
[![Code Coverage](http://img.shields.io/codecov/c/github/ddfreyne/d-stream.svg)](https://codecov.io/github/ddfreyne/d-stream)

_D★Stream_ is a set of extensions for writing stream-processing code in Ruby.

**CAUTION:** D★Stream is a work in progress and of pre-alpha quality.

## Examples

### Example 1: straightforward

The following example takes a sequence of events for a given ticket and calculates the history of that ticket as a slowly changing dimension: one row per version, with `valid_from`/`valid_to` timestamps and a `row_is_current` flag:

```ruby
events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

history_builder =
  S.compose(
    # calculate new state
    S.scan({}, &:merge),

    # add `version`
    S.zip(1..),
    S.map { |(e, i)| e.merge(version: i) },

    # remove `id`
    S.map { |e| e.except(:id) },

    # add `valid_to` and `valid_from`, and remove `at`
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.except(:at) },

    # add `row_is_current`
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history = history_builder.call(events)
history.each { |e| p e }
```

The output (reformatted for readability) is as follows:

```
{
  :status=>"new",
  :valid_from=>2017-05-05 20:18:14 +0200,
  :valid_to=>2017-05-05 20:19:54 +0200,
  :version=>1,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :valid_from=>2017-05-05 20:19:54 +0200,
  :valid_to=>2017-05-05 20:20:44 +0200,
  :version=>2,
  :row_is_current=>false
}
{
  :status=>"new",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:20:44 +0200,
  :valid_to=>2017-05-05 20:23:14 +0200,
  :version=>3,
  :row_is_current=>false
}
{
  :status=>"solved",
  :assignee_id=>2,
  :priority=>"high",
  :valid_from=>2017-05-05 20:23:14 +0200,
  :valid_to=>nil,
  :version=>4,
  :row_is_current=>true
}
```
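To make explicit what each stage computes, here is a minimal sketch of the same history calculation in plain Ruby, without D★Stream. It is an illustration, not how the library works internally: it builds arrays eagerly instead of streaming, and it uses fixed, hypothetical timestamps (`t0`) in place of `Time.now` so the result is reproducible.

```ruby
# Hypothetical sample events; fixed timestamps replace Time.now for reproducibility.
t0 = Time.utc(2017, 5, 5, 18, 18, 14)
events = [
  { id: 40562348, at: t0,       status: 'new' },
  { id: 40564682, at: t0 + 100, assignee_id: 2 },
  { id: 40565795, at: t0 + 150, priority: 'high' },
  { id: 40569932, at: t0 + 300, status: 'solved' },
]

# Running state: each entry merges all events seen so far (the role of S.scan({}, &:merge)).
states = events.each_with_object([]) { |event, acc| acc << (acc.last || {}).merge(event) }

# One row per state; the SCD columns are derived from the following state, if any.
history = states.each_with_index.map do |state, i|
  following = states[i + 1]
  state
    .reject { |key, _| key == :id || key == :at } # drop per-event fields
    .merge(
      version: i + 1,
      valid_from: state.fetch(:at),
      valid_to: following&.fetch(:at), # nil for the newest row
      row_is_current: following.nil?
    )
end

history.each { |row| p row }
```

Unlike the D★Stream pipeline, this version keeps the whole event list in memory because it indexes `states[i + 1]`, whereas the `S.with_next` step only needs to look one element ahead.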

### Example 2: better factored

This example is functionally identical to the one above, but splits the pipeline into named sub-processors (each built with `S.compose` where it has more than one step) so that the final process, `history_builder`, is easier to understand.

```ruby
events =
  Enumerator.new do |y|
    y << { id: 40562348, at: Time.now - 400, status: 'new' }
    y << { id: 40564682, at: Time.now - 300, assignee_id: 2 }
    y << { id: 40565795, at: Time.now - 250, priority: 'high' }
    y << { id: 40569932, at: Time.now - 100, status: 'solved' }
  end.lazy

S = DStream

merge =
  S.scan({}, &:merge)

add_version =
  S.compose(
    S.zip(1..),
    S.map { |(e, i)| e.merge(version: i) },
  )

remove_id =
  S.map { |e| e.except(:id) }

add_valid_dates =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(valid_to: b ? b.fetch(:at) : nil) },
    S.map { |e| e.merge(valid_from: e.fetch(:at)) },
    S.map { |e| e.except(:at) },
  )

add_row_is_current =
  S.compose(
    S.with_next,
    S.map { |(a, b)| a.merge(row_is_current: b.nil?) },
  )

history_builder =
  S.compose(
    merge,
    add_version,
    remove_id,
    add_valid_dates,
    add_row_is_current,
  )

history = history_builder.call(events)
history.each { |h| p h }
```

## API

The following functions create individual processors:

* `map(&block)` (similar to `Enumerable#map`)

    ```ruby
    S.map(&:odd?).call(1..5).to_a
    # => [true, false, true, false, true]
    ```

* `select(&block)` (similar to `Enumerable#select`)

    ```ruby
    S.select(&:odd?).call(1..5).to_a
    # => [1, 3, 5]
    ```

* `reduce(&block)` (similar to `Enumerable#reduce`)

    ```ruby
    S.reduce(&:+).call(1..5)
    # => 15
    ```

* `take(n)` (similar to `Enumerable#take`)

    ```ruby
    S.take(3).call(1..10).to_a
    # => [1, 2, 3]
    ```

* `zip(other)` (similar to `Enumerable#zip`):

    ```ruby
    S.zip(10..13).call(1..3).to_a
    # => [[1, 10], [2, 11], [3, 12]]
    ```

* `buffer(size)` yields each stream element unchanged, but reads ahead and keeps an internal buffer of up to `size` not-yet-yielded stream elements. This is useful when reading from a slow and bursty data source, such as a paginated HTTP API.

* `with_next` yields an array containing the stream element and the next stream element, or `nil` in place of the next element for the last element of the stream:

    ```ruby
    S.with_next.call(1..5).to_a
    # => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
    ```

* `scan(init, &block)` is similar to `reduce`, but rather than returning a single aggregated value, it yields every intermediate aggregated value (the initial value itself is not yielded):

    ```ruby
    S.scan(0, &:+).call(1..5).to_a
    # => [1, 3, 6, 10, 15]
    ```

* `flatten2` yields the stream element as-is if it is not an array; otherwise, it yields each item of that array in turn:

    ```ruby
    S.compose(S.with_next, S.flatten2).call(1..5).to_a
    # => [1, 2, 2, 3, 3, 4, 4, 5, 5, nil]
    ```
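
As a rough mental model, `with_next`, `scan`, and `flatten2` can be approximated in core Ruby as functions from an enumerable to a lazy enumerator. The standalone methods below are hypothetical stand-ins written for illustration; they are not D★Stream's implementation, and they take the stream as an explicit first argument.

```ruby
# Hypothetical stand-in for S.with_next: pairs each element with its successor,
# pairing the last element with nil.
def with_next(enum)
  Enumerator.new do |y|
    started = false
    prev = nil
    enum.each do |e|
      y << [prev, e] if started
      prev = e
      started = true
    end
    y << [prev, nil] if started
  end.lazy
end

# Hypothetical stand-in for S.scan: yields every intermediate accumulator value.
def scan(enum, init, &block)
  Enumerator.new do |y|
    acc = init
    enum.each do |e|
      acc = block.call(acc, e)
      y << acc
    end
  end.lazy
end

# Hypothetical stand-in for S.flatten2: splices array elements into the stream.
def flatten2(enum)
  Enumerator.new do |y|
    enum.each do |e|
      if e.is_a?(Array)
        e.each { |x| y << x }
      else
        y << e
      end
    end
  end.lazy
end

p with_next(1..5).to_a                      # => [[1, 2], [2, 3], [3, 4], [4, 5], [5, nil]]
p scan(1..5, 0, &:+).to_a                   # => [1, 3, 6, 10, 15]
p flatten2(with_next(1..5)).to_a            # => [1, 2, 2, 3, 3, 4, 4, 5, 5, nil]
p scan(1..Float::INFINITY, 0, &:+).first(3) # => [1, 3, 6]
```

Returning `Enumerator::Lazy` keeps each step demand-driven, which is why the last line terminates even though its input is infinite.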

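Similarly, the read-ahead behaviour described for `buffer` can be sketched with a producer thread and a bounded `SizedQueue`. This is an assumption about how such a buffer might work, not the gem's code: the hypothetical `buffer(enum, size)` method below lets a background thread fetch up to `size` elements ahead of the consumer, re-raises source errors on the consuming side, and stops the producer if the consumer finishes early.

```ruby
# Hypothetical stand-in for S.buffer: a producer thread reads ahead into a
# bounded queue so a slow, bursty source is drained while the consumer works.
def buffer(enum, size)
  Enumerator.new do |y|
    queue = SizedQueue.new(size) # push blocks once `size` items are waiting
    producer = Thread.new do
      enum.each { |e| queue << [:item, e] }
      queue << [:done]
    rescue StandardError => err
      queue << [:error, err] # hand the failure to the consuming side
    end

    begin
      loop do
        tag, value = queue.pop
        case tag
        when :item  then y << value
        when :done  then break
        when :error then raise value
        end
      end
    ensure
      producer.kill # stop reading ahead if the consumer stopped early
    end
  end.lazy
end

p buffer(1..5, 2).to_a # => [1, 2, 3, 4, 5]
```

Tagging every queue entry keeps the end-of-stream marker and errors distinct from ordinary elements, including `nil`.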
To apply a processor to a stream, use `#call`:

```ruby
S = DStream

stream = ['hi', 'hello']

S.map(&:upcase).call(stream).to_a
# => ["HI", "HELLO"]
```

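To combine one or more processors into a single processor, use `.compose`. The processors are applied in order, each one consuming the output of the previous one: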

```ruby
S = DStream

stream = ['hi', 'hello']

processor = S.compose(
  S.map(&:upcase),
  S.map(&:reverse),
)

processor.call(stream).to_a
# => ["IH", "OLLEH"]
```
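
Conceptually, `.compose` chains processors left to right. A minimal sketch, assuming a processor is anything that responds to `call(stream)` and returns a stream (the lambdas and the `compose` method below are hypothetical stand-ins, not D★Stream's API):

```ruby
# Hypothetical stand-ins for S.map(&:upcase) and S.map(&:reverse):
# each takes a stream and returns a lazy stream.
upcase  = ->(stream) { stream.lazy.map(&:upcase) }
reverse = ->(stream) { stream.lazy.map(&:reverse) }

# Hypothetical compose: feeds the stream through each processor in order.
def compose(*processors)
  ->(stream) { processors.reduce(stream) { |acc, processor| processor.call(acc) } }
end

p compose(upcase, reverse).call(['hi', 'hello']).to_a # => ["IH", "OLLEH"]

# Since Ruby 2.6, Proc#>> composes callables in the same left-to-right order.
chained = upcase >> reverse
p chained.call(['hi', 'hello']).to_a # => ["IH", "OLLEH"]
```

Folding with `reduce` makes composing zero processors return the input stream unchanged.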