avocado-framework/avocado

View on GitHub
avocado/utils/gdb.py

Summary

Maintainability
D
1 day
Test Coverage
F
30%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
# Authors: Cleber Rosa <cleber@redhat.com>

"""
Module that provides communication with GDB via its GDB/MI interpreter
"""

__all__ = ["GDB", "GDBServer", "GDBRemote"]


import fcntl
import os
import socket
import subprocess
import tempfile
import time

from avocado.utils.external import gdbmi_parser
from avocado.utils.network import ports
from avocado.utils.path import find_command

#: Contains a list of binary names that should be run via the GNU debugger
#: and be stopped at a given point. That means that a breakpoint will be set
#: using the given expression
GDB_RUN_BINARY_NAMES_EXPR = []

#: After loading a binary in binary in GDB, but before actually running it,
#: execute the given GDB commands present in the given file, one per line
GDB_PRERUN_COMMANDS = {}

#: Whether to enable the automatic generation of core dumps for applications
#: that are run inside the GNU debugger
GDB_ENABLE_CORE = False

#: Path to the GDB binary
GDB_PATH = None

#: Path to the gdbserver binary
GDBSERVER_PATH = None

GDB_PROMPT = b"(gdb)"
GDB_EXIT = b"^exit"
GDB_BREAK_CONDITIONS = [GDB_PROMPT, GDB_EXIT]

#: How the remote protocol signals a transmission success (in ACK mode)
REMOTE_TRANSMISSION_SUCCESS = "+"

#: How the remote protocol signals a transmission failure (in ACK mode)
REMOTE_TRANSMISSION_FAILURE = "-"

#: How the remote protocol flags the start of a packet
REMOTE_PREFIX = b"$"

#: How the remote protocol flags the end of the packet payload, and that the
#: two digits checksum follow
REMOTE_DELIMITER = b"#"

#: Rather conservative default maximum packet size for clients using the
#: remote protocol. Individual connections can ask (and do so by default)
#: the server about the maximum packet size they can handle.
REMOTE_MAX_PACKET_SIZE = 1024


class UnexpectedResponseError(Exception):
    """
    A response different from the one expected was received from GDB
    """


class ServerInitTimeoutError(Exception):
    """
    Server took longer than expected to initialize itself properly
    """


class InvalidPacketError(Exception):
    """
    Packet received has invalid format
    """


class NotConnectedError(Exception):
    """
    GDBRemote is not connected to a remote GDB server
    """


class RetransmissionRequestedError(Exception):
    """
    Message integrity was not validated and retransmission is being requested
    """


def parse_mi(line):
    """
    Parse a GDB/MI line

    :param line: a string supposedly coming from GDB using MI language
    :type line: str
    :returns: a parsed GDB/MI response
    """
    if not line.endswith("\n"):
        line = f"{line}\n"
    return gdbmi_parser.session().process(line)


def encode_mi_cli(command):
    """
    Encodes a regular (CLI) command into the proper MI form

    :param command: the regular cli command to send
    :type command: str
    :returns: the encoded (escaped) MI command
    :rtype: str
    """
    return f'-interpreter-exec console "{command}"'


def is_stopped_exit(parsed_mi_msg):
    return (
        hasattr(parsed_mi_msg, "class_")
        and (parsed_mi_msg.class_ == "stopped")
        and hasattr(parsed_mi_msg, "result")
        and hasattr(parsed_mi_msg.result, "reason")
        and (parsed_mi_msg.result.reason == "exited")
    )


def is_thread_group_exit(parsed_mi_msg):
    return hasattr(parsed_mi_msg, "class_") and (
        parsed_mi_msg.class_ == "thread-group-exited"
    )


def is_exit(parsed_mi_msg):
    return is_stopped_exit(parsed_mi_msg) or is_thread_group_exit(parsed_mi_msg)


def is_break_hit(parsed_mi_msg):
    return (
        hasattr(parsed_mi_msg, "class_")
        and (parsed_mi_msg.class_ == "stopped")
        and hasattr(parsed_mi_msg, "result")
        and hasattr(parsed_mi_msg.result, "reason")
        and (parsed_mi_msg.result.reason == "breakpoint-hit")
    )


def is_sigsegv(parsed_mi_msg):
    return (
        hasattr(parsed_mi_msg, "class_")
        and (parsed_mi_msg.class_ == "stopped")
        and hasattr(parsed_mi_msg, "result")
        and hasattr(parsed_mi_msg.result, "signal_name")
        and (parsed_mi_msg.result.reason == "SIGSEGV")
    )


def is_sigabrt_stopped(parsed_mi_msg):
    return (
        hasattr(parsed_mi_msg, "class_")
        and (parsed_mi_msg.class_ == "stopped")
        and hasattr(parsed_mi_msg, "record_type")
        and (parsed_mi_msg.record_type == "result")
        and (parsed_mi_msg.result.reason == "signal-received")
        and (parsed_mi_msg.result.signal_name == "SIGABRT")
    )


def is_sigabrt_console(parsed_mi_msg):
    return (
        hasattr(parsed_mi_msg, "record_type")
        and (parsed_mi_msg.record_type == "stream")
        and hasattr(parsed_mi_msg, "type")
        and (parsed_mi_msg.type == "console")
        and hasattr(parsed_mi_msg, "value")
        and parsed_mi_msg.value == "SIGABRT, Aborted.\n"
    )


def is_sigabrt(parsed_mi_msg):
    return is_sigabrt_stopped(parsed_mi_msg) or is_sigabrt_console(parsed_mi_msg)


def is_fatal_signal(parsed_mi_msg):
    return is_sigsegv(parsed_mi_msg) or is_sigabrt(parsed_mi_msg)


def format_as_hex(char):
    """
    Formats a single ascii character as a lower case hex string

    :param char: a single ascii character
    :type char: str
    :returns: the character formatted as a lower case hex string
    :rtype: str
    """
    return f"{ord(char):2x}"


def string_to_hex(text):
    """
    Formats a string of text into an hex representation

    :param text: a multi character string
    :type text: str
    :returns: the string converted to an hex representation
    :rtype: str
    """
    return "".join(map(format_as_hex, text))


class CommandResult:
    """
    A GDB command, its result, and other possible messages
    """

    def __init__(self, command):
        self.command = command
        self.timestamp = time.monotonic()
        self.stream_messages = []
        self.application_output = []
        self.result = None

    def get_application_output(self):
        """
        Return all application output concatenated as a single string

        :returns: application output concatenated
        :rtype: str
        """
        return "".join(self.application_output)

    def get_stream_messages_text(self):
        """
        Return all stream messages text concatenated as a single string

        :returns: stream messages text concatenated
        :rtype: str
        """
        return "".join([m.value for m in self.stream_messages])

    def __repr__(self):
        return f"{self.command} at {self.timestamp:.9f}"


class GDB:
    """
    Wraps a GDB subprocess for easier manipulation
    """

    REQUIRED_ARGS = ["--interpreter=mi", "--quiet"]

    DEFAULT_BREAK = "main"

    def __init__(self, path=None, *extra_args):  # pylint: disable=W1113
        if path is None:
            path = find_command("gdb", default="/usr/bin/gdb")
        self.path = path
        args = [self.path]
        args += self.REQUIRED_ARGS
        args += extra_args

        try:
            self.process = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                close_fds=True,
            )
        except OSError as details:
            if details.errno == 2:
                exc = OSError(f"File '{args[0]}' not found")
                exc.errno = 2
                raise exc
            else:
                raise

        fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
        self.read_until_break()

        # If this instance is connected to another target. If so, what
        # tcp port it's connected to
        self.connected_to = None

        # any GDB MI async messages
        self.async_messages = []

        self.commands_history = []

        # whatever comes from the app that is not a GDB MI message
        self.output_messages = []
        self.output_messages_queue = []

    def read_gdb_response(self, timeout=0.01, max_tries=100):
        """
        Read raw responses from GDB

        :param timeout: the amount of time to way between read attempts
        :type timeout: float
        :param max_tries: the maximum number of cycles to try to read until
                          a response is obtained
        :type max_tries: int
        :returns: a string containing a raw response from GDB
        :rtype: str
        """
        current_try = 0
        while current_try < max_tries:
            try:
                line = self.process.stdout.readline()
                line = line.strip()
                if line:
                    return line
            except IOError:
                current_try += 1
            if current_try >= max_tries:
                raise ValueError("Could not read GDB response")
            else:
                time.sleep(timeout)

    def read_until_break(self, max_lines=100):
        """
        Read lines from GDB until a break condition is reached

        :param max_lines: the maximum number of lines to read
        :type max_lines: int
        :returns: a list of messages read
        :rtype: list of str
        """
        result = []
        while True:
            line = self.read_gdb_response()
            if line in GDB_BREAK_CONDITIONS:
                break
            if len(result) >= max_lines:
                break
            result.append(line)
        return result

    def send_gdb_command(self, command):
        """
        Send a raw command to the GNU debugger input

        :param command: the GDB command, hopefully in MI language
        :type command: str
        :returns: None
        """
        if not command.endswith("\n"):
            command = f"{command}\n"
        self.process.stdin.write(command.encode())
        self.process.stdin.flush()

    def cmd(self, command):
        """
        Sends a command and parses all lines until prompt is received

        :param command: the GDB command, hopefully in MI language
        :type command: str
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = CommandResult(command)

        self.send_gdb_command(command)
        responses = self.read_until_break()
        result_response_received = False

        for line in responses:
            # If the line can not be properly parsed, it is *most likely*
            # generated by the application being run inside the debugger
            try:
                parsed_response = parse_mi(line.decode())
            except Exception:  # pylint: disable=W0703
                cmd.application_output.append(line)
                continue

            if (
                parsed_response.type == "console"
                and parsed_response.record_type == "stream"
            ):
                cmd.stream_messages.append(parsed_response)
            elif parsed_response.type == "result":
                if result_response_received:
                    # raise an exception here, because no two result
                    # responses should come from a single command AFAIK
                    raise Exception("Many result responses to a single cmd")
                result_response_received = True
                cmd.result = parsed_response
            else:
                self.async_messages.append(parsed_response)

        return cmd

    def cli_cmd(self, command):
        """
        Sends a cli command encoded as an MI command

        :param command: a regular GDB cli command
        :type command: str
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = encode_mi_cli(command)
        return self.cmd(cmd)

    def cmd_exists(self, command):
        """
        Checks if a given command exists

        :param command: a GDB MI command, including the dash (-) prefix
        :type command: str
        :returns: either True or False
        :rtype: bool
        """
        gdb_info_command = f"-info-gdb-mi-command {command[1:]}"
        r = self.cmd(gdb_info_command)
        return r.result.result.command.exists == "true"

    def set_file(self, path):
        """
        Sets the file that will be executed

        :param path: the path of the binary that will be executed
        :type path: str
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = f"-file-exec-and-symbols {path}"
        r = self.cmd(cmd)
        if not r.result.class_ == "done":
            raise UnexpectedResponseError

        if self.connected_to is not None:
            cmd = f"set remote exec-file {path}"
            r = self.cmd(cmd)
            if not r.result.class_ == "done":
                raise UnexpectedResponseError
        return r

    def set_break(self, location, ignore_error=False):
        """
        Sets a new breakpoint on the binary currently being debugged

        :param location: a breakpoint location expression as accepted by GDB
        :type location: str
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = f"-break-insert {location}"
        r = self.cmd(cmd)
        if not r.result.class_ == "done":
            if not ignore_error:
                raise UnexpectedResponseError
        return r

    def del_break(self, number):
        """
        Deletes a breakpoint by its number

        :param number: the breakpoint number
        :type number: int
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = f"-break-delete {number}"
        r = self.cmd(cmd)
        if not r.result.class_ == "done":
            raise UnexpectedResponseError
        return r

    def run(self, args=None):
        """
        Runs the application inside the debugger

        :param args: the arguments to be passed to the binary as command line
                     arguments
        :type args: builtin.list
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        if args:
            args_text = " ".join(args)
            cmd = f"-exec-arguments {args_text}"
            r = self.cmd(cmd)
            if not r.result.class_ == "done":
                raise UnexpectedResponseError

        r = self.cmd("-exec-run")
        if not r.result.class_ == "running":
            raise UnexpectedResponseError
        return r

    def connect(self, port):
        """
        Connects to a remote debugger (a gdbserver) at the given TCP port

        This uses the "extended-remote" target type only

        :param port: the TCP port number
        :type port: int
        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = f"-target-select extended-remote :{port}"
        r = self.cmd(cmd)
        if not r.result.class_ == "connected":
            raise UnexpectedResponseError
        self.connected_to = port
        return r

    def disconnect(self):
        """
        Disconnects from a remote debugger

        :returns: a :class:`CommandResult` instance
        :rtype: :class:`CommandResult`
        """
        cmd = "-target-disconnect"
        r = self.cmd(cmd)
        if not r.result.class_ == "done":
            raise UnexpectedResponseError
        self.connected_to = None
        return r

    def exit(self):
        """
        Exits the GDB application gracefully

        :returns: the result of :meth:`subprocess.POpen.wait`, that is, a
                  :attr:`subprocess.POpen.returncode`
        :rtype: int or None
        """
        self.cmd("-gdb-exit")
        return self.process.wait()


class GDBServer:
    """
    Wraps a gdbserver instance
    """

    #: The default arguments used when starting the GDB server process
    REQUIRED_ARGS = ["--multi"]

    #: The range from which a port to GDB server will try to be allocated from
    PORT_RANGE = (20000, 20999)

    #: The time to optionally wait for the server to initialize itself and be
    #: ready to accept new connections
    INIT_TIMEOUT = 5.0

    # pylint: disable=W0613, W1113
    def __init__(
        self,
        path=None,
        port=None,
        wait_until_running=True,
        *extra_args,
    ):
        """
        Initializes a new gdbserver instance

        :param path: location of the gdbserver binary
        :type path: str
        :param port: tcp port number to listen on for incoming connections
        :type port: int
        :param wait_until_running: wait until the gdbserver is running and
                                   accepting connections. It may take a little
                                   after the process is started and it is
                                   actually bound to the allocated port
        :type wait_until_running: bool
        :param extra_args: optional extra arguments to be passed to gdbserver
        """
        if path is None:
            path = find_command("gdbserver", default="/usr/bin/gdbserver")
        self.path = path
        args = [self.path]
        args += self.REQUIRED_ARGS

        if port is None:
            self.port = ports.find_free_port(*self.PORT_RANGE)
        else:
            self.port = port
        args.append(f":{self.port}")

        prefix = f"avocado_gdbserver_{self.port}_"
        _, self.stdout_path = tempfile.mkstemp(prefix=prefix + "stdout_")
        self.stdout = open(self.stdout_path, "w", encoding="utf-8")
        _, self.stderr_path = tempfile.mkstemp(prefix=prefix + "stderr_")
        self.stderr = open(self.stderr_path, "w", encoding="utf-8")

        try:
            self.process = subprocess.Popen(
                args,
                stdin=subprocess.PIPE,
                stdout=self.stdout,
                stderr=self.stderr,
                close_fds=True,
            )
        except OSError as details:
            if details.errno == 2:
                exc = OSError(f"File '{args[0]}' not found")
                exc.errno = 2
                raise exc
            else:
                raise

        if wait_until_running:
            self._wait_until_running()

    def _wait_until_running(self):
        connection_ok = False
        c = GDB()
        end_time = time.monotonic() + self.INIT_TIMEOUT
        while time.monotonic() < end_time:
            try:
                c.connect(self.port)
                connection_ok = True
                break
            except UnexpectedResponseError:
                time.sleep(0.1)
        c.disconnect()
        c.exit()
        if not connection_ok:
            raise ServerInitTimeoutError

    def exit(self, force=True):
        """
        Quits the gdb_server process

        Most correct way of quitting the GDB server is by sending it a command.
        If no GDB client is connected, then we can try to connect to it and
        send a quit command. If this is not possible, we send it a signal and
        wait for it to finish.

        :param force: if a forced exit (sending SIGTERM) should be attempted
        :type force: bool
        :returns: None
        """
        temp_client = GDB()
        try:
            temp_client.connect(self.port)
            temp_client.cli_cmd("monitor exit")
        except (UnexpectedResponseError, ValueError):
            if force:
                self.process.kill()
        finally:
            try:
                temp_client.disconnect()
                temp_client.exit()
            except (UnexpectedResponseError, ValueError):
                if force:
                    temp_client.process.kill()
                    temp_client.process.wait()
            self.process.wait()
            self.stdout.close()
            self.stderr.close()


class GDBRemote:
    def __init__(self, host, port, no_ack_mode=True, extended_mode=True):
        """
        Initializes a new GDBRemote object.

        A GDBRemote acts like a client that speaks the GDB remote protocol,
        documented at:

        https://sourceware.org/gdb/current/onlinedocs/gdb/Remote-Protocol.html

        Caveat: we currently do not support communicating with devices, only
        with TCP sockets. This limitation is basically due to the lack of
        use cases that justify an implementation, but not due to any technical
        shortcoming.

        :param host: the IP address or host name
        :type host: str
        :param port: the port number where the the remote GDB is listening on
        :type port: int
        :param no_ack_mode: if the packet transmission confirmation mode should
                            be disabled
        :type no_ack_mode: bool
        :param extended_mode: if the remote extended mode should be enabled
        :type param extended_mode: bool
        """
        self.host = host
        self.port = port

        # Temporary holder for the class init attributes
        self._no_ack_mode = no_ack_mode
        self.no_ack_mode = False
        self._extended_mode = extended_mode
        self.extended_mode = False

        self._socket = None

    @staticmethod
    def checksum(input_message):
        """Calculates a remote message checksum.

        More details are available at:
        https://sourceware.org/gdb/current/onlinedocs/gdb/Overview.html

        :param input_message: the message input payload, without the
                              start and end markers
        :type input_message: bytes
        :returns: two byte checksum
        :rtype: bytes
        """
        total = 0
        for i in input_message:
            total += i
        result = total % 256

        return b"%02x" % result

    @staticmethod
    def encode(data):
        """Encodes a command.

        That is, add prefix, suffix and checksum.

        More details are available at:
        https://sourceware.org/gdb/current/onlinedocs/gdb/Overview.html

        :param command_data: the command data payload
        :type command_data: bytes
        :returns: the encoded command, ready to be sent to a remote GDB
        :rtype: bytes
        """
        return b"$%b#%b" % (data, GDBRemote.checksum(data))

    @staticmethod
    def decode(data):
        """Decodes a packet and returns its payload.

        More details are available at:
        https://sourceware.org/gdb/current/onlinedocs/gdb/Overview.html

        :param command_data: the command data payload
        :type command_data: bytes
        :returns: the encoded command, ready to be sent to a remote GDB
        :rtype: bytes
        """
        if data[0:1] != REMOTE_PREFIX:
            raise InvalidPacketError

        if data[-3:-2] != REMOTE_DELIMITER:
            raise InvalidPacketError

        payload = data[1:-3]
        checksum = data[-2:]

        if payload == b"":
            expected_checksum = b"00"
        else:
            expected_checksum = GDBRemote.checksum(payload)

        if checksum != expected_checksum:
            raise InvalidPacketError

        return payload

    def cmd(self, command_data, expected_response=None):
        """
        Sends a command data to a remote gdb server

        Limitations: the current version does not deal with retransmissions.

        :param command_data: the remote command to send the the remote stub
        :type command_data: str
        :param expected_response: the (optional) response that is expected
                                  as a response for the command sent
        :type expected_response: str
        :raises: RetransmissionRequestedError, UnexpectedResponseError
        :returns: raw data read from from the remote server
        :rtype: str
        """
        if self._socket is None:
            raise NotConnectedError

        data = self.encode(command_data)
        self._socket.send(data)

        if not self.no_ack_mode:
            transmission_result = self._socket.recv(1)
            if transmission_result == REMOTE_TRANSMISSION_FAILURE:
                raise RetransmissionRequestedError

        result = self._socket.recv(REMOTE_MAX_PACKET_SIZE)
        response_payload = self.decode(result)

        if expected_response is not None:
            if expected_response != response_payload:
                raise UnexpectedResponseError

        return response_payload

    def set_extended_mode(self):
        """
        Enable extended mode. In extended mode, the remote server is made
        persistent. The 'R' packet is used to restart the program being
        debugged. Original documentation at:

        https://sourceware.org/gdb/current/onlinedocs/gdb/Packets.html#extended-mode
        """
        self.cmd(b"!", b"OK")
        self.extended_mode = True

    def start_no_ack_mode(self):
        """
        Request that the remote stub disable the normal +/- protocol
        acknowledgments. Original documentation at:

        https://sourceware.org/gdb/current/onlinedocs/gdb/General-Query-Packets.html#QStartNoAckMode
        """
        self.cmd(b"QStartNoAckMode", b"OK")
        self.no_ack_mode = True

    def connect(self):
        """
        Connects to the remote target and initializes the chosen modes
        """
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((self.host, self.port))

        if self._no_ack_mode:
            self.start_no_ack_mode()

        if self._extended_mode:
            self.set_extended_mode()