piotrmurach/tty-reader

View on GitHub
lib/tty/reader.rb

Summary

Maintainability
F
3 days
Test Coverage
# frozen_string_literal: true

require "tty-cursor"
require "tty-screen"
require "wisper"

require_relative "reader/completer"
require_relative "reader/completion_event"
require_relative "reader/history"
require_relative "reader/line"
require_relative "reader/key_event"
require_relative "reader/console"
require_relative "reader/win_console"
require_relative "reader/version"

module TTY
  # A class responsible for reading character input from STDIN
  #
  # Used internally to provide key and line reading functionality
  #
  # @api public
  class Reader
    include Wisper::Publisher

    # Key codes
    BACKSPACE = 8
    TAB       = 9
    NEWLINE   = 10
    CARRIAGE_RETURN = 13
    DELETE    = 127

    # Keys that terminate input
    EXIT_KEYS = %i[ctrl_d ctrl_z].freeze

    # Pattern to check if line ends with a line break character
    END_WITH_LINE_BREAK = /(\r|\n)$/.freeze

    # Raised when the user hits the interrupt key(Control-C)
    #
    # @api public
    InputInterrupt = Class.new(Interrupt)

    # Check if Windowz mode
    #
    # @return [Boolean]
    #
    # @api public
    def self.windows?
      ::File::ALT_SEPARATOR == "\\"
    end

    attr_reader :input

    attr_reader :output

    attr_reader :env

    attr_reader :track_history
    alias track_history? track_history

    # The handler for finding word completion suggestions
    #
    # @api public
    attr_reader :completion_handler

    # The suffix to add to suggested word completion
    #
    # @api public
    attr_reader :completion_suffix

    attr_reader :console

    attr_reader :cursor

    # Initialize a Reader
    #
    # @param [IO] input
    #   the input stream
    # @param [IO] output
    #   the output stream
    # @param [Symbol] interrupt
    #   the way to handle the Ctrl+C key out of :signal, :exit, :noop
    # @param [Hash] env
    #   the environment variables
    # @param [Boolean] track_history
    #   disable line history tracking, true by default
    # @param [Boolean] history_cycle
    #   allow cycling through history, false by default
    # @param [Boolean] history_duplicates
    #   allow duplicate entires, false by default
    # @param [Proc] history_exclude
    #   exclude lines from history, by default all lines are stored
    # @param [Proc] completion_handler
    #   the hanlder for finding word completion suggestions
    # @param [String] completion_suffix
    #   the suffix to add to suggested word completion
    #
    # @api public
    def initialize(input: $stdin, output: $stdout, interrupt: :error,
                   env: ENV, track_history: true, history_cycle: false,
                   history_exclude: History::DEFAULT_EXCLUDE,
                   history_size: History::DEFAULT_SIZE,
                   history_duplicates: false,
                   completion_handler: nil, completion_suffix: "")
      @input = input
      @output = output
      @interrupt = interrupt
      @env = env
      @track_history = track_history
      @history_cycle = history_cycle
      @history_exclude = history_exclude
      @history_duplicates = history_duplicates
      @history_size = history_size
      @completion_handler = completion_handler
      @completion_suffix = completion_suffix
      @completer = Completer.new(handler: completion_handler,
                                 suffix: completion_suffix)

      @console = select_console(input)
      @history = History.new(history_size) do |h|
        h.cycle = history_cycle
        h.duplicates = history_duplicates
        h.exclude = history_exclude
      end
      @cursor = TTY::Cursor
    end

    # Set completion handler
    #
    # @param [Proc] handler
    #   the handler for finding word completion suggestions
    #
    # @api public
    def completion_handler=(handler)
      @completion_handler = handler
      @completer.handler = handler
    end

    # Set completion suffix
    #
    # @param [String] suffix
    #   the suffix to add to suggested word completion
    #
    # @api public
    def completion_suffix=(suffix)
      @completion_suffix = suffix
      @completer.suffix = suffix
    end

    alias old_subcribe subscribe

    # Subscribe to receive key events
    #
    # @example
    #   reader.subscribe(MyListener.new)
    #
    # @return [self|yield]
    #
    # @api public
    def subscribe(listener, options = {})
      old_subcribe(listener, options)
      object = self
      if block_given?
        object = yield
        unsubscribe(listener)
      end
      object
    end

    # Unsubscribe from receiving key events
    #
    # @example
    #   reader.unsubscribe(my_listener)
    #
    # @return [void]
    #
    # @api public
    def unsubscribe(listener)
      registry = send(:local_registrations)
      registry.each do |object|
        if object.listener.equal?(listener)
          registry.delete(object)
        end
      end
    end

    # Select appropriate console
    #
    # @api private
    def select_console(input)
      if self.class.windows? && !env["TTY_TEST"]
        WinConsole.new(input)
      else
        Console.new(input)
      end
    end

    # Get input in unbuffered mode.
    #
    # @example
    #   unbufferred do
    #     ...
    #   end
    #
    # @api public
    def unbufferred(&block)
      bufferring = output.sync
      # Immediately flush output
      output.sync = true
      block[] if block_given?
    ensure
      output.sync = bufferring
    end

    # Read a keypress including invisible multibyte codes and return
    # a character as a string.
    # Nothing is echoed to the console. This call will block for a
    # single keypress, but will not wait for Enter to be pressed.
    #
    # @param [Boolean] echo
    #   whether to echo chars back or not, defaults to false
    # @option [Boolean] raw
    #   whenther raw mode is enabled, defaults to true
    # @option [Boolean] nonblock
    #   whether to wait for input or not, defaults to false
    #
    # @return [String]
    #
    # @api public
    def read_keypress(echo: false, raw: true, nonblock: false)
      codes = unbufferred do
        get_codes(echo: echo, raw: raw, nonblock: nonblock)
      end
      char = codes ? codes.pack("U*") : nil

      trigger_key_event(char) if char
      char
    end
    alias read_char read_keypress

    # Get input code points
    #
    # @param [Boolean] echo
    #   whether to echo chars back or not, defaults to false
    # @option [Boolean] raw
    #   whenther raw mode is enabled, defaults to true
    # @option [Boolean] nonblock
    #   whether to wait for input or not, defaults to false
    # @param [Array[Integer]] codes
    #   the currently read char code points
    #
    # @return [Array[Integer]]
    #
    # @api private
    def get_codes(echo: true, raw: false, nonblock: false, codes: [])
      char = console.get_char(echo: echo, raw: raw, nonblock: nonblock)
      handle_interrupt if console.keys[char] == :ctrl_c
      return if char.nil?

      codes << char.ord
      condition = proc { |escape|
        (codes - escape).empty? ||
        (escape - codes).empty? &&
        !(64..126).cover?(codes.last)
      }

      while console.escape_codes.any?(&condition)
        char_codes = get_codes(echo: echo, raw: raw,
                               nonblock: true, codes: codes)
        break if char_codes.nil?
      end

      codes
    end

    # Get a single line from STDIN. Each key pressed is echoed
    # back to the shell. The input terminates when enter or
    # return key is pressed.
    #
    # @param [String] prompt
    #   the prompt to display before input
    # @param [String] value
    #   the value to pre-populate line with
    # @param [Boolean] echo
    #   whether to echo chars back or not, defaults to false
    # @param [Array<Symbol>] exit_keys
    #   the custom keys to exit line editing
    # @option [Boolean] raw
    #   whenther raw mode is enabled, defaults to true
    # @option [Boolean] nonblock
    #   whether to wait for input or not, defaults to false
    #
    # @return [String]
    #
    # @api public
    def read_line(prompt = "", value: "", echo: true, raw: true,
                  nonblock: false, exit_keys: nil)
      line = Line.new(value, prompt: prompt)
      screen_width = TTY::Screen.width
      history_in_use = false
      previous_key_name = ""
      buffer = ""

      output.print(line)

      while (codes = get_codes(echo: echo, raw: raw, nonblock: nonblock)) &&
            (code = codes[0])
        char = codes.pack("U*")
        key_name = console.keys[char]

        if exit_keys && exit_keys.include?(key_name)
          trigger_key_event(char, line: line.to_s)
          break
        end

        if raw && echo
          clear_display(line, screen_width)
        end

        if (key_name == :tab || code == TAB || key_name == :shift_tab) &&
           completion_handler
          initial = previous_key_name != :tab && previous_key_name != :shift_tab
          direction = key_name == :shift_tab ? :previous : :next
          if completion = @completer.complete(line, initial: initial,
                                                    direction: direction)
            trigger_completion_event(completion, line.to_s)
          end
        elsif key_name == :escape && completion_handler &&
              (previous_key_name == :tab || previous_key_name == :shift_tab)
          @completer.cancel(line)
        elsif key_name == :backspace || code == BACKSPACE
          if !line.start?
            line.left
            line.delete
          end
        elsif key_name == :delete || code == DELETE
          line.delete
        elsif key_name.to_s =~ /ctrl_/
          # skip
        elsif key_name == :up
          @history.replace(line.text) if history_in_use
          if history_previous?
            line.replace(history_previous(skip: !history_in_use))
            history_in_use = true
          end
        elsif key_name == :down
          @history.replace(line.text) if history_in_use
          if history_next?
            line.replace(history_next)
          elsif history_in_use
            line.replace(buffer)
            history_in_use = false
          end
        elsif key_name == :left
          line.left
        elsif key_name == :right
          line.right
        elsif key_name == :home
          line.move_to_start
        elsif key_name == :end
          line.move_to_end
        else
          if raw && [CARRIAGE_RETURN, NEWLINE].include?(code)
            char = "\n"
            line.move_to_end
          end
          line.insert(char)
          buffer = line.text unless history_in_use
        end

        if (key_name == :backspace || code == BACKSPACE) && echo
          if raw
            output.print("\e[1X") unless line.start?
          else
            output.print(?\s + (line.start? ? "" : ?\b))
          end
        end

        previous_key_name = key_name

        # trigger before line is printed to allow for line changes
        trigger_key_event(char, line: line.to_s)

        if raw && echo
          output.print(line.to_s)
          if char == "\n"
            line.move_to_start
          elsif !line.end? # readjust cursor position
            output.print(cursor.backward(line.text_size - line.cursor))
          end
        end

        if [CARRIAGE_RETURN, NEWLINE].include?(code)
          buffer = ""
          output.puts unless echo
          break
        end
      end

      if track_history? && echo
        add_to_history(line.text.rstrip)
      end

      line.text
    end

    # Clear display for the current line input
    #
    # Handles clearing input that is longer than the current
    # terminal width which allows copy & pasting long strings.
    #
    # @param [Line] line
    #   the line to display
    # @param [Number] screen_width
    #   the terminal screen width
    #
    # @api private
    def clear_display(line, screen_width)
      total_lines  = count_screen_lines(line.size, screen_width)
      current_line = count_screen_lines(line.prompt_size + line.cursor, screen_width)
      lines_down = total_lines - current_line

      output.print(cursor.down(lines_down)) unless lines_down.zero?
      output.print(cursor.clear_lines(total_lines))
    end

    # Count the number of screen lines given line takes up in terminal
    #
    # @param [Integer] line_or_size
    #   the current line or its length
    # @param [Integer] screen_width
    #   the width of terminal screen
    #
    # @return [Integer]
    #
    # @api public
    def count_screen_lines(line_or_size, screen_width = TTY::Screen.width)
      line_size = if line_or_size.is_a?(Integer)
                    line_or_size
                  else
                    Line.sanitize(line_or_size).size
                  end
      # new character + we don't want to add new line on screen_width
      new_chars = self.class.windows? ? -1 : 1
      1 + [0, (line_size - new_chars) / screen_width].max
    end

    # Read multiple lines and return them in an array.
    # Skip empty lines in the returned lines array.
    # The input gathering is terminated by Ctrl+d or Ctrl+z.
    #
    # @param [String] prompt
    #   the prompt displayed before the input
    # @param [String] value
    #   the value to pre-populate line with
    # @param [Boolean] echo
    #   whether to echo chars back or not, defaults to false
    # @param [Array<Symbol>] exit_keys
    #   the custom keys to exit line editing
    # @option [Boolean] raw
    #   whenther raw mode is enabled, defaults to true
    # @option [Boolean] nonblock
    #   whether to wait for input or not, defaults to false
    #
    # @yield [String] line
    #
    # @return [Array[String]]
    #
    # @api public
    def read_multiline(prompt = "", value: "", echo: true, raw: true,
                       nonblock: false, exit_keys: EXIT_KEYS)
      lines = []
      stop = false
      clear_value = !value.to_s.empty?

      loop do
        line = read_line(prompt, value: value, echo: echo, raw: raw,
                                 nonblock: nonblock, exit_keys: exit_keys).to_s
        if clear_value
          clear_value = false
          value = ""
        end
        break if line.empty?

        stop = line.match(END_WITH_LINE_BREAK).nil?
        next if line !~ /\S/ && !stop

        if block_given?
          yield(line)
        else
          lines << line
        end
        break if stop
      end

      lines
    end
    alias read_lines read_multiline

    # Expose event broadcasting
    #
    # @api public
    def trigger(event, *args)
      publish(event, *args)
    end

    # Add a line to history
    #
    # @param [String] line
    #
    # @api private
    def add_to_history(line)
      @history.push(line)
    end

    # Check if history has next line
    #
    # @param [Boolean]
    #
    # @api private
    def history_next?
      @history.next?
    end

    # Move history to the next line
    #
    # @return [String]
    #   the next line
    #
    # @api private
    def history_next
      @history.next
      @history.get
    end

    # Check if history has previous line
    #
    # @return [Boolean]
    #
    # @api private
    def history_previous?
      @history.previous?
    end

    # Move history to the previous line
    #
    # @param [Boolean] skip
    #   whether or not to move history index
    #
    # @return [String]
    #   the previous line
    #
    # @api private
    def history_previous(skip: false)
      @history.previous unless skip
      @history.get
    end

    # Inspect class name and public attributes
    #
    # @return [String]
    #
    # @api public
    def inspect
      "#<#{self.class}: @input=#{input}, @output=#{output}>"
    end

    private

    # Trigger completion event
    #
    # @param [String] completion
    #   the suggested word completion
    # @param [Line] line
    #   the line with word to complete
    #
    # @api private
    def trigger_completion_event(completion, line)
      completion_event = CompletionEvent.new(@completer, completion, line)
      trigger(:complete, completion_event)
    end

    # Publish event
    #
    # @param [String] char
    #   the key pressed
    #
    # @return [nil]
    #
    # @api private
    def trigger_key_event(char, line: "")
      event = KeyEvent.from(console.keys, char, line)
      trigger(:"key#{event.key.name}", event) if event.trigger?
      trigger(:keypress, event)
    end

    # Handle input interrupt based on provided value
    #
    # @api private
    def handle_interrupt
      case @interrupt
      when :signal
        Process.kill("SIGINT", Process.pid)
      when :exit
        exit(130)
      when Proc
        @interrupt.call
      when :noop
        # Noop
      else
        raise InputInterrupt
      end
    end
  end # Reader
end # TTY