cloudfoundry/cloud_controller_ng

View on GitHub
middleware/service_broker_rate_limiter.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'concurrent-ruby'
require 'redis'

module CloudFoundry
  module Middleware
    class ConcurrentRequestCounter
      def initialize(key_prefix, redis_connection_pool_size: nil)
        @key_prefix = key_prefix
        @redis_connection_pool_size = redis_connection_pool_size
      end

      def try_increment?(user_guid, max_concurrent_requests, logger)
        key = "#{@key_prefix}:#{user_guid}"
        store.try_increment?(key, max_concurrent_requests, logger)
      end

      def decrement(user_guid, logger)
        key = "#{@key_prefix}:#{user_guid}"
        store.decrement(key, logger)
      end

      private

      def store
        return @store if defined?(@store)

        redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket)
        @store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size)
      end

      class InMemoryStore
        def initialize
          @mutex = Mutex.new
          @data = {}
        end

        def try_increment?(key, max_concurrent_requests, _)
          @mutex.synchronize do
            @data[key] = Concurrent::Semaphore.new(max_concurrent_requests) unless @data.key?(key)
            @data[key].try_acquire
          end
        end

        def decrement(key, _)
          @mutex.synchronize do
            @data[key].release if @data.key?(key)
          end
        end
      end

      class RedisStore
        def initialize(socket, connection_pool_size)
          connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1
          @redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do
            Redis.new(timeout: 1, path: socket)
          end
        end

        def try_increment?(key, max_concurrent_requests, logger)
          count_str = @redis.incr(key)
          return true if count_str.to_i <= max_concurrent_requests

          @redis.decr(key)
          false
        rescue Redis::BaseError => e
          logger.error("Redis error: #{e.inspect}")
          true
        end

        def decrement(key, logger)
          count_str = @redis.decr(key)
          @redis.incr(key) if count_str.to_i < 0
        rescue Redis::BaseError => e
          logger.error("Redis error: #{e.inspect}")
        end
      end
    end

    class ServiceBrokerRateLimiter
      CONCURRENT_REQUEST_COUNTER = ConcurrentRequestCounter.new('service-broker-rate-limit')

      def initialize(app, opts)
        @app = app
        @logger = opts[:logger]
        @max_concurrent_requests = opts[:max_concurrent_requests]
        @broker_timeout_seconds = opts[:broker_timeout_seconds]
        @concurrent_request_counter = CONCURRENT_REQUEST_COUNTER
      end

      def call(env)
        decrement_after_call = false
        user_guid = env['cf.user_guid']
        if apply_rate_limiting?(env)
          return too_many_requests!(env, user_guid) unless @concurrent_request_counter.try_increment?(user_guid, @max_concurrent_requests, @logger)

          decrement_after_call = true

        end

        @app.call(env)
      ensure
        @concurrent_request_counter.decrement(user_guid, @logger) if decrement_after_call
      end

      private

      def apply_rate_limiting?(env)
        request = ActionDispatch::Request.new(env)
        !admin? && is_service_request?(request) && rate_limit_method?(request)
      end

      def admin?
        VCAP::CloudController::SecurityContext.admin? || VCAP::CloudController::SecurityContext.admin_read_only?
      end

      def is_service_request?(request)
        [
          %r{\A/v2/service_instances},
          %r{\A/v2/service_bindings},
          %r{\A/v2/service_keys},
          %r{\A/v3/service_instances},
          %r{\A/v3/service_credential_bindings},
          %r{\A/v3/service_route_bindings}
        ].any? { |re| request.fullpath.match(re) }
      end

      def rate_limit_method?(request)
        %w[PATCH PUT POST DELETE].include?(request.method)
      end

      def suggested_retry_after
        delay_range = (@broker_timeout_seconds * 0.5).floor..(@broker_timeout_seconds * 1.5).ceil
        rand(delay_range).to_i
      end

      def rate_limit_error(env)
        api_error = CloudController::Errors::ApiError.new_from_details('ServiceBrokerRateLimitExceeded')
        version   = env['PATH_INFO'][0..2]
        if version == '/v2'
          ErrorPresenter.new(api_error, Rails.env.test?, V2ErrorHasher.new(api_error)).to_hash
        elsif version == '/v3'
          ErrorPresenter.new(api_error, Rails.env.test?, V3ErrorHasher.new(api_error)).to_hash
        end
      end

      def too_many_requests!(env, user_guid)
        @logger.info("Service broker concurrent rate limit exceeded for user '#{user_guid}'")
        headers = {}
        headers['Retry-After'] = suggested_retry_after.to_s
        headers['Content-Type'] = 'text/plain; charset=utf-8'
        message = rate_limit_error(env).to_json
        headers['Content-Length'] = message.length.to_s
        [429, headers, [message]]
      end
    end
  end
end