deliveroo/routemaster

View on GitHub
routemaster/controllers/subscriber.rb

Summary

Maintainability
A
0 mins
Test Coverage
require 'routemaster/controllers/base'
require 'routemaster/models/topic'
require 'routemaster/models/subscriber'
require 'routemaster/models/batch'
require 'routemaster/services/update_subscriber_topics'

module Routemaster
  module Controllers
    class Subscriber < Base
      VALID_KEYS = %w(topics callback uuid max timeout)

      post %r{^/(subscription|subscriber)$}, auth: :client, parse: :json do
        if (data.keys - VALID_KEYS).any?
          halt 400, 'bad data in payload'
        end

        unless data['topics'].kind_of?(Array)
          halt 400, 'need an array of topics'
        end

        topics = data['topics'].map do |name|
          Models::Topic.find_or_create!(name: name)
        end
        halt 404 unless topics.all?

        _log.info do
          "Client #{_current_client} subscribing to: [#{data['topics'].join(', ')}]."
        end

        begin
          sub = Models::Subscriber.new(name: current_token)
          sub.callback   = data['callback']
          sub.uuid       = data['uuid']
          sub.timeout    = data['timeout'] if data['timeout']
          sub.max_events = data['max']     if data['max']
          sub.save
        rescue ArgumentError => e
          halt 400, e.message
        end

        Services::UpdateSubscriberTopics.new(
          topics:     topics,
          subscriber: sub,
        ).call

        halt 204
      end

      delete '/subscriber', auth: %i[root client] do
        _log.info do
          "Client #{_current_client} deleting itself and unsubscribing from everything."
        end

        _load_subscriber.destroy
        halt 204
      end

      delete '/subscriber/topics/:name', auth: %i[root client] do
        subscriber = _load_subscriber
        topic = Models::Topic.find(params['name'])
        if topic.nil?
          halt 404, 'topic not found'
        end

        subscription = Models::Subscription.find(topic: topic, subscriber: subscriber)
        if subscription.nil?
          halt 404, 'not subscribed'
        end

        _log.info do
          "Client #{_current_client} unsubscribing from: '#{topic.name}'."
        end

        subscription.destroy
        halt 204
      end

      # GET /subscribers
      # [
      #   {
      #     subscriber: <username>,
      #     uuid: <uuid>,
      #     callback:   <url>,
      #     max_events: <max_events>,
      #     timeout: <timeout>,
      #     topics:     [<name>, ...],
      #     events: {
      #       sent:       <sent_count>,
      #       queued:     <queue_size>,
      #       oldest:     <staleness>,
      #     }
      #   }, ...
      # ]

      get %r{^/(subscriptions|subscribers)$}, auth: %i[root client] do
        content_type :json
        gauges = Models::Batch.gauges
        payload = Models::Subscriber.map do |subscriber|
          {
            subscriber: subscriber.name,
            uuid: subscriber.uuid,
            callback: subscriber.callback,
            max_events: subscriber.max_events,
            timeout: subscriber.timeout,
            topics: subscriber.topics.map(&:name),
            events: {
              sent:   nil,
              queued: gauges[:events][subscriber.name],
              oldest: nil,
            }
          }
        end
        payload.to_json
      end

      private

      def _load_subscriber
        sub = Models::Subscriber.find(current_token)
        sub or halt 404, 'subscriber not found'
      end

      def _current_client
        @_current_client ||= Models::ClientToken.token_name(current_token)
      end
    end
  end
end