stefan-kolb/nucleus

View on GitHub
lib/nucleus/adapters/v1/cloud_foundry_v2/logs.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module Nucleus
  module Adapters
    module V1
      class CloudFoundryV2 < Stub
        module Logs
          LOGGREGATOR_TYPES = [Enums::ApplicationLogfileType::API,
                               Enums::ApplicationLogfileType::APPLICATION,
                               Enums::ApplicationLogfileType::REQUEST,
                               Enums::ApplicationLogfileType::SYSTEM].freeze
          # Carriage return (newline in Mac OS) + line feed (newline in Unix) == CRLF (newline in Windows)
          CRLF = "\r\n".freeze
          WSP  = ' '.freeze

          # @see Stub#logs
          def logs(application_name_or_id)
            app_guid = app_guid(application_name_or_id)
            # retrieve app for timestamps only :/
            app_created = get("/v2/apps/#{app_guid}").body[:metadata][:created_at]
            logs = []

            begin
              log_files_list = download_file(app_guid, 'logs')
              # parse raw response to array
              log_files_list.split(CRLF).each do |logfile_line|
                filename = logfile_line.rpartition(' ').first.strip
                if filename == 'staging_task.log'
                  filename = 'build'
                  log_type = Enums::ApplicationLogfileType::BUILD
                else
                  log_type = Enums::ApplicationLogfileType::OTHER
                end
                # TODO: right now, we always assume the log has recently been updated
                logs.push(id: filename, name: filename, type: log_type, created_at: app_created,
                          updated_at: Time.now.utc.iso8601)
              end
            rescue Errors::AdapterError
              log.debug('no logs directory found for cf application')
            end

            # add the default logtypes, available according to:
            # http://docs.cloudfoundry.org/devguide/deploy-apps/streaming-logs.html#format
            LOGGREGATOR_TYPES.each do |type|
              logs.push(id: type, name: type, type: type, created_at: app_created, updated_at: Time.now.utc.iso8601)
            end
            # TODO: 'all' is probably not perfect, since the build log wont be included
            logs.push(id: 'all', name: 'all', type: Enums::ApplicationLogfileType::OTHER,
                      created_at: app_created, updated_at: Time.now.utc.iso8601)
            logs
          end

          # @see Stub#log?
          def log?(application_name_or_id, log_id)
            app_guid = app_guid(application_name_or_id)
            # test file existence
            log_id = 'staging_task.log' if log_id.to_sym == Enums::ApplicationLogfileType::BUILD
            # checks also if application is even valid
            response = get("/v2/apps/#{app_guid}/instances/0/files/logs/#{log_id}",
                           follow_redirects: false, expects: [200, 302, 400])
            return true if response == 200 || log_stream?(log_id)
            return false if response == 400
            # if 302 (only remaining option), followup...

            # download log file
            download_file(app_guid, "logs/#{log_id}")
            # no error, file exists
            true
          rescue Errors::AdapterResourceNotFoundError, Errors::UnknownAdapterCallError,
                 Excon::Errors::NotFound, Excon::Errors::BadRequest
            false
          end

          # @see Stub#tail
          def tail(application_name_or_id, log_id, stream)
            app_guid = app_guid(application_name_or_id)
            return tail_stream(app_guid, log_id, stream) if log_stream?(log_id)
            tail_file(app_guid, log_id, stream)
          end

          # @see Stub#log_entries
          def log_entries(application_name_or_id, log_id)
            app_guid = app_guid(application_name_or_id)
            # first check if this log is a file or must be fetched from the loggregator
            if log_stream?(log_id)
              # fetch recent data from loggregator and return an array of log entries
              recent_decoded = recent_log_messages(app_guid, loggregator_filter(log_id))
              recent_decoded.collect { |log_msg| construct_log_entry(log_msg) }
            elsif log_id.to_sym == Enums::ApplicationLogfileType::BUILD
              # handle special staging log
              build_log_entries(app_guid)
            else
              download_logfile_entries(app_guid, log_id)
            end
          end

          private

          def build_log_entries(app_guid)
            log_id = 'staging_task.log'
            download_logfile_entries(app_guid, log_id)
          rescue Errors::AdapterResourceNotFoundError
            # if there was no build yet, return no entries instead of the 404 error
            []
          end

          def loggregator_filter(log_id)
            case log_id.to_sym
            when Enums::ApplicationLogfileType::API
              filter = ['API']
            when Enums::ApplicationLogfileType::APPLICATION
              filter = ['APP']
            when Enums::ApplicationLogfileType::REQUEST
              filter = ['RTR']
            when Enums::ApplicationLogfileType::SYSTEM
              filter = %w[STG LGR DEA]
            when :all
              # no filter, show all
              filter = nil
            else
              # invalid log requests --> 404
              raise Errors::AdapterResourceNotFoundError,
                    "Invalid log file '#{log_id}', not available for application '#{app_guid}'"
            end
            filter
          end

          def construct_log_entry(decoded_message)
            # 2015-03-22T15:28:55.83+0100 [RTR/0]      OUT message...
            "#{Time.at(decoded_message.timestamp / 1_000_000_000.0).iso8601} "\
              "[#{decoded_message.source_name}/#{decoded_message.source_id}] "\
              "#{decoded_message.message_type == 1 ? 'OUT' : 'ERR'} #{decoded_message.message}"
          end

          def download_logfile_entries(app_guid, log_id, headers_to_use = nil)
            # download log file
            logfile_contents = download_file(app_guid, "logs/#{log_id}", headers_to_use)
            # split file into entries by line breaks and return an array of log entries
            logfile_contents.split("\n")
          end

          def download_file(app_guid, file_path, headers_to_use = nil)
            expected_statuses = [200, 302, 400, 404]
            # Hack, do not create fresh headers (which would fail) when in a deferred action
            headers_to_use ||= headers

            # log list consists of 2 parts, loggregator and files
            log_files = get("/v2/apps/#{app_guid}/instances/0/files/#{file_path}",
                            follow_redirects: false, expects: expected_statuses, headers: headers_to_use)
            if log_files.status == 400 || log_files.status == 404
              raise Errors::AdapterResourceNotFoundError,
                    "Invalid log file: '#{file_path}' not available for application '#{app_guid}'"
            end
            return log_files.body if log_files.status == 200

            # status must be 302, follow to the Location
            download_location = log_files.headers[:Location]
            # if IBM f*cked with the download URL, fix the address
            download_location.gsub!(/objectstorage.service.networklayer.com/, 'objectstorage.softlayer.net')
            Excon.defaults[:ssl_verify_peer] = false unless @check_certificates

            connection_params = { ssl_verify_peer: @check_certificates }
            connection = Excon.new(download_location, connection_params)
            downloaded_logfile_response = connection.request(method: :get, expects: expected_statuses)

            if downloaded_logfile_response.status == 404
              raise Errors::AdapterResourceNotFoundError,
                    "Invalid log file: '#{file_path}' not available for application '#{app_guid}'"
            end
            downloaded_logfile_response.body
          end

          def recent_log_messages(app_guid, filter = nil)
            loggregator_recent_uri = "https://#{loggregator_endpoint}:443/recent?app=#{app_guid}"
            # current log state before tailing, multipart message of protobuf objects
            current_log_response = get(loggregator_recent_uri)
            current_log_boundary = /boundary=(\w+)/.match(current_log_response.headers['Content-Type'])[1]
            current_log = current_log_response.body

            boundary_regexp = /--#{Regexp.quote(current_log_boundary)}(--)?#{CRLF}/
            parts = current_log.split(boundary_regexp).collect do |chunk|
              header_part = chunk.split(/#{CRLF}#{WSP}*#{CRLF}/m, 2)[0]
              if header_part
                headers = header_part.split(/\r\n/).map { |kv| kv }
                headers.length > 1 ? headers[1] : nil
              end
            end.compact
            # decode log messages
            decoded_messages = parts.collect do |proto_message|
              Message.decode(proto_message)
            end.compact
            return decoded_messages unless filter
            # return filtered messages
            decoded_messages.find_all do |msg|
              filter.include?(msg.source_name)
            end
          end

          def log_stream?(log_id)
            LOGGREGATOR_TYPES.include?(log_id.to_sym) || log_id.to_sym == :all
          end

          def loggregator_endpoint
            @endpoint_url.gsub(%r{^(\w*://)?(api)([-\.\w]+)$}i, 'loggregator\3')
          end

          def tail_file(app_guid, log_id, stream)
            log.debug 'Tailing CF log file'
            log_id = 'staging_task.log' if log_id.to_sym == Enums::ApplicationLogfileType::BUILD

            # cache headers as they are bound to a request and could be lost with the next tick
            headers_to_use = headers
            latest_pushed_line = -1

            # update every 3 seconds
            @tail_file_timer = EM.add_periodic_timer(3) do
              log.debug('Poll updated file tail...')
              begin
                latest_pushed_line = push_file_tail(app_guid, log_id, stream, latest_pushed_line, headers_to_use)
              rescue Errors::AdapterResourceNotFoundError
                log.debug('Logfile not found, finished tailing')
                # file lost, close stream
                @tail_file_timer.cancel if @tail_file_timer
                stream.close
              end
            end
            # listener to stop polling
            StopListener.new(@tail_file_timer, :cancel)
          end

          def push_file_tail(app_guid, log_id, stream, pushed_line_idx, headers_to_use)
            log.debug('Fetching file for tail response...')
            entries = download_logfile_entries(app_guid, log_id, headers_to_use)
            # file was shortened, close stream since we do not know where to continue
            if entries.length < pushed_line_idx
              log.debug('File was modified and shortened, stop tailing the file...')
              stream.close
            else
              entries.each_with_index do |entry, index|
                next if index <= pushed_line_idx
                pushed_line_idx = index
                stream.send_message(entry)
              end
              pushed_line_idx
            end
          end

          def tail_stream(app_guid, log_id, stream)
            filter = loggregator_filter(log_id)

            # push current state
            recent_log_messages(app_guid, filter).each { |entry| stream.send_message(construct_log_entry(entry)) }

            # Now register websocket to receive the latest updates
            ws = Faye::WebSocket::Client.new("wss://#{loggregator_endpoint}:443/tail/?app=#{app_guid}",
                                             nil, headers: headers.slice('Authorization'))

            ws.on :message do |event|
              log.debug "CF loggregator message received: #{event}"
              begin
                msg = Message.decode(event.data.pack('C*'))
                # notify stream to print new log line if msg type matches the applied filter
                stream.send_message(construct_log_entry(msg)) if filter.nil? || filter.include?(msg.source_name)
              rescue StandardError => e
                log.error "Cloud Foundry log message de-serialization failed: #{e}"
              end
            end

            ws.on :close do |event|
              log.debug "Closing CF loggregator websocket: code=#{event.code}, reason=#{event.reason}"
              ws = nil
              # notify stream that no more update are to arrive and stream shall be closed
              stream.close
            end
            # return listener to stop websocket
            TailStopper.new(ws, :close)
          end

          # Message class definition, matching the Protocol Buffer definition of the Cloud Foundry loggregator.
          # see also: https://github.com/cloudfoundry/loggregatorlib/blob/master/logmessage/log_message.proto
          class Message < ::Protobuf::Message
            class MessageType < ::Protobuf::Enum
              define :OUT, 1
              define :ERR, 2
            end

            required :bytes, :message, 1
            required Logs::Message::MessageType, :message_type, 2
            required :sint64, :timestamp, 3
            required :string, :app_id, 4
            optional :string, :source_id, 6
            repeated :string, :drain_urls, 7
            optional :string, :source_name, 8
          end

          class Envelope < ::Protobuf::Message
            required :string, :routing_key, 1
            required :bytes, :signature, 2
            required Logs::Message, :log_message, 3
          end
        end
      end
    end
  end
end