# scripts/check_urls/check_urls.py
import os
import re
import ssl
from multiprocessing import Manager, Pool, cpu_count
from os.path import abspath, basename, dirname, join
from pathlib import Path
from sys import argv
from typing import List
import certifi
import requests
import urllib3
from slack import WebClient
from slack.errors import SlackApiError
from urlextract import URLExtract
# availability checks run with verify=False, so suppress the noisy warning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# browser-like User-Agent: some servers reject requests with no/unknown UA
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
# per-request timeout (seconds) for every http get in this script
MAX_TIMEOUT = 20
global_uris = set()
# Slack auth token; when absent the report is printed but not posted
SLACK_TOKEN = os.environ.get("IO_APP_SLACK_HELPER_BOT_TOKEN", None)
# people/groups to mention in the Slack report message
tagged_people = ["<!here>"]
SLACK_CHANNEL = "#io_dev_app_status"
# a list of remote uris consumed by the app for content presentation
remote_content_uri = ["https://assets.cdn.io.pagopa.it/bonus/bonus_available_v2.json",
"https://assets.cdn.io.pagopa.it/contextualhelp/data.json",
"https://assets.cdn.io.pagopa.it/status/backend.json"]
class IOUrl:
    """
    A single URI found in the code base (or in a remote resource), together
    with its origin and the outcome of the availability check run on it.
    """

    def __init__(self, io_uri, source):
        # the URI to check
        self.uri = io_uri
        # where the URI was found (e.g. "|"-joined file names or a resource name)
        self.source = source
        # set to True by set_error once a check fails
        self.has_error = False
        # human-readable description of the failure, None while no error
        self.error = None

    def set_error(self, error):
        """Mark this URI as broken, recording the reason of the failure."""
        self.error = error
        self.has_error = True

    def __repr__(self):
        return f"IOUrl(uri={self.uri!r}, source={self.source!r}, error={self.error!r})"
def scan_directory(path, file_black_list, black_list_url, ext_set=None):
    """
    Scan the chosen directory, and the sub-directories, and return the execution
    of read_file on the collection of files found.
    :param path: directory to scan
    :param file_black_list: a set of file names to exclude from scanning
    :param black_list_url: a set of urls to exclude from scanning
    :param ext_set: iterable of glob patterns of files to retrieve
                    (defaults to {'*.ts*'})
    :return: a dictionary containing all uris found
    """
    # avoid the mutable-default-argument pitfall: build the default per call
    if ext_set is None:
        ext_set = {'*.ts*'}
    # endswith also handles an empty path safely (path[-1] would raise)
    if path.endswith("/"):
        path = path[:-1]
    files = []
    for ext in ext_set:
        files.extend(Path(path).rglob(ext))
    black_list_files = tuple(file_black_list)
    test_like = re.compile(r'(__tests?__|__mocks?__|tsx.snap)', re.IGNORECASE)
    # exclude all those files that are included in the blacklist or are tests,
    # mocks, or snapshots
    files = [
        f for f in files
        if not (test_like.search(abspath(f)) or str(f).endswith(black_list_files))
    ]
    return read_file(files, black_list_url)
def extract_uris(text, black_list_url=frozenset({''})):
    """
    Extract all web uris (http/https/www) contained in the given text.
    :param text: the text to scan
    :param black_list_url: a set of urls to exclude from the result
                           (immutable frozenset default — never use a mutable
                           set literal as a default argument)
    :return: the set of uris found, stripped of trailing ')' and '}' characters
    """
    extractor = URLExtract()
    candidates = set(extractor.find_urls(text))
    # keep only web uris and strip markdown/template closing brackets
    cleaned = [
        u.replace(")", "").replace("}", "")
        for u in candidates
        if u.startswith(("http", "www"))
    ]
    return {u for u in cleaned if u not in black_list_url}
def read_file(files, black_list_url):
    """
    Reads the collection of files passed as parameter and returns the set of uris found inside all the files
    :param files: an iterable of file paths
    :param black_list_url: a set of urls to exclude from scanning
    :return: a dictionary containing all uris found (the key is the uri the value is the list of files where it is found)
    """
    uri_map = {}
    for path in files:
        # be explicit about the encoding so the result does not depend on the
        # platform default (source files are expected to be utf-8)
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
        uri_list = [u for u in extract_uris(content) if u not in black_list_url]
        for u in uri_list:
            # group by uri: collect every file name the uri appears in
            uri_map.setdefault(u, []).append(basename(str(path)))
    return uri_map
def load_remote_content(uri_to_load):
    """
    Download the content of the given remote resource (best effort).
    :param uri_to_load: the remote resource to fetch
    :return: the response body as text, or None on error status / network failure
    """
    try:
        r = requests.get(uri_to_load, timeout=MAX_TIMEOUT)
        # any non-success status is treated as "content not available"
        return r.text if r.ok else None
    except requests.RequestException:
        # catch only request-related errors (a bare except would also swallow
        # KeyboardInterrupt/SystemExit); a failed fetch is simply skipped
        return None
def test_protocol(uri_to_test):
"""
check if the protocol is http (it could cause a crash inside the app cause http is not allow)
:param uri:
:return:
"""
if re.search(r'^http:', uri_to_test, re.IGNORECASE) is not None:
return "has not https protocol"
return None
def test_availability(uri_to_test):
    """
    Tests the uri passed as argument making an http get request.
    :param uri_to_test: the uri to test
    :return: None if the uri responds with a success status (or only fails its
             TLS check), otherwise a string describing the error
    """
    try:
        # verify=False: certificate problems must not count as broken urls,
        # they are tolerated via the SSLError clause below
        r = requests.get(uri_to_test, headers=HEADERS,
                         timeout=MAX_TIMEOUT, verify=False)
        if r.ok:
            return None
        return "status code %d" % r.status_code
    except requests.exceptions.SSLError:
        # this is not an issue for the url availability
        # NOTE: requests' SSLError is a subclass of its ConnectionError, so
        # this clause must stay BEFORE the ConnectionError one to be reached
        return None
    except requests.ConnectionError as e:
        return "Connection Error - " + str(e)
    except requests.Timeout as e:
        return "Timeout - " + str(e)
    except requests.RequestException as e:
        return "General Error - " + str(e)
    except Exception as e:
        # last-resort catch: report anything unexpected as the error message
        return str(e)
def test_http_uri(io_url: IOUrl):
    """
    Run every configured check against the given IOUrl, flagging it with the
    outcome of any failing check.
    :param io_url: the IOUrl to verify
    :return: the same IOUrl instance, possibly carrying an error
    """
    # the checks to apply to the uri
    checks = (test_availability,)
    for check in checks:
        outcome = check(io_url.uri)
        if outcome is not None:
            io_url.set_error(outcome)
    return io_url
def send_slack_message(invalid_uris_list: List[IOUrl]):
    """
    Sends the report of the check to slack to notify the status of the static texts of the app
    :param invalid_uris_list: the broken uris to report; nothing is posted when empty
    :return:
    """
    try:
        # avoid ssl certificate warning
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        rtm_client = WebClient(
            token=SLACK_TOKEN, ssl=ssl_context
        )
        if len(invalid_uris_list) > 0:
            tags = " ".join(tagged_people)
            # first message: the summary, mentioning the tagged people
            message = "[URLs Check] :warning: %s There are %d uris in *IO App* that are not working" % (
                tags, len(invalid_uris_list))
            message_blocks = []
            message_blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": message
                }
            })
            rtm_client.chat_postMessage(
                channel=SLACK_CHANNEL,
                blocks=message_blocks
            )
            # second message: one section block per broken uri with its
            # source file(s), the error found and the uri itself
            message_blocks = []
            for iu in invalid_uris_list:
                message = "`%s` `%s` -> ```%s```" % (
                    iu.source, iu.error, iu.uri)
                message_blocks.append({
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": message
                    }
                })
            rtm_client.chat_postMessage(
                channel=SLACK_CHANNEL,
                blocks=message_blocks
            )
    except SlackApiError as e:
        # You will get a SlackApiError if "ok" is False
        assert e.response["ok"] is False
        # str like 'invalid_auth', 'channel_not_found'
        assert e.response["error"]
        print(f"Got an error: {e.response['error']}")
run_test = len(argv) > 1 and argv[1] == "run_tests"
# since this code is executed multiple time for each process spawned
# we have to ensure the init part is execute only the first time
if not run_test and __name__ == '__main__':
    files_black_list = {"testFaker.ts", "PayWebViewModal.tsx", "paymentPayloads.ts", "message.ts",
                        "supportAssistance.ts", "ZendeskAskPermissions.tsx"}
    # NOTE(review): manager appears unused below — kept to preserve behavior,
    # verify whether it can be removed
    manager = Manager()
    print("scanning local folders...")
    all_uris = set()
    # BUGFIX: the entries below previously lacked commas after the Mixpanel,
    # "email-already-taken" and EIC entries, so Python's implicit string
    # concatenation silently merged them into invalid urls that were never
    # actually excluded from the scan
    urls_black_list = {
        # 403 when this check runs (in the middle of the night)
        "https://id.lepida.it/docs/manuale_utente.pdf",
        # returns a 404 anytime the check runs but it actually works fine
        "https://checkout.pagopa.it/dona",
        # Mixpanel EU endpoint
        "https://api-eu.mixpanel.com",
        # Profile 412 status error types
        # Already taken type
        "https://ioapp.it/problems/email-already-taken",
        # EIC UAT endpoint
        "https://collaudo.idserver.servizicie.interno.gov.it/idp",
        # localhost is not reachable
        "https://localhost",
        # local url used for E2E tests
        "http://127.0.0.1:3000/api/v1/cgn/delete",
        # not accessible outside of PagoPA
        "https://docs.google.com/presentation/d/11rEttb7lJYlRqgFpl4QopyjFmjt2Q0K8uis6JhAQaCw/edit#slide=id.p"
    }
    locales = (abspath(join(dirname(__file__), "../..", "locales")), set())
    ts_dir = (abspath(join(dirname(__file__), "../..", "ts")), files_black_list)
    for directory, black_list in [locales, ts_dir]:
        files_found = scan_directory(directory, black_list, urls_black_list)
        print("found %d files in %s" % (len(files_found.keys()), directory))
        all_uris.update(
            list(map(lambda kv: IOUrl(kv[0], "|".join(kv[1])), files_found.items())))
    print("scanning remote resources...")
    for ru in remote_content_uri:
        c = load_remote_content(ru)
        if c is not None:
            uris = extract_uris(c, urls_black_list)
            all_uris.update(list(map(lambda u: IOUrl(u, basename(ru)), uris)))
    pool = Pool(cpu_count())
    invalid_uri_processing = []
    print(
        f"found and processing {len(all_uris)} uris using {cpu_count()} cpus")
    for uri in all_uris:
        invalid_uri_processing.append(
            pool.apply_async(test_http_uri, args=(uri,)))
    # get all processes results
    invalid_uris = list(map(lambda r: r.get(), invalid_uri_processing))
    # keep only the uris flagged with an error
    invalid_uris = list(filter(lambda r: r.has_error, invalid_uris))
    pool.close()
    # wait for the worker processes to exit cleanly
    pool.join()
    print('found %d broken or invalid uris' % len(invalid_uris))
    if len(invalid_uris):
        msg = '\nfound %d errors\n' % len(invalid_uris)
        msg += "\n".join(list(map(lambda iu: "[%s][%s] -> %s" %
                                  (iu.source, iu.error, iu.uri), invalid_uris)))
        msg += "\n"
        print(msg)
        if SLACK_TOKEN:
            send_slack_message(invalid_uris)
        else:
            print("no SLACK token provided")
# self-tests for extract_uris, executed with: python check_urls.py run_tests
if run_test:
    print("running tests...")
    a_text_with_urls = '''[a](http://foo.com)
[a](http://goo.gl)
[a](https://foo.com)
[a](https://www.foo.com)
[a](https://www.foo.com/)
[a](https://www.foo.com/bar)
[a](http://goo.gl/1 http://goo.gl/2
foo [a](http://goo.gl/1) [a](http://goo.gl/(2))
[a](http://foo.com/.) [a](http://foo.com/)! [a](http://foo.com/),
This url does not have a protocol: goo.gl/1
[a](http://firstround.com/review/thoughts-on-gender-and-radical-candor/?ct=t(How_Does_Your_Leadership_Team_Rate_12_3_2015))
[a](https://google.com)
https:google.com
www.cool.com.au
[a](http://www.cool.com.au)
[a](http://www.cool.com.au/ersdfs)
[a](http://www.cool.com.au/ersdfs?dfd=dfgd@s=1)
[a](http://www.cool.com:81/index.html)'''
    # a single markdown link yields exactly one uri
    test1 = extract_uris("[hello world](http://test.com)")
    assert len(test1) == 1
    assert "http://test.com" in test1
    # multiple links in the same text are all extracted
    test2 = extract_uris(
        "[a](https://test2.com) hello world [b](http://test.com)")
    assert len(test2) == 2
    assert "https://test2.com" in test2
    assert "http://test.com" in test2
    # bare domains without protocol and non-resolvable hosts are ignored
    test3 = extract_uris(
        "[a](https://www.test.com) site.it [b](https://empty)")
    assert len(test3) == 1
    assert "https://www.test.com" in test3
    # the mixed fixture above is expected to yield 18 distinct uris
    test4 = extract_uris(a_text_with_urls)
    assert len(test4) == 18
    test5 = extract_uris("bla bla http://www.google.it")
    assert len(test5) == 1
    # blacklisted uris are excluded from the result
    test6 = extract_uris("bla bla http://www.google.it",
                         ["http://www.google.it"])
    assert len(test6) == 0
    print("all tests passed")