tristanlatr/WPWatcher

View on GitHub
wpwatcher/scan.py

Summary

Maintainability
A
1 hr
Test Coverage
"""
Scanner utility. 
"""
from typing import Optional, BinaryIO, List, Tuple, Dict, Any, Union
import threading
import re
import os
import traceback
import json
from smtplib import SMTPException
from datetime import timedelta, datetime

from wpscan_out_parse import WPScanJsonParser, WPScanCliParser

from wpwatcher import log
from wpwatcher.__version__ import __version__
from wpwatcher.utils import (
    get_valid_filename,
    safe_log_wpscan_args,
    oneline,
    remove_color,
)
from wpwatcher.email import EmailSender
from wpwatcher.wpscan import WPScanWrapper
from wpwatcher.syslog import SyslogOutput
from wpwatcher.report import ScanReport
from wpwatcher.site import Site
from wpwatcher.config import Config


# Date format used everywhere
DATE_FORMAT = "%Y-%m-%dT%H-%M-%S"


class Scanner:
    """Scanner class create reports and handles the scan process. """

    def __init__(self, conf: Config):

        # Create (lazy) wpscan link
        self.wpscan = WPScanWrapper(
            wpscan_path=conf["wpscan_path"], 
            scan_timeout=conf["scan_timeout"],
            api_limit_wait=conf["api_limit_wait"],
            follow_redirect=conf["follow_redirect"], )

        # Init mail client
        self.mail: EmailSender = EmailSender(conf)
        
        # Toogle if aborting so other errors doesnt get triggerred and exit faster
        self.interrupting: bool = False
        # List of scanned URLs
        self.scanned_sites: List[Optional[str]] = []

        # Save required config options
        self.wpscan_output_folder: str = conf["wpscan_output_folder"]
        self.wpscan_args: List[str] = conf["wpscan_args"]
        self.fail_fast: bool = conf["fail_fast"]
        self.false_positive_strings: List[str] = conf["false_positive_strings"]

        # Init wpscan output folder
        if self.wpscan_output_folder:
            os.makedirs(self.wpscan_output_folder, exist_ok=True)
            os.makedirs(
                os.path.join(self.wpscan_output_folder, "error/"), exist_ok=True
            )
            os.makedirs(
                os.path.join(self.wpscan_output_folder, "alert/"), exist_ok=True
            )
            os.makedirs(
                os.path.join(self.wpscan_output_folder, "warning/"), exist_ok=True
            )
            os.makedirs(os.path.join(self.wpscan_output_folder, "info/"), exist_ok=True)

        self.syslog: Optional[SyslogOutput] = None
        self.broken_syslog = False
        if conf["syslog_server"]:
            try:
                self.syslog = SyslogOutput(conf)
            except Exception as e:
                self.broken_syslog = True
                log.error(f"Broken Syslog: {e}\n{traceback.format_exc()}")
                
        

    def interrupt(self) -> None:
        """
        Call `WPScanWrapper.interrupt`. 

        This do NOT raise SystemExit. 
        """
        self.interrupting = True
        self.wpscan.interrupt()

    # Scan process

    @staticmethod
    def _write_wpscan_output(wp_report: ScanReport, fpwpout: BinaryIO) -> None:
        """Helper method to write output to file"""
        nocolor_output = remove_color(wp_report["wpscan_output"])
        try:
            fpwpout.write(nocolor_output.encode("utf-8"))
        except UnicodeEncodeError:
            fpwpout.write(nocolor_output.encode("latin1", errors='replace'))

    def write_wpscan_output(self, wp_report: ScanReport) -> Optional[str]:
        """Write WPScan output to configured place with `wpscan_output_folder` if configured"""
        # Subfolder
        folder = f"{wp_report['status'].lower()}/"
        # Write wpscan output
        if self.wpscan_output_folder:
            wpscan_results_file = os.path.join(
                self.wpscan_output_folder,
                folder,
                get_valid_filename(
                    f"WPScan_output_{wp_report['site']}_{wp_report['datetime']}.txt"
                ),
            )
            log.info(f"Saving WPScan output to file {wpscan_results_file}")
            with open(wpscan_results_file, "wb") as wpout:
                self._write_wpscan_output(wp_report, wpout)
                return wpout.name
        else:
            return None

    
    def log_report_results(self, wp_report: ScanReport) -> None:
        """Print WPScan findings"""
        for info in wp_report["infos"]:
            log.info(oneline(f"** WPScan INFO {wp_report['site']} ** {info}"))
        for fix in wp_report["fixed"]:
            log.info(oneline(f"** FIXED Issue {wp_report['site']} ** {fix}"))
        for warning in wp_report["warnings"]:
            log.warning(oneline(f"** WPScan WARNING {wp_report['site']} ** {warning}"))
        for alert in wp_report["alerts"]:
            log.critical(oneline(f"** WPScan ALERT {wp_report['site']} ** {alert}"))

    def _scan_site(
        self, wp_site: Site, wp_report: ScanReport
    ) -> Optional[ScanReport]:
        """
        Handled WPScan scanning , parsing, errors and reporting.
        Returns filled wp_report, None if interrupted or killed.
        Can raise `RuntimeError` if any errors.
        """

        # WPScan arguments
        wpscan_arguments = (
            self.wpscan_args + wp_site["wpscan_args"] + ["--url", wp_site["url"]]
        )

        # Output
        log.info(f"Scanning site {wp_site['url']}")

        # Launch WPScan
        wpscan_process = self.wpscan.wpscan(
            *wpscan_arguments
        )
        wp_report["wpscan_output"] = wpscan_process.stdout

        log.debug(f"WPScan raw output:\n{wp_report['wpscan_output']}")
        log.debug("Parsing WPScan output")

        try:
            # Use wpscan_out_parse module
            try:
                parser = WPScanJsonParser(
                    json.loads(wp_report["wpscan_output"]),
                    self.false_positive_strings
                    + wp_site["false_positive_strings"] 
                    + ["No WPVulnDB API Token given", "No WPScan API Token given"]
                )
            except ValueError as err:
                parser = WPScanCliParser(
                    wp_report["wpscan_output"],
                    self.false_positive_strings
                    + wp_site["false_positive_strings"]
                    + ["No WPVulnDB API Token given", "No WPScan API Token given"]
                )
            finally:
                wp_report.load_parser(parser)

        except Exception as err:
            raise RuntimeError(
                f"Could not parse WPScan output for site {wp_site['url']}\nOutput:\n{wp_report['wpscan_output']}"
            ) from err

        # Exit code 0: all ok. Exit code 5: Vulnerable. Other exit code are considered as errors
        if wpscan_process.returncode in [0, 5]:
            return wp_report

        # Quick return if interrupting and/or if user cancelled scans
        if self.interrupting or wpscan_process.returncode in [2, -2, -9]:
            return None

        # Other errors codes : 127, etc, simply raise error
        err_str = f"WPScan failed with exit code {wpscan_process.returncode}. \nArguments: {safe_log_wpscan_args(wpscan_arguments)}. \nOutput: \n{remove_color(wp_report['wpscan_output'])}\nError: \n{wpscan_process.stderr}"
        raise RuntimeError(err_str)


    def _fail_scan(self, wp_report: ScanReport, err_str: str) -> None:
        """
        Common manipulations when a scan fail. This will not stop the scans
        unless --fail_fast if enabled.

        Triger InterruptedError if fail_fast and not already interrupting.
        """
        
        wp_report.fail(err_str)
        if self.fail_fast and not self.interrupting:
            raise InterruptedError()

    def scan_site(
        self, wp_site: Site, last_wp_report: Optional[ScanReport] = None
    ) -> Optional[ScanReport]:
        """
        Orchestrate the scanning of a site.

        :Return: The scan report or `None` if something happened.
        """

        # Init report variables
        wp_report: ScanReport = ScanReport(
            {"site": wp_site["url"], "datetime": datetime.now().strftime(DATE_FORMAT)}
        )

        # Launch WPScan
        try:
            # If report is None, return None right away
            if not self._scan_site(wp_site, wp_report):
                return None

        except RuntimeError:

            self._fail_scan(
                wp_report,
                f"Could not scan site {wp_site['url']} \n{traceback.format_exc()}",
            )


        # Updating report entry with data from last scan
        wp_report.update_report(last_wp_report)

        self.log_report_results(wp_report)

        wpscan_command = " ".join(
            safe_log_wpscan_args(
                ["wpscan"]
                + self.wpscan_args
                + wp_site["wpscan_args"]
                + ["--url", wp_site["url"]]
            )
        )

        try:

            # Notify recepients if match triggers
            if self.mail.notify(
                wp_site, wp_report, 
                last_wp_report, 
                wpscan_command=wpscan_command,
                wpscan_version=self.wpscan._wpscan_version or '??',
            ):
                # Store report time
                wp_report["last_email"] = wp_report["datetime"]
                # Discard fixed items because infos have been sent
                wp_report["fixed"] = []

        # Handle sendmail errors
        except (SMTPException, ConnectionRefusedError, TimeoutError):
            self._fail_scan(
                wp_report,
                f"Could not send mail report for site {wp_site['url']}\n{traceback.format_exc()}",
            )

        
        # Send syslog if self.syslog is not None
        if self.syslog:
            try:
                self.syslog.emit_messages(wp_report)
            except Exception:
                self._fail_scan(
                    wp_report,
                    f"Could not send syslog messages for site {wp_site['url']}\n{traceback.format_exc()}",
                )

        # Save scanned site
        self.scanned_sites.append(wp_site["url"])

        # Discard wpscan_output from report
        if "wpscan_output" in wp_report:
            del wp_report["wpscan_output"]

        # Discard wpscan_parser from report
        if "wpscan_parser" in wp_report:
            del wp_report["wpscan_parser"]


        return wp_report