xybu/onedrived-dev

View on GitHub
onedrived/od_webhooks/ngrok_server.py

Summary

Maintainability
A
1 hr
Test Coverage
import logging
import os
import subprocess
import time
import shutil

import psutil
import requests

from . import http_server


class WebhookConfig(http_server.WebhookConfig):

    def __init__(self, port=0, ngrok_config_path=None):
        super().__init__(port=port)
        self.ngrok_path = os.getenv('NGROK', 'ngrok')
        ngrok_config_path = os.getenv('NGROK_CONFIG_FILE', ngrok_config_path)
        if isinstance(ngrok_config_path, str):
            self.ngrok_config_path = ngrok_config_path


def _append_cmd_arg(config, prop, arg, cmd):
    if hasattr(config, prop):
        cmd.append(arg)
        cmd.append(getattr(config, prop))


class WebhookListener(http_server.WebhookListener):

    POLL_TUNNELS_MAX_TRIES = 30

    def __init__(self, config, handler_class):
        super().__init__(config, handler_class)
        if shutil.which(config.ngrok_path) is None:
            raise RuntimeError('Did not find ngrok executable "%s".' % config.ngrok_path)
        cmd = [config.ngrok_path, 'http', str(self.server.server_port), '-bind-tls=true']
        _append_cmd_arg(config, 'ngrok_config_path', '--config', cmd)
        self._start_ngrok_process(cmd)
        self._read_ngrok_tunnels()

    @property
    def ngrok_api_url(self):
        return self._api_url

    @property
    def webhook_url(self):
        return self._webhook_url

    def stop(self):
        try:
            self.ngrok_proc.terminate()
            self.ngrok_proc.wait(timeout=1)
        except subprocess.TimeoutExpired:
            self.ngrok_proc.kill()
        super().stop()

    def run(self):
        logging.info('Local webhook server listening on port %d.', self.server.server_port)
        super().run()

    def _start_ngrok_process(self, cmd):
        try:
            self.ngrok_proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            self.ngrok_proc.wait(timeout=1)
            logging.critical('ngrok process (pid %d) terminated early with return code %d. Command: "%s".',
                             self.ngrok_proc.pid, self.ngrok_proc.returncode, ' '.join(cmd))
            raise RuntimeError('Ngrok process exited early.')
        except subprocess.TimeoutExpired:
            pass

    def _find_ngrok_inspection_port(self):
        """
        :return (str, int):
        """
        for c in psutil.Process(self.ngrok_proc.pid).connections():
            if c.laddr[0] == '127.0.0.1' and c.raddr == () and c.status == psutil.CONN_LISTEN:
                return c.laddr
        raise RuntimeError('Did not find API interface of ngrok.')

    def _read_ngrok_tunnels(self):
        webhook_urls = dict()
        self._api_url = 'http://%s:%d/api' % self._find_ngrok_inspection_port()
        logging.info('Local ngrok API url: %s', self._api_url)
        for _ in range(0, self.POLL_TUNNELS_MAX_TRIES):
            try:
                data = requests.get(self._api_url + '/tunnels').json()
                if 'tunnels' not in data or len(data['tunnels']) == 0:
                    raise ValueError('ngrok API did not return any tunnel.')
                for tunnel in data['tunnels']:
                    if tunnel['config']['addr'].endswith(':' + str(self.server.server_port)):
                        webhook_urls[tunnel['proto']] = tunnel['public_url']
                break
            except (requests.ConnectionError, ValueError) as e:
                logging.error('Error reading ngrok API: %s. Retry in 1sec.', e)
                time.sleep(1)
        if 'https' in webhook_urls:
            self._webhook_url = webhook_urls['https'] + '/' + self.server.session_token
        else:
            raise RuntimeError('Did not receive any HTTPS tunnel from ngrok API.')