wpwatcher/wpscan.py
from typing import List, Tuple, Optional
import shlex
import subprocess
import json
import time
import re
import threading
from datetime import datetime, timedelta
from wpwatcher import log
from wpwatcher.utils import safe_log_wpscan_args, timeout
# Pause applied before retrying a scan once the WPScan API limit is reached.
API_WAIT_SLEEP: timedelta = timedelta(days=1)
"24h"
# The WPScan database is refreshed when the last update is older than this.
UPDATE_DB_INTERVAL: timedelta = timedelta(minutes=60)
"1h"
# Seconds granted to terminated WPScan processes before they get killed.
INTERRUPT_TIMEOUT: int = 5
"Send kill signal after 5 seconds when interrupting."
# WPScan helper class -----------
class WPScanWrapper:
    """
    Process level wrapper for WPScan with a few additions:

    - Auto-update the WPScan database on interval
    - Supports multi-threading (update is done with a lock)
    - Use a timeout for the scans, kills the process and raise error if reached
    """

    # Sentinels meaning "this lazy attribute has not been read from WPScan yet".
    # ``None`` cannot be used because it is a legitimate value once initialized.
    _NO_VAL = datetime(year=2000, month=1, day=1)
    _NO_VERSION = '0.0.0'

    def __init__(self, wpscan_path: str, scan_timeout: Optional[timedelta] = None,
                 api_limit_wait: bool = False, follow_redirect: bool = False) -> None:
        """
        :param wpscan_path: Path to WPScan executable. The value is split with
            `shlex.split`, so it may also carry leading arguments.
            Example: ``'/usr/local/rvm/gems/default/wrappers/wpscan'``
        :param scan_timeout: Maximum duration of one WPScan run. ``None``
            disables the timeout.
        :param api_limit_wait: When true, wait and retry if WPScan output says
            the API limit has been reached.
        :param follow_redirect: When true, retry against the new URL if WPScan
            output says the supplied URL redirects.
        """
        self.processes: List[subprocess.Popen[bytes]] = []
        "List of running WPScan processes"

        self._wpscan_path: List[str] = shlex.split(wpscan_path)
        self._scan_timeout: Optional[timedelta] = scan_timeout
        self._update_lock: threading.Lock = threading.Lock()
        self._lazy_last_db_update: Optional[datetime] = self._NO_VAL
        # Must start at the sentinel the `_wpscan_version` property tests
        # against, otherwise the version is never fetched.
        self._lazy_wpscan_version: Optional[str] = self._NO_VERSION
        self._api_limit_wait = api_limit_wait
        self._follow_redirect = follow_redirect
        self._api_wait: threading.Event = threading.Event()
        self._interrupting = False

    def wpscan(self, *args: str) -> subprocess.CompletedProcess:  # type: ignore [type-arg]
        """
        Run WPScan and return process results. Automatically update WPScan database.

        :param args: Sequence of arguments to pass to WPScan.
            Example: ``"--update", "--format", "json", "--no-banner"``
        :returns: Custom `subprocess.CompletedProcess` instance with decoded output.
        """
        if self._needs_update():  # for lazy update
            # Acquiring the lock already blocks while another thread updates,
            # so no separate polling loop is needed.
            with self._update_lock:
                if self._needs_update():  # Re-check in case of concurrent scanning
                    self._update_wpscan()

        p = self._wpscan(*args)
        # Return codes 0 and 5 are passed through untouched; every other code
        # goes through the error handlers (API limit / redirection).
        if p.returncode not in (0, 5):
            return self._handle_wpscan_err(p)
        return p

    def interrupt(self) -> None:
        "Send SIGTERM to all currently running WPScan processes. Unlock api wait. "
        self._interrupting = True
        self._api_wait.set()
        # Iterate over snapshots: scanning threads remove their process from
        # `self.processes` while we are looping.
        for p in list(self.processes):
            p.terminate()
        # Wait for all processes to finish, kill after timeout
        try:
            timeout(INTERRUPT_TIMEOUT, self._wait_all_wpscan_process)
        except TimeoutError:
            for p in list(self.processes):
                p.kill()

    def _wait_all_wpscan_process(self) -> None:
        """
        Wait all WPScan processes.
        Should be called with timeout() function
        """
        while self.processes:
            time.sleep(0.5)

    @property
    def _last_db_update(self) -> Optional[datetime]:
        """Date of the last database update, fetched from WPScan on first access."""
        if self._lazy_last_db_update == self._NO_VAL:
            self._init_lazy_attributes()
        return self._lazy_last_db_update

    @property
    def _wpscan_version(self) -> Optional[str]:
        """WPScan version string, fetched from WPScan on first access."""
        if self._lazy_wpscan_version == self._NO_VERSION:
            self._init_lazy_attributes()
        return self._lazy_wpscan_version

    def _init_lazy_attributes(self) -> None:
        """
        Run ``wpscan --version`` and fill the lazy attributes from its JSON output.

        :raises FileNotFoundError: If the WPScan executable cannot be started.
        :raises RuntimeError: If the version command exits with a non-zero code.
        """
        wp_version_args = ["--version", "--format", "json", "--no-banner"]
        try:
            process = self._wpscan(*wp_version_args)
        except FileNotFoundError as err:
            raise FileNotFoundError(
                "Could not find WPScan executale. "
                "Make sure wpscan in you PATH or configure full path to executable in config file. "
                "If you're using RVM+bundler, the path should point to the WPScan wrapper like '/usr/local/rvm/gems/default/wrappers/wpscan'"
            ) from err

        if process.returncode != 0:
            raise RuntimeError(
                f"There is an issue with WPScan. Non-zero exit code when requesting 'wpscan {' '.join(wp_version_args)}' \nOutput:\n{process.stdout}\nError:\n{process.stderr}"
            )

        version_info = json.loads(process.stdout)

        if isinstance(version_info.get("last_db_update"), str):
            # Drop the fractional seconds before parsing.
            self._lazy_last_db_update = datetime.strptime(
                version_info["last_db_update"].split(".")[0], "%Y-%m-%dT%H:%M:%S"
            )
        else:
            # No usable date: `None` makes `_needs_update()` request an update.
            self._lazy_last_db_update = None

        try:
            self._lazy_wpscan_version = version_info['version']
        except KeyError:
            self._lazy_wpscan_version = None

    def _update_wpscan(self) -> None:
        """
        Update the WPScan database and record the update time.

        :raises RuntimeError: If the update command exits with a non-zero code.
        """
        log.info("Updating WPScan")
        process = self._wpscan(
            "--update", "--format", "json", "--no-banner"
        )
        if process.returncode != 0:
            raise RuntimeError(f"Error updating WPScan.\nOutput:{process.stdout}\nError:\n{process.stderr}")
        self._lazy_last_db_update = datetime.now()

    def _needs_update(self) -> bool:
        """Tell whether the database update date is unknown or older than `UPDATE_DB_INTERVAL`."""
        last_update = self._last_db_update
        if last_update is None:
            return True
        return datetime.now() - last_update > UPDATE_DB_INTERVAL

    def _build_command(self, *args: str) -> List[str]:
        """
        Build the full command line: configured WPScan path followed by ``args``.

        ``args`` may already start with the configured executable prefix (this
        is the case when retrying from `subprocess.CompletedProcess.args`) or
        with a bare ``'wpscan'``; either one is stripped so the executable is
        never duplicated.
        """
        arguments = list(args)
        prefix = self._wpscan_path
        if prefix and arguments[:len(prefix)] == prefix:
            arguments = arguments[len(prefix):]
        elif arguments and arguments[0] == 'wpscan':
            arguments.pop(0)
        return prefix + arguments

    # Helper method: actually wraps wpscan
    def _wpscan(self, *args: str) -> subprocess.CompletedProcess:  # type: ignore [type-arg]
        """
        Launch one WPScan process and wait for it.

        :param args: WPScan arguments, with or without the executable prefix.
        :returns: `subprocess.CompletedProcess` whose ``args`` is the full
            command and whose ``stdout``/``stderr`` are decoded strings.
        :raises RuntimeError: If ``scan_timeout`` is set and is exceeded.
        """
        cmd = self._build_command(*args)
        # Log wpscan command without api token
        log.debug(f"Running WPScan command: {' '.join(safe_log_wpscan_args(cmd))}")
        # Run wpscan
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Register the process so `interrupt()` can reach it
        self.processes.append(process)
        try:
            if self._scan_timeout:
                try:
                    stdout, stderr = timeout(self._scan_timeout.total_seconds(),
                                             process.communicate)
                except TimeoutError as err:
                    process.kill()
                    # Raise error
                    err_str = f"WPScan process '{safe_log_wpscan_args(cmd)}' timed out after {self._scan_timeout.total_seconds()} seconds. Setup 'scan_timeout' to allow more time. "
                    raise RuntimeError(err_str) from err
            else:
                stdout, stderr = process.communicate()
        finally:
            # Unregister on every path, including timeout, so that
            # `_wait_all_wpscan_process` does not wait on a dead entry.
            self.processes.remove(process)

        try:
            out_decoded = stdout.decode("utf-8")
            err_decoded = stderr.decode("utf-8")
        except UnicodeDecodeError:
            # latin1 maps every byte, so this fallback cannot fail.
            out_decoded = stdout.decode("latin1", errors="replace")
            err_decoded = stderr.decode("latin1", errors="replace")

        return subprocess.CompletedProcess(
            args=cmd,
            returncode=process.returncode,
            stdout=out_decoded,
            stderr=err_decoded,
        )

    def _handle_wpscan_err_api_wait(
        self, failed_process: subprocess.CompletedProcess  # type: ignore [type-arg]
    ) -> subprocess.CompletedProcess:  # type: ignore [type-arg]
        """
        Sleep 24 hours and retry.

        The sleep is cut short by `interrupt()`; in that case the failed
        process is returned as is, without retrying.
        """
        log.info(
            "API limit has been reached, sleeping 24h and continuing the scans..."
        )
        self._api_wait.wait(API_WAIT_SLEEP.total_seconds())
        if self._interrupting:
            return failed_process
        return self._wpscan(*failed_process.args)

    def _handle_wpscan_err_follow_redirect(
        self, failed_process: subprocess.CompletedProcess  # type: ignore [type-arg]
    ) -> subprocess.CompletedProcess:  # type: ignore [type-arg]
        """Parse URL in WPScan output and retry.

        :returns: Result of the retried scan, or ``failed_process`` unchanged
            when its output does not mention a redirection.
        :raises ValueError: If no URL can be parsed after the redirection
            message, or if the failed command has no ``--url`` argument.
        """
        marker = "The URL supplied redirects to"
        if marker not in failed_process.stdout:
            return failed_process

        urls = re.findall(
            r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
            failed_process.stdout.split(marker)[1],
        )
        if not urls:
            raise ValueError(f"Could not parse the URL to follow in WPScan output after words 'The URL supplied redirects to'\nOutput:\n{failed_process.stdout}")

        url = urls[0].strip()
        log.info(f"Following redirection to {url}")
        # Work on a copy: the caller's CompletedProcess must keep its original args.
        cmd = list(failed_process.args)
        cmd[cmd.index('--url') + 1] = url
        return self._wpscan(*cmd)

    def _handle_wpscan_err(self, failed_process: subprocess.CompletedProcess) -> subprocess.CompletedProcess:  # type: ignore [type-arg]
        """Handle API limit and Follow redirection errors based on output strings.

        :returns: Result of the retry when a handler applies and is enabled,
            otherwise ``failed_process`` unchanged.
        """
        output = str(failed_process.stdout)
        if "API limit has been reached" in output and self._api_limit_wait:
            return self._handle_wpscan_err_api_wait(failed_process)
        # Handle Following redirection
        if "The URL supplied redirects to" in output and self._follow_redirect:
            return self._handle_wpscan_err_follow_redirect(failed_process)
        return failed_process