jstotz/jstreams

View on GitHub
README.md

Summary

Maintainability
Test Coverage
# jstreams

[![Gem](https://img.shields.io/gem/v/jstreams.svg)](https://rubygems.org/gems/jstreams)
[![CircleCI](https://img.shields.io/circleci/project/github/jstotz/jstreams/master.svg)](https://circleci.com/gh/jstotz/jstreams)
[![Test Coverage](https://img.shields.io/codeclimate/coverage/jstotz/jstreams.svg)](https://codeclimate.com/github/jstotz/jstreams/test_coverage)
[![Maintainability](https://img.shields.io/codeclimate/maintainability/jstotz/jstreams.svg)](https://codeclimate.com/github/jstotz/jstreams/maintainability)
[![Docs](https://img.shields.io/badge/docs-yard-green.svg)](https://www.rubydoc.info/github/jstotz/jstreams/master)

A distributed streaming platform for Ruby built on top of Redis Streams.

Provides a multi-threaded publisher/subscriber.

## Project Status

This is alpha software and not suitable for production use.

## Features

- Load balancing among subscribers within a group
- Automatically message reassignment when consumers go away
- Multi-threaded subscribers
- Configurable message serialization

## Roadmap

- Configurable retry logic
- Replay a stream from a given point
- Wildcard subscriptions

## Installation

Add this line to your application's Gemfile:

```ruby
gem 'jstreams'
```

And then execute:

    $ bundle

Or install it yourself as:

    $ gem install jstreams

## Usage

### Publisher

```ruby
jstreams = Jstreams::Context.new

jstreams.publish(
  :users,
  event: 'user_created',
  user_id: 1,
  name: 'King Buzzo',
  email: 'buzzo@example.com'
)

jstreams.publish(:users, event: 'user_logged_in', user_id: 1)
```

### Subscriber

```ruby
jstreams = Jstreams::Context.new

jstreams.subscribe(
  :user_activity_logger,
  :users
) do |message, _stream, _subscriber|
  case message['event']
  when 'user_created'
    logger.info "User #{message['name']} created"
  when 'user_logged_in'
    logger.info "User #{message['id']} logged in"
  end
end

jstreams.subscribe(
  :send_welcome_email,
  :users
) do |message, _stream, _subscriber|
  send_user_welcome_email(message['id']) if message['event'] == 'user_created'
end

# Spawns subscriber threads and blocks
jstreams.run
```

### Replay

Starts a temporary copy of the given subscriber until messages have been replayed up to the checkpoint stored at the time replay is called.

```ruby
jstreams.replay(:user_activity_logger, from: message_id)
```

### Retries

By default subscribers will process messages indefinitely until successful.

```ruby
# TODO
```

### Serialization

```ruby
class Serializer
  MESSAGE_TYPES = {
    user_created: UserCreatedMessage, user_logged_in: UserLoggedInMessage
  }

  def serialize(type, message)
    message_class(type).serialize(message)
  end

  def deserialize(type, message)
    message_class(type).deserialize(message)
  end

  private

  def message_class(type)
    MESSAGE_TYPES.fetch(type) { raise "Unknown message type: #{type}" }
  end
end

jstreams = Jstreams::Context.new(serializer: Serializer)
```

## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake spec` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run `bundle exec rake install`. To release a new version, update the version number in `version.rb`, and then run `bundle exec rake release`, which will create a git tag for the version, push git commits and tags, and push the `.gem` file to [rubygems.org](https://rubygems.org).

## Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/jstotz/jstreams. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the [Contributor Covenant](http://contributor-covenant.org) code of conduct.

## License

The gem is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).

## Code of Conduct

Everyone interacting in the jstreams project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the [code of conduct](https://github.com/jstotz/jstreams/blob/master/CODE_OF_CONDUCT.md).