ANSSI-FR/polichombr

View on GitHub
skelenox_plugin/connection.py

Summary

Maintainability
C
1 day
Test Coverage
"""
    Skelenox: the collaborative IDA Pro Agent

    This file is part of Polichombr
        (c) ANSSI-FR 2018

    Description:
        This file implements the main connection class
        which interacts with the remote API
"""

import gzip
import httplib
import json
import datetime
import logging
import ssl

from StringIO import StringIO
from string import lower

import idc

logger = logging.getLogger(__name__)


class SkelConnection(object):
    """
        HTTP(S) API management
    """
    sample_id = None
    remote_path = None
    http_debug = None
    api_key = None
    poli_server = None
    poli_port = None
    h_conn = None
    is_online = False

    def __init__(self, skel_config=None):
        """
            Here skel_config should be a SkelConfig object
        """
        if skel_config is None:
            raise ValueError
        self.http_debug = skel_config.debug_http
        self.remote_path = skel_config.poli_remote_path
        self.api_key = skel_config.poli_apikey
        self.poli_server = skel_config.poli_server
        self.poli_port = skel_config.poli_port

        self.h_conn = None
        self.auth_token = None
        self.is_online = False
        self.sample_id = None

    def get_online(self):
        """
            Connect to the server
        """
        try:
            self.__do_init()
        except Exception:
            logger.exception("The polichombr server seems down")
            return False
        return True

    def __do_init(self):
        """
            Initiate connection handle
        """
        if self.http_debug is True:
            logger.info("Connecting using simple HTTP")
            self.h_conn = httplib.HTTPConnection(self.poli_server,
                                                 self.poli_port)
        else:
            logger.info("Connecting using HTTPS")
            ssl_context = ssl._create_unverified_context()
            self.h_conn = httplib.HTTPSConnection(self.poli_server,
                                                  self.poli_port,
                                                  context=ssl_context)

        self.h_conn.connect()
        self.login()
        self.is_online = True
        self.init_sample_id()

    def close_connection(self):
        """
            Cleanup the connection
        """
        logger.debug("Closing connection")
        if self.h_conn is not None:
            self.h_conn.close()
        self.is_online = False
        self.sample_id = None

    def login(self):
        """
            Get an authentication token from the server
        """
        data = json.dumps({'api_key': self.api_key})
        headers = {"Accept-encoding": "gzip, deflate",
                   "Content-type": "application/json",
                   "Accept": "*/*;q=0.8",
                   "Accept-Language": "en-US,en;q=0.5"}

        self.h_conn.request("POST",
                            "/api/1.0/auth_token/",
                            data,
                            headers)
        res = self.h_conn.getresponse()
        if res.status != 200:
            idc.warning("Error, cannot login to Polichombr!")
            raise IOError
        token = json.loads(res.read())["token"]
        self.auth_token = token

    def poli_request(self, endpoint, data, method="POST"):
        """
            @arg : endpoint The API target endpoint
            @arg : data dictionary
            @return : dict issued from JSON
        """
        if not self.is_online:
            logger.error("Cannot send requests while not connected")
            raise IOError
        headers = {"Accept-encoding": "gzip, deflate",
                   "Content-type": "application/json",
                   "Accept": "*/*;q=0.8",
                   "Accept-Language": "en-US,en;q=0.5",
                   "Connection": "Keep-Alive",
                   "X-Api-Key": self.auth_token}
        json_data = json.dumps(data)
        try:
            self.h_conn.request(method, endpoint, json_data, headers)
        except (httplib.CannotSendRequest, httplib.BadStatusLine) as e:
            logger.error("Error during request, retrying")
            self.close_connection()
            self.get_online()
            self.h_conn.request(method, endpoint, json_data, headers)
        res = self.h_conn.getresponse()

        if res.status == 401:
            logger.error("Token is invalid, trying to login again")
            self.login()
            return None
        elif res.status != 200:
            logger.error("The %s request didn't go as expected", method)
            logger.debug("Status code was %d and content was %s",
                         res.status, res.read())
            return None
        content_type = res.getheader("Content-Encoding")
        if content_type == "gzip":
            buf = StringIO(res.read())
            res = gzip.GzipFile(fileobj=buf)
        data = res.read()
        try:
            result = json.loads(data)
        except BaseException:
            raise IOError
        return result

    def poli_post(self, endpoint="/", data=None):
        result = self.poli_request(endpoint, data, method='POST')
        return result

    def poli_get(self, endpoint="/", data=None):
        result = self.poli_request(endpoint, data, method='GET')
        return result

    def poli_put(self, endpoint="/", data=None):
        result = self.poli_request(endpoint, data, method='PUT')
        return result

    def poli_delete(self, endpoint='/', data=None):
        result = self.poli_request(endpoint, data, method='DELETE')
        return result

    def poli_patch(self, endpoint='/', data=None):
        result = self.poli_request(endpoint, data, method='PATCH')
        return result

    def send_sample(self, filedata):
        """
            Ugly wrapper for uploading a file in multipart/form-data
        """
        endpoint = "/api/1.0/samples/"
        headers = {"Accept-encoding": "gzip, deflate",
                   "X-Api-Key": self.auth_token}

        method = "POST"
        boundary = "70f6e331562f4b8f98e5f9590e0ffb8e"
        headers["Content-type"] = "multipart/form-data; boundary=" + boundary
        body = "--" + boundary
        body += "\r\n"
        body += "Content-Disposition: form-data; name=\"filename\"\r\n"
        body += "\r\n"
        body += idc.get_root_filename()
        body += "\r\n\r\n"
        body += "--" + boundary + "\r\n"

        body += "Content-Disposition: form-data;"
        body += "name=\"file\"; filename=\"file\"\r\n"
        body += "\r\n"
        body += filedata.read()
        body += "\r\n--"
        body += boundary
        body += "--\r\n"

        self.h_conn.request(method, endpoint, body, headers)
        res = self.h_conn.getresponse()
        data = res.read()
        try:
            result = json.loads(data)
        except BaseException:
            logger.exception("Cannot load json data from server")
            result = None
        return result

    def get_sample_id(self):
        """
            Query the server for the sample ID
        """
        endpoint = "/api/1.0/samples/"
        endpoint += lower(idc.retrieve_input_file_md5())
        endpoint += "/"
        try:
            data = self.poli_get(endpoint)
            if data["sample_id"] is not None:
                return data["sample_id"]
            else:
                return False
        except BaseException:  # 404?
            return False

    def init_sample_id(self):
        """
            test if the remote sample exists,
            if not, we upload it
        """
        if self.sample_id is None:
            self.sample_id = self.get_sample_id()
            if not self.sample_id:
                logger.warning("Sample not found on server, uploading it")
                self.send_sample(open(idc.get_root_filename(), 'rb'))
                self.sample_id = self.get_sample_id()
                logger.info("Sample ID: %d", self.sample_id)


class SkelIDAConnection(SkelConnection):
    def push_comment(self, address=0, comment=None):
        """
            Push a standard comment
        """
        if comment is None:
            return False
        data = {"address": address,
                "comment": comment}
        endpoint = self.prepare_endpoint('comments')
        res = self.poli_post(endpoint, data)
        if res["result"]:
            logger.debug(
                "Comment %s sent for address 0x%x",
                comment,
                address)
        else:
            logger.error("Cannot send comment %s ( 0x%x )", comment, address)
        return res["result"]

    def push_type(self, address, mtype=None):
        """
            Push defined types, parsed with prepare_parse_type
        """
        data = {"address": address,
                "typedef": mtype}
        endpoint = self.prepare_endpoint('types')
        res = self.poli_post(endpoint, data)
        if res["result"]:
            logger.debug("New type %s sent for address 0x%x", mtype, address)
        else:
            logger.error("Cannot send type %s ( 0x%x )", mtype, address)
        return res["result"]

    def get_abstract(self):
        endpoint = self.prepare_endpoint("abstract")
        abstract = self.poli_get(endpoint)
        return abstract["abstract"]

    def push_abstract(self, abstract):
        endpoint = self.prepare_endpoint("abstract")
        data = {"abstract": abstract}
        res = self.poli_post(endpoint, data)
        if res["result"]:
            logger.debug("Abstract sent!")
        else:
            logger.error("Cannot send abstract...\n Error %s", res)

    def get_comments(self, timestamp=None):
        endpoint = self.prepare_endpoint('comments')
        format_ts = "%Y-%m-%dT%H:%M:%S.%f"
        if timestamp is not None:
            endpoint += "?timestamp="
            endpoint += datetime.datetime.strftime(timestamp, format_ts)
        res = self.poli_get(endpoint)
        return res["comments"]

    def get_names(self, timestamp=None):
        """
            Get all names defined in the database
        """
        endpoint = self.prepare_endpoint('names')
        format_ts = "%Y-%m-%dT%H:%M:%S.%f"
        if timestamp is not None:
            endpoint += "?timestamp="
            endpoint += datetime.datetime.strftime(timestamp, format_ts)
        res = self.poli_get(endpoint)
        return res["names"]

    def get_proposed_names(self):
        """
            Get machoc proposed names
            Returns a list of dictionaries by address
        """
        endpoint = self.prepare_endpoint("functions/proposednames")
        res = self.poli_get(endpoint)
        return res["functions"]

    def push_name(self, address=0, name=None):
        """
            Send a define name, be it func or area
        """
        if name is None:
            return False
        data = {"address": address,
                "name": name}
        endpoint = self.prepare_endpoint('names')
        res = self.poli_post(endpoint, data)
        if res["result"]:
            logger.debug("sent name %s at 0x%x", name, address)
        else:
            logger.error("failed to send name %s", name)
        return True

    def create_struct(self, struct_name):
        """
            Create a structure in the database
            @arg:
                the structure name
            @return:
                The struct id, False if failed
        """
        endpoint = self.prepare_endpoint('structs')
        data = dict(name=struct_name)
        res = self.poli_post(endpoint, data)
        if not res["result"]:
            return False
        sid = res["structs"][0]["id"]
        return sid

    def get_struct_by_name(self, name):
        """
            Return the remote struct id given a name
        """
        endpoint = self.prepare_endpoint("structs/"+name)
        res = self.poli_get(endpoint)
        if "name" in res["structs"].keys():
            return res["structs"]["id"]
        else:
            return None

    def get_member_by_name(self, sid, name):
        """
            Should probably be implemented server side
        """
        endpoint = self.prepare_endpoint("structs/"+str(sid))
        res = self.poli_get(endpoint)
        if "members" in res["structs"].keys():
            for member in res["structs"]["members"]:
                if member["name"] == name:
                    return member["id"]
        return None

    def rename_struct(self, struct_id, new_name):
        """
            Rename a struct
        """
        endpoint = self.prepare_endpoint("structs/"+str(struct_id))
        data = dict(name=new_name)
        res = self.poli_patch(endpoint, data)
        return res

    def delete_struct(self, struct_id):
        endpoint = self.prepare_endpoint("structs/"+str(struct_id))
        res = self.poli_delete(endpoint)
        return res

    def create_struct_member(self, sid, name, start_offset):
        """
            Create a new member for a struct.
        """
        endpoint = self.prepare_endpoint("structs/"+str(sid)+"/members")
        data = dict(name=name, size=1, offset=start_offset)
        res = self.poli_post(endpoint, data=data)
        return res

    def resize_struct_member(self, sid, mid, size):
        """
            Sets a new size for a struct member
        """
        raise NotImplementedError

    def rename_struct_member(self, sid, mid, name):
        """
            Rename a struct member
        """
        endpoint = self.prepare_endpoint("structs/"+str(sid)+"/members")
        data = dict(mid=mid, newname=name)
        res = self.poli_patch(endpoint, data=data)
        return res

    def prepare_endpoint(self, submodule):
        """
            Prepare a standard API endpoint
        """
        endpoint = self.remote_path + "samples/"
        endpoint += str(self.sample_id)
        endpoint += "/" + submodule + "/"
        return endpoint