# mobius3.py
import array
import argparse
import asyncio
from collections import (
defaultdict,
)
import ctypes
import contextlib
import datetime
import enum
import fcntl
import hashlib
import hmac
import termios
import json
import logging
import os
import re
import signal
import ssl
import stat
import sys
import time
import uuid
import urllib.parse
from pathlib import (
PurePosixPath,
)
import struct
from weakref import (
WeakValueDictionary,
)
from xml.etree import (
ElementTree as ET,
)
import httpx
from fifolock import (
FifoLock,
)
import sentry_sdk
from sentry_sdk.integrations.httpx import HttpxIntegration
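# inotify has no wrapper in the Python standard library, so the syscalls are
# bound directly from libc via ctypes; call_libc below fetches and clears errno
# after each call and raises the corresponding OSError on failure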
libc = ctypes.CDLL('libc.so.6', use_errno=True)
libc.inotify_init.argtypes = []
libc.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
def call_libc(func, *args):
value = func(*args)
latest_errno = ctypes.set_errno(0)
if latest_errno:
raise OSError(latest_errno, os.strerror(latest_errno))
return value
class FileContentChanged(Exception):
pass
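# A plain dict cannot be the target of a weak reference, so this subclass is
# used for values stored in the WeakValueDictionary instances below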
class WeakReferenceableDict(dict):
pass
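# Lock mode used with fifolock's FifoLock: compatible only when no other task
# holds the lock, i.e. plain mutual exclusion acquired in FIFO order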
class Mutex(asyncio.Future):
@staticmethod
def is_compatible(holds):
return not holds[Mutex]
class InotifyEvents(enum.IntEnum):
IN_MODIFY = 0x00000002
IN_ATTRIB = 0x00000004
IN_CLOSE_WRITE = 0x00000008
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200
# Sent by the kernel without explicitly watching for them
IN_Q_OVERFLOW = 0x00004000
IN_IGNORED = 0x00008000
class InotifyFlags(enum.IntEnum):
IN_ONLYDIR = 0x01000000
IN_ISDIR = 0x40000000
class S3SyncLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return \
('[s3sync] %s' % (msg,), kwargs) if not self.extra else \
('[s3sync:%s] %s' % (','.join(str(v) for v in self.extra.values()), msg), kwargs)
def child_adapter(s3sync_adapter, extra):
return S3SyncLoggerAdapter(
s3sync_adapter.logger,
{**s3sync_adapter.extra, **extra},
)
def get_logger_adapter_default(extra):
return S3SyncLoggerAdapter(logging.getLogger('mobius3'), extra)
WATCH_MASK = \
InotifyEvents.IN_MODIFY | \
InotifyEvents.IN_ATTRIB | \
InotifyEvents.IN_CLOSE_WRITE | \
InotifyEvents.IN_MOVED_FROM | \
InotifyEvents.IN_MOVED_TO | \
InotifyEvents.IN_CREATE | \
InotifyEvents.IN_DELETE | \
InotifyFlags.IN_ONLYDIR
# We watch the download directory only for moves, so we can use the cookie
# to determine if a move is from a download, in which case we don't re-upload it
DOWNLOAD_WATCH_MASK = \
InotifyEvents.IN_MOVED_FROM
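# Fixed-size header of struct inotify_event: wd (int), then mask, cookie and
# len (unsigned ints), followed by len bytes of null-padded name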
EVENT_HEADER = struct.Struct('iIII')
get_current_task = \
asyncio.current_task if hasattr(asyncio, 'current_task') else \
asyncio.Task.current_task
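# Helpers for async iterators: one that yields nothing (the default request
# body), one that yields a single chunk, and one that buffers a whole iterator
# into a single bytes object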
async def empty_async_iterator():
while False:
yield
async def streamed(data):
yield data
async def buffered(data):
return b''.join([chunk async for chunk in data])
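# Cancels the current task after max_time seconds and converts that
# cancellation into asyncio.TimeoutError; the yielded callable restarts the
# timer. A purely illustrative usage sketch:
#
#     with timeout(loop, 5) as reset:
#         await first_slow_thing()
#         reset()  # start a fresh 5 second window
#         await second_slow_thing()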
@contextlib.contextmanager
def timeout(loop, max_time):
cancelling_due_to_timeout = False
current_task = get_current_task()
def cancel():
nonlocal cancelling_due_to_timeout
cancelling_due_to_timeout = True
current_task.cancel()
def reset():
nonlocal handle
handle.cancel()
handle = loop.call_later(max_time, cancel)
handle = loop.call_later(max_time, cancel)
try:
yield reset
except asyncio.CancelledError:
if cancelling_due_to_timeout:
raise asyncio.TimeoutError()
raise
finally:
handle.cancel()
def Pool(
get_ssl_context=ssl.create_default_context,
get_logger_adapter=get_logger_adapter_default,
force_ipv4=False,
):
logger = get_logger_adapter({})
async def log_request(request):
logger.info('[http] Request: %s %s', request.method, request.url)
async def log_response(response):
logger.info('[http] Response: %s %s %s', response.request.method, response.request.url, response.status_code)
return httpx.AsyncClient(
timeout=10.0,
transport=httpx.AsyncHTTPTransport(
retries=3,
verify=get_ssl_context(),
local_address="0.0.0.0" if force_ipv4 else None,
),
event_hooks={'request': [log_request], 'response': [log_response]}
)
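# Returns an httpx auth object implementing AWS Signature Version 4: a
# canonical request is built from the method, path, query and signed headers,
# hashed into a string to sign, and signed with a key derived from the secret
# key, date, region and service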
def AWSAuth(service, region, client, get_credentials, content_hash=hashlib.sha256().hexdigest()):
def aws_sigv4_headers(access_key_id, secret_access_key, pre_auth_headers,
method, path, params):
algorithm = 'AWS4-HMAC-SHA256'
now = datetime.datetime.utcnow()
amzdate = now.strftime('%Y%m%dT%H%M%SZ')
datestamp = now.strftime('%Y%m%d')
credential_scope = f'{datestamp}/{region}/{service}/aws4_request'
pre_auth_headers_lower = tuple(
(header_key.lower(), ' '.join(header_value.split()))
for header_key, header_value in pre_auth_headers
)
required_headers = (
('x-amz-content-sha256', content_hash),
('x-amz-date', amzdate),
)
headers = sorted(pre_auth_headers_lower + required_headers)
signed_headers = ';'.join(key for key, _ in headers)
def signature():
def canonical_request():
canonical_uri = urllib.parse.quote(path, safe='/~')
quoted_params = sorted(
(urllib.parse.quote(key, safe='~'), urllib.parse.quote(value, safe='~'))
for key, value in params
)
canonical_querystring = '&'.join(f'{key}={value}' for key, value in quoted_params)
canonical_headers = ''.join(f'{key}:{value}\n' for key, value in headers)
return f'{method}\n{canonical_uri}\n{canonical_querystring}\n' + \
f'{canonical_headers}\n{signed_headers}\n{content_hash}'
def sign(key, msg):
return hmac.new(key, msg.encode('ascii'), hashlib.sha256).digest()
string_to_sign = f'{algorithm}\n{amzdate}\n{credential_scope}\n' + \
hashlib.sha256(canonical_request().encode('ascii')).hexdigest()
date_key = sign(('AWS4' + secret_access_key).encode('ascii'), datestamp)
region_key = sign(date_key, region)
service_key = sign(region_key, service)
request_key = sign(service_key, 'aws4_request')
return sign(request_key, string_to_sign).hex()
return (
('authorization', (
f'{algorithm} Credential={access_key_id}/{credential_scope}, '
f'SignedHeaders={signed_headers}, Signature=' + signature())
),
('x-amz-date', amzdate),
('x-amz-content-sha256', content_hash),
)
class _AWSAuth(httpx.Auth):
async def async_auth_flow(self, request):
access_key_id, secret_access_key, auth_headers = await get_credentials(client)
params = tuple((key.decode(), value.decode()) for (key, value) in urllib.parse.parse_qsl(request.url.query, keep_blank_values=True))
existing_headers = tuple((key, value) for (key, value) in request.headers.items() if key == 'host' or key.startswith('content-') or key.startswith('x-amz-'))
headers_to_set = aws_sigv4_headers(
access_key_id, secret_access_key, existing_headers + auth_headers,
request.method, request.url.path, params,
) + auth_headers
for key, value in headers_to_set:
request.headers[key] = value
yield request
return _AWSAuth()
async def get_credentials_from_environment(_):
return os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'], ()
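# Fetches temporary credentials from the ECS container credentials endpoint
# (169.254.170.2 plus AWS_CONTAINER_CREDENTIALS_RELATIVE_URI), caches them
# until their reported expiration, and passes the session token on as an
# x-amz-security-token pre-auth header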
def get_credentials_from_ecs_endpoint():
aws_access_key_id = None
aws_secret_access_key = None
pre_auth_headers = None
expiration = datetime.datetime.fromtimestamp(0)
async def _get_credentials(client):
nonlocal aws_access_key_id
nonlocal aws_secret_access_key
nonlocal pre_auth_headers
nonlocal expiration
now = datetime.datetime.now()
if now > expiration:
response = await client.request(
b'GET',
'http://169.254.170.2' + os.environ['AWS_CONTAINER_CREDENTIALS_RELATIVE_URI']
)
creds = json.loads(response.content)
aws_access_key_id = creds['AccessKeyId']
aws_secret_access_key = creds['SecretAccessKey']
expiration = datetime.datetime.strptime(creds['Expiration'], '%Y-%m-%dT%H:%M:%SZ')
pre_auth_headers = (
('x-amz-security-token', creds['Token']),
)
return aws_access_key_id, aws_secret_access_key, pre_auth_headers
return _get_credentials
def Syncer(
directory, bucket, endpoint, region,
prefix='',
concurrent_uploads=5,
concurrent_downloads=5,
get_credentials=get_credentials_from_environment,
get_pool=Pool,
flush_file_root='.__mobius3_flush__',
flush_file_timeout=5,
directory_watch_timeout=5,
download_directory='.mobius3',
get_logger_adapter=get_logger_adapter_default,
local_modification_persistance=120,
download_interval=10,
exclude_remote=r'^$',
exclude_local=r'^$',
upload_on_create=r'^$',
cloudwatch_monitoring_endpoint='',
cloudwatch_monitoring_region='',
cloudwatch_monitoring_namespace='',
):
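    # Syncer wires its state up in this closure and returns the (start, stop)
    # coroutine functions at the bottom. A purely illustrative usage sketch
    # (directory, bucket name and endpoint are made up):
    #
    #     start, stop = Syncer(
    #         '/home/me/data', 'my-bucket',
    #         'https://s3-eu-west-2.amazonaws.com/{}/', 'eu-west-2',
    #     )
    #     await start()
    #     ...
    #     await stop()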
loop = asyncio.get_running_loop()
logger = get_logger_adapter({})
directory = PurePosixPath(directory)
exclude_remote = re.compile(exclude_remote)
exclude_local = re.compile(exclude_local)
upload_on_create = re.compile(upload_on_create)
bucket_url = endpoint.format(bucket)
# The file descriptor returned from inotify_init
fd = None
# Watch descriptors to paths. A notification returns only a relative
# path to its watch descriptor path: these are used to find the full
# path of any notified-on files
wds_to_path = {}
    # To mitigate (but not eliminate) the chance that nested files are
# immediately re-uploaded
directory_watch_events = WeakValueDictionary()
# The asyncio task pool that performs the uploads
upload_tasks = []
# PUTs and DELETEs are initiated in the order generated by inotify events
upload_job_queue = asyncio.Queue()
download_manager_task = None
download_tasks = []
download_job_queue = asyncio.Queue()
# To prevent concurrent HTTP requests on the same files where order of
# receipt by S3 cannot be guaranteed, we wrap each request by a lock
# e.g. to prevent a DELETE overtaking a PUT
path_locks = WeakValueDictionary()
# A path -> content version dict is maintained during queues and uploads,
# and incremented on every modification of a file. When a path is
# scheduled to be uploaded, its version is copied. After the last read of
# data for an upload, but before it's uploaded, the copied version of the
# path is compared to the current version. If this is different, there was
# a change to the file contents, we know another upload will be scheduled,
# so we abort the current upload
content_versions = WeakValueDictionary()
# Before completing an upload, we force a flush of the event queue for
# the uploads directory to ensure that we have processed any change events
    # that would update the corresponding item in content_versions
flushes = WeakValueDictionary()
# During a queue of a PUT or DELETE, and for
# local_modification_persistance seconds, we do not overwrite local files
# with information from S3 for eventual consistency reasons
push_queued = defaultdict(int)
push_completed = ExpiringSet(loop, local_modification_persistance)
# When moving download files from the hidden directory to their final
# position, we would like to detect if this is indeed a move from a
    # download (in which case we don't re-upload), or a real local move (in which case we do)
download_cookies = ExpiringSet(loop, 10)
# When we delete a file locally, do not attempt to delete it remotely
ignore_next_delete = {}
# When we add a directory from a download, do not attempt to reupload
ignore_next_directory_upload = {}
# When downloading a file, we note its etag. We don't re-download it later
# if the etag matches
etags = {}
# Don't re-upload metadata if we think S3 already has it
meta = {}
# A cache of the file tree is maintained. Used for directory renames: we
    # only get notified of renames _after_ they have happened, so we need a way
# to know what objects are on S3 in order to DELETE them
tree_cache_root = {
'type': 'directory',
'children': {},
}
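    # Directory nodes hold a 'children' dict keyed by name; files are leaves.
    # As an illustrative sketch, after uploading <directory>/a/b.txt the cache
    # contains, nested under the names of the absolute path's components:
    #   {'type': 'directory', 'children': {
    #       'a': {'type': 'directory', 'children': {
    #           'b.txt': {'type': 'file'}}}}}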
client = get_pool()
auth = AWSAuth(service='s3', region=region, client=client, get_credentials=get_credentials, content_hash='UNSIGNED-PAYLOAD')
def ensure_file_in_tree_cache(path):
parent_dir = ensure_parent_dir_in_tree_cache(path)
parent_dir['children'][path.name] = {
'type': 'file',
}
def ensure_dir_in_tree_cache(path):
parent_dir = ensure_parent_dir_in_tree_cache(path)
parent_dir['children'].setdefault(path.name, {
'type': 'directory',
'children': {},
})
def ensure_parent_dir_in_tree_cache(path):
directory = tree_cache_root
for parent in reversed(list(path.parents)):
directory = directory['children'].setdefault(parent.name, {
'type': 'directory',
'children': {},
})
return directory
def remove_from_tree_cache(path):
directory = tree_cache_root
for parent in reversed(list(path.parents)):
directory = directory['children'][parent.name]
del directory['children'][path.name]
def tree_cache_directory(path):
directory = tree_cache_root
for parent in reversed(list(path.parents)):
directory = directory['children'][parent.name]
return directory['children'][path.name]
def set_etag(path, headers):
etags[path] = headers['etag']
def queued_push_local_change(path):
push_queued[path] += 1
def completed_push_local_change(path):
push_queued[path] -= 1
if push_queued[path] == 0:
del push_queued[path]
push_completed.add(path)
def is_dir_pull_blocked(path):
return path in push_queued or path in push_completed
def is_pull_blocked(path):
# For extremely recent modifications we may not have yielded the event
# loop to add files to the queue. We do our best and check the mtime
# to prevent overriding with older remote data. However, after we
# check a file could still be modified locally, and we have no way to
# detect this
def modified_recently():
now = datetime.datetime.now().timestamp()
try:
return now - os.lstat(path).st_mtime < local_modification_persistance
except FileNotFoundError:
return False
return path in push_queued or path in push_completed or modified_recently()
async def start():
logger = get_logger_adapter({'mobius3_component': 'start'})
logger.info('Starting')
logger.info('Excluding: %s', exclude_remote)
nonlocal upload_tasks
nonlocal download_tasks
nonlocal download_manager_task
start_time = time.monotonic()
try:
os.mkdir(directory / download_directory)
except FileExistsError:
pass
upload_tasks = [
asyncio.create_task(process_jobs(upload_job_queue))
for i in range(0, concurrent_uploads)
]
download_tasks = [
asyncio.create_task(process_jobs(download_job_queue))
for i in range(0, concurrent_downloads)
]
start_inotify(logger, upload=False)
await list_and_schedule_downloads(logger)
download_manager_task = asyncio.create_task(
download_manager(get_logger_adapter({'mobius3_component': 'download'}))
)
end_time = time.monotonic()
logger.info('Finished starting: %s seconds', end_time - start_time)
if cloudwatch_monitoring_endpoint and cloudwatch_monitoring_region and cloudwatch_monitoring_namespace:
logger.info('Posting startup time to %s in region %s in namespace %s', cloudwatch_monitoring_endpoint, cloudwatch_monitoring_region, cloudwatch_monitoring_namespace)
cloudwatch_auth = AWSAuth(service='monitoring', region=cloudwatch_monitoring_region, client=client, get_credentials=get_credentials)
response = await client.post(cloudwatch_monitoring_endpoint, params=(
('Version', '2010-08-01'),
('Action', 'PutMetricData'),
('Namespace', cloudwatch_monitoring_namespace),
('MetricData.member.1.MetricName', 'StartupSyncTime'),
('MetricData.member.1.Unit', 'Seconds'),
('MetricData.member.1.Value', str(end_time - start_time)),
), auth=cloudwatch_auth)
response.raise_for_status()
def start_inotify(logger, upload):
nonlocal wds_to_path
nonlocal meta
nonlocal etags
nonlocal tree_cache_root
nonlocal fd
nonlocal ignore_next_delete
nonlocal ignore_next_directory_upload
ignore_next_delete = {}
ignore_next_directory_upload = {}
wds_to_path = {}
meta = {}
etags = {}
tree_cache_root = {
'type': 'directory',
'children': {},
}
fd = call_libc(libc.inotify_init)
def _read_events():
logger = get_logger_adapter({'mobius3_component': 'event'})
read_events(logger)
loop.add_reader(fd, _read_events)
        # The download directory lives inside the synced directory, so watch its
        # full path: a bare relative path would not match the checks done on
        # wds_to_path elsewhere
        watch_directory(directory / download_directory, DOWNLOAD_WATCH_MASK)
watch_directory_recursive(logger, directory, WATCH_MASK, upload)
async def stop():
async def cancel(task):
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Make every effort to read all incoming events and finish the queue
logger = get_logger_adapter({'mobius3_component': 'stop'})
logger.info('Stopping')
if download_manager_task is not None:
await cancel(download_manager_task)
for task in download_tasks:
await cancel(task)
read_events(logger)
while upload_job_queue._unfinished_tasks:
await upload_job_queue.join()
read_events(logger)
stop_inotify()
for task in upload_tasks:
await cancel(task)
await client.aclose()
await asyncio.sleep(0)
logger.info('Finished stopping')
def stop_inotify():
loop.remove_reader(fd)
os.close(fd)
def watch_directory(path, mask):
try:
wd = call_libc(libc.inotify_add_watch, fd, str(path).encode('utf-8'), mask)
except (NotADirectoryError, FileNotFoundError):
return
# After a directory rename, we will be changing the path of an
# existing entry, but that's fine
wds_to_path[wd] = path
# Notify any waiting watchers
directory_watch_events.setdefault(path, default=asyncio.Event()).set()
def watch_directory_recursive(logger, path, mask, upload):
logger.info('Watching directory: %s, with upload: %s', path, upload)
watch_directory(path, mask)
if PurePosixPath(path) not in [directory, directory / download_directory]:
try:
del ignore_next_directory_upload[path]
except KeyError:
if upload:
logger.info('Scheduling upload directory: %s', path)
schedule_upload_directory(logger, path)
# By the time we've added a watcher, files or subdirectories may have
# already been created
        for root, dirs, files in os.walk(path):
            # Each subdirectory is handled by its own recursive call below, so
            # stop os.walk from also descending into it, which would schedule
            # duplicate watches and uploads for nested paths
            subdirs = list(dirs)
            dirs[:] = []
            if PurePosixPath(root) == directory / download_directory:
                continue
            if upload:
                for file in files:
                    logger.info('Scheduling upload: %s', PurePosixPath(root) / file)
                    schedule_upload(logger, PurePosixPath(root) / file)
            for d in subdirs:
                watch_directory_recursive(logger, PurePosixPath(root) / d, mask, upload)
def remote_delete_directory(logger, path):
# Directory nesting not likely to be large
def recursive_delete(prefix, directory):
for child_name, child in list(directory['children'].items()):
if child['type'] == 'file':
logger.info('Scheduling delete: %s', prefix / child_name)
schedule_delete(logger, prefix / child_name)
else:
recursive_delete(prefix / child_name, child)
schedule_delete_directory(logger, prefix / child_name)
try:
cache_directory = tree_cache_directory(path)
except KeyError:
# We may be moving from or deleting something not yet watched,
# in which case we leave S3 as it is. There may be file(s) in
# the queue to upload, but they will correctly fail if it can't
# find the file(s)
pass
else:
recursive_delete(path, cache_directory)
schedule_delete_directory(logger, path)
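    # Drains the inotify file descriptor: FIONREAD gives the number of bytes
    # available, each event is a fixed header plus a null-padded name, and each
    # set bit in the mask is dispatched to a handler looked up by name, e.g.
    # handle__file__IN_CLOSE_WRITE, based on the item type and the event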
def read_events(parent_logger):
FIONREAD_output = array.array('i', [0])
fcntl.ioctl(fd, termios.FIONREAD, FIONREAD_output)
bytes_to_read = FIONREAD_output[0]
if not bytes_to_read:
return
raw_bytes = os.read(fd, bytes_to_read)
offset = 0
while offset < len(raw_bytes):
wd, mask, cookie, length = EVENT_HEADER.unpack_from(raw_bytes, offset)
offset += EVENT_HEADER.size
path = PurePosixPath(raw_bytes[offset:offset+length].rstrip(b'\0').decode('utf-8'))
offset += length
event_id = uuid.uuid4().hex[:8]
logger = child_adapter(parent_logger, {'event': event_id})
full_path = \
wds_to_path[wd] / path if wd != -1 else \
directory # Overflow event
events = [event for event in InotifyEvents.__members__.values() if event & mask]
item_type = \
'overflow' if mask & InotifyEvents.IN_Q_OVERFLOW else \
'flush' if path.name.startswith(flush_file_root) and full_path in flushes else \
'download' if full_path.parent == directory / download_directory else \
'dir' if mask & InotifyFlags.IN_ISDIR else \
'file'
for event in events:
handler_name = f'handle__{item_type}__{event.name}'
try:
handler = parent_locals[handler_name]
except KeyError:
continue
else:
logger.debug('Path: %s', full_path)
logger.debug('Handler: %s', handler_name)
try:
handler(logger, wd, cookie, full_path)
except Exception:
logger.exception('Exception calling handler')
def handle__overflow__IN_Q_OVERFLOW(logger, _, __, ___):
logger.warning('IN_Q_OVERFLOW. Restarting')
stop_inotify()
start_inotify(logger, upload=True)
def handle__flush__IN_CREATE(logger, _, __, path):
flush = flushes[path]
logger.debug('Flushing: %s', path)
flush.set()
def handle__download__IN_MOVED_FROM(logger, __, cookie, ___):
logger.debug('Cookie: %s', cookie)
download_cookies.add(cookie)
def handle__file__IN_ATTRIB(logger, _, __, path):
schedule_upload_meta(logger, path)
def handle__file__IN_CLOSE_WRITE(logger, _, __, path):
schedule_upload(logger, path)
def handle__file__IN_CREATE(logger, _, __, path):
if upload_on_create.match(str(path)) or os.path.islink(path):
schedule_upload(logger, path)
def handle__dir__IN_CREATE(logger, _, __, path):
watch_directory_recursive(logger, path, WATCH_MASK, upload=True)
def handle__file__IN_DELETE(logger, _, __, path):
try:
del etags[path]
except KeyError:
pass
try:
del meta[path]
except KeyError:
pass
try:
del ignore_next_delete[path]
except KeyError:
pass
else:
return
# Correctness does not depend on this bump: it's an optimisation
# that ensures we abandon any upload of this path ahead of us
# in the queue
bump_content_version(path)
schedule_delete(logger, path)
def handle__dir__IN_DELETE(logger, _, __, path):
try:
del ignore_next_delete[path]
except KeyError:
pass
else:
return
schedule_delete_directory(logger, path)
def handle__file__IN_IGNORED(_, wd, __, ___):
# For some reason IN_ISDIR is not set with IN_IGNORED
del wds_to_path[wd]
def handle__file__IN_MODIFY(_, __, ___, path):
bump_content_version(path)
def handle__dir__IN_MOVED_FROM(logger, _, __, path):
remote_delete_directory(logger, path)
def handle__file__IN_MOVED_FROM(logger, _, __, path):
try:
del etags[path]
except KeyError:
pass
schedule_delete(logger, path)
def handle__dir__IN_MOVED_TO(logger, _, __, path):
watch_directory_recursive(logger, path, WATCH_MASK, upload=True)
def handle__file__IN_MOVED_TO(logger, _, cookie, path):
bump_content_version(path)
if cookie in download_cookies:
logger.debug('Cookie: %s', cookie)
return
schedule_upload(logger, path)
def get_content_version(path):
return content_versions.setdefault(path, default=WeakReferenceableDict(version=0))
def bump_content_version(path):
get_content_version(path)['version'] += 1
def get_lock(path):
return path_locks.setdefault(path, default=FifoLock())
def schedule_upload_meta(logger, path):
if exclude_local.match(str(path)):
logger.info('Excluding from upload: %s', path)
return
version_current = get_content_version(path)
version_original = version_current.copy()
async def function():
try:
await upload_meta(logger, path, version_current, version_original)
finally:
completed_push_local_change(path)
ensure_file_in_tree_cache(path)
upload_job_queue.put_nowait((logger, function))
queued_push_local_change(path)
def schedule_upload(logger, path):
if exclude_local.match(str(path)):
logger.info('Excluding from upload: %s', path)
return
version_current = get_content_version(path)
version_original = version_current.copy()
async def function():
try:
await upload(logger, path, version_current, version_original)
finally:
completed_push_local_change(path)
ensure_file_in_tree_cache(path)
upload_job_queue.put_nowait((logger, function))
queued_push_local_change(path)
def schedule_upload_directory(logger, path):
if exclude_local.match(str(path)):
logger.info('Excluding from upload: %s', path)
return
async def function():
try:
await upload_directory(logger, path)
finally:
completed_push_local_change(path)
ensure_dir_in_tree_cache(path)
upload_job_queue.put_nowait((logger, function))
queued_push_local_change(path)
def schedule_delete(logger, path):
if exclude_local.match(str(path)):
logger.info('Excluding from delete: %s', path)
return
version_current = get_content_version(path)
version_original = version_current.copy()
async def function():
try:
await delete(logger, path, version_current, version_original)
finally:
completed_push_local_change(path)
try:
remove_from_tree_cache(path)
except KeyError:
pass
upload_job_queue.put_nowait((logger, function))
queued_push_local_change(path)
def schedule_delete_directory(logger, path):
if exclude_local.match(str(path)):
logger.info('Excluding from delete: %s', path)
return
async def function():
try:
await delete_directory(logger, path)
finally:
completed_push_local_change(path)
try:
remove_from_tree_cache(path)
except KeyError:
pass
upload_job_queue.put_nowait((logger, function))
queued_push_local_change(path)
async def process_jobs(queue):
while True:
logger, job = await queue.get()
try:
await job()
except Exception as exception:
if isinstance(exception, asyncio.CancelledError):
raise
if (
isinstance(exception, FileContentChanged) or
isinstance(exception.__cause__, FileContentChanged)
):
logger.info('Content changed, aborting: %s', exception)
if (
isinstance(exception, FileNotFoundError) or
isinstance(exception.__cause__, FileNotFoundError)
):
logger.info('File not found: %s', exception)
if (
not isinstance(exception, FileNotFoundError) and
not isinstance(exception.__cause__, FileNotFoundError) and
not isinstance(exception, FileContentChanged) and
not isinstance(exception.__cause__, FileContentChanged)
):
logger.exception('Exception during %s', job)
finally:
queue.task_done()
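    # To know that all inotify events for a directory up to this point have
    # been processed, a throwaway flush file is created and immediately removed
    # in that directory, and we wait for its IN_CREATE event to arrive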
async def flush_events(logger, path):
flush_path = path.parent / (flush_file_root + uuid.uuid4().hex)
logger.debug('Creating flush file: %s', flush_path)
event = asyncio.Event()
flushes[flush_path] = event
with open(flush_path, 'w'):
pass
os.remove(flush_path)
# In rare cases, the event queue could be full and the event for
# the flush file is dropped
with timeout(loop, flush_file_timeout):
await event.wait()
async def wait_for_directory_watched(path):
        # Inefficient search
if path.parent in wds_to_path.values():
return
event = directory_watch_events.setdefault(path.parent, default=asyncio.Event())
with timeout(loop, directory_watch_timeout):
await event.wait()
async def upload(logger, path, content_version_current, content_version_original):
logger.info('Uploading %s', path)
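        # with_is_last pairs each chunk with whether it is the final one, so
        # the content-version check can run after the last read of the file but
        # before the last chunk is sent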
def with_is_last(iterable):
try:
last = next(iterable)
except StopIteration:
return
for val in iterable:
yield False, last
last = val
yield True, last
async def file_body():
with open(path, 'rb') as file:
for is_last, chunk in with_is_last(iter(lambda: file.read(16384), b'')):
if is_last:
await flush_events(logger, path)
if content_version_current != content_version_original:
raise FileContentChanged(path)
yield chunk
async def symlink_points_to():
yield os.readlink(path).encode('utf-8')
is_symlink = os.path.islink(path)
if not is_symlink:
content = file_body()
content_length = str(os.stat(path).st_size)
else:
content = symlink_points_to()
content_length = str(len(os.readlink(path).encode('utf-8')))
mtime = str(os.lstat(path).st_mtime)
mode = str(os.lstat(path).st_mode)
        # Ensure we only progress if the content hasn't changed since
        # we queued the upload
await flush_events(logger, path)
if content_version_current != content_version_original:
raise FileContentChanged(path)
data = (
('x-amz-meta-mtime', mtime),
('x-amz-meta-mode', mode),
)
def set_etag_and_meta(path, headers):
meta[path] = data
set_etag(path, headers)
await locked_request(
logger, b'PUT', path, file_key_for_path(path), content=content,
get_headers=lambda: (
('content-length', content_length),
) + data,
on_done=set_etag_and_meta,
)
async def upload_meta(logger, path, content_version_current, content_version_original):
logger.info('Uploading meta %s', path)
mtime = str(os.lstat(path).st_mtime)
mode = str(os.lstat(path).st_mode)
# Ensure we only progress if the content hasn't changed since we have
# queued the upload
await flush_events(logger, path)
if content_version_current != content_version_original:
raise FileContentChanged(path)
data = (
('x-amz-meta-mtime', mtime),
('x-amz-meta-mode', mode),
)
def set_meta(path, _):
meta[path] = data
key = file_key_for_path(path)
await locked_request(
logger, b'PUT', path, key,
cont=lambda: meta[path] != data,
get_headers=lambda: data + (
('x-amz-copy-source', f'/{bucket}/{key}'),
('x-amz-metadata-directive', 'REPLACE'),
('x-amz-copy-source-if-match', etags[path]),
),
on_done=set_meta,
)
async def upload_directory(logger, path):
logger.info('Uploading directory %s', path)
mtime = str(os.stat(path).st_mtime)
if not os.path.isdir(path):
raise FileContentChanged(path)
await locked_request(
logger, b'PUT', path, dir_key_for_path(path),
get_headers=lambda: (
('content-length', '0'),
('x-amz-meta-mtime', mtime),
),
on_done=set_etag,
)
async def delete(logger, path, content_version_current, content_version_original):
logger.info('Deleting %s', path)
        # We may have recently had a download from S3, so we don't carry on
# with the DELETE (if we did, we would then delete the local file
# on the next download)
try:
await flush_events(logger, path)
except FileNotFoundError:
# The local folder in which the file was may have been deleted,
# but we still want to carry on with the remote delete
pass
if content_version_current != content_version_original:
raise FileContentChanged(path)
await locked_request(logger, b'DELETE', path, file_key_for_path(path))
async def delete_directory(logger, path):
logger.info('Deleting directory %s', path)
if os.path.isdir(path):
raise FileContentChanged(path)
await locked_request(logger, b'DELETE', path, dir_key_for_path(path))
def file_key_for_path(path):
return prefix + str(path.relative_to(directory))
def dir_key_for_path(path):
return prefix + str(path.relative_to(directory)) + '/'
async def locked_request(logger, method, path, key, cont=lambda: True,
get_headers=lambda: (),
content=empty_async_iterator(),
on_done=lambda path, headers: None):
# Keep a reference to the lock to keep it in the WeakValueDictionary
lock = get_lock(path)
async with lock(Mutex):
if not cont():
return
remote_url = bucket_url + key
headers = get_headers()
logger.debug('%s %s %s', method.decode(), remote_url, headers)
            response = await client.request(method, remote_url, headers=headers, content=content, auth=auth)
            logger.debug('%s %s', response.status_code, response.headers)
            if response.status_code not in [200, 204]:
                raise Exception(response.status_code, response.content)
on_done(path, response.headers)
async def download_manager(logger):
while True:
try:
await list_and_schedule_downloads(logger)
except asyncio.CancelledError:
raise
except Exception:
logger.exception('Failed to list files')
await asyncio.sleep(download_interval)
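    # Two phases: schedule a download for any key whose ETag differs from the
    # one recorded locally, then walk the local tree and delete files and
    # directories that no longer exist remotely, re-checking with a HEAD
    # request and the pull-block rules before each local delete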
async def list_and_schedule_downloads(logger):
logger.debug('Listing keys')
path_etags = [
(path, etag) async for path, etag in list_keys_relative_to_prefix(logger)
]
for path, etag in path_etags:
try:
etag_existing = etags[directory / path]
except KeyError:
pass
else:
if etag == etag_existing:
logger.debug('Existing etag matches for: %s', path)
continue
logger.info('Scheduling download: %s', path)
schedule_download(logger, path)
await download_job_queue.join()
full_paths = set(directory / path for path, _ in path_etags)
full_paths_all_parents = set(
parent
for path, _ in path_etags
for parent in (directory / path).parents
)
for root, dirs, files in os.walk(directory, topdown=False):
for file in files:
full_path = PurePosixPath(root) / file
if (
exclude_local.match(str(full_path)) or
full_path in full_paths or
is_pull_blocked(full_path)
):
continue
# Since walking the filesystem can take time we might have a new file that we have
                # recently uploaded that was not present when we requested the original file list.
path = full_path.relative_to(directory)
response = await client.request(b'HEAD', bucket_url + prefix + str(path), auth=auth)
if response.status_code != 404:
continue
# Check again if we have made modifications since the above request can take time
try:
await flush_events(logger, full_path)
except (FileNotFoundError, OSError):
continue
if is_pull_blocked(full_path):
continue
try:
logger.info('Deleting locally %s', full_path)
os.remove(full_path)
except (FileNotFoundError, OSError):
pass
else:
# The remove will queue a remote DELETE. However, the file already doesn't
# appear to exist in S3, so a) there is no need and b) may actually delete
# data either added by another client, or even from this one in the case
                    # of an extremely long eventual consistency issue where a PUT of an object
# that did not previously exist is still appearing
ignore_next_delete[full_path] = True
for dir_ in dirs:
full_path = PurePosixPath(root) / dir_
if (
exclude_local.match(str(full_path)) or
full_path in full_paths or
full_path in full_paths_all_parents or
is_dir_pull_blocked(full_path) or
full_path == directory / download_directory
):
continue
path = full_path.relative_to(directory)
response = await client.request(b'HEAD', bucket_url + prefix + str(path) + '/', auth=auth)
if response.status_code != 404:
continue
try:
await flush_events(logger, full_path)
except (FileNotFoundError, OSError):
continue
if is_dir_pull_blocked(full_path):
continue
try:
logger.info('Deleting locally %s', full_path)
os.rmdir(full_path)
except (FileNotFoundError, OSError):
pass
else:
ignore_next_delete[full_path] = True
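    # Downloads are written to a temporary file in the hidden download
    # directory and atomically moved into place with os.replace, so partially
    # downloaded content is never visible at the final path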
def schedule_download(logger, path):
async def download():
full_path = directory / path
if is_pull_blocked(full_path):
logger.debug('Recently changed locally, not changing: %s', full_path)
return
logger.info('Downloading: %s', full_path)
async with client.stream(b'GET', bucket_url + prefix + path, auth=auth) as response:
if response.status_code != 200:
await buffered(response.aiter_bytes()) # Fetch all bytes and return to pool
raise Exception(response.status_code)
is_directory = path[-1] == '/'
directory_to_ensure_created = \
full_path if is_directory else \
full_path.parent
# Create directories under directory
directory_and_parents = [directory] + list(directory.parents)
directory_to_ensure_created_and_paraents = list(
reversed(directory_to_ensure_created.parents)) + [directory_to_ensure_created]
directories_to_ensure_created_under_directory = [
_dir
for _dir in directory_to_ensure_created_and_paraents
if _dir not in directory_and_parents
]
for _dir in directories_to_ensure_created_under_directory:
# If we don't wait for the containing directory to be watched,
                # then we might be incorrectly ignoring later events for it
await wait_for_directory_watched(_dir)
try:
os.mkdir(_dir)
except FileExistsError:
logger.debug('Already exists: %s', _dir)
except NotADirectoryError:
logger.debug('Not a directory: %s', _dir)
except Exception:
logger.debug('Unable to create directory: %s', _dir)
else:
ignore_next_directory_upload[_dir] = True
try:
modified = float(response.headers['x-amz-meta-mtime'])
except (KeyError, ValueError):
modified = datetime.datetime.strptime(
response.headers['last-modified'],
'%a, %d %b %Y %H:%M:%S %Z').timestamp()
try:
mode = int(response.headers['x-amz-meta-mode'])
except (KeyError, ValueError):
mode = None
is_symlink = mode and stat.S_ISLNK(mode)
if is_directory:
await buffered(response.aiter_bytes())
if is_dir_pull_blocked(full_path):
logger.debug('Recently changed locally, not changing: %s', full_path)
return
os.utime(full_path, (modified, modified))
etags[full_path] = response.headers['etag']
# Ensure that subsequent renames will attempt to move the directory
ensure_dir_in_tree_cache(full_path)
return
temporary_path = directory / download_directory / uuid.uuid4().hex
try:
if not is_symlink:
with open(temporary_path, 'wb') as file:
async for chunk in response.aiter_bytes():
file.write(chunk)
else:
symlink_points_to = await buffered(response.aiter_bytes())
os.symlink(symlink_points_to, temporary_path)
# May raise a FileNotFoundError if the directory no longer
# exists, but handled at higher level
os.utime(temporary_path, (modified, modified), follow_symlinks=False)
# This is skipped when the file being downloaded is a symlink because it
# will fail if the symlink is processed before the file it points to exists.
# Permissions on symlinks in Linux are always 777 anyway as it defers to
# the permissions of the underlying file
if mode is not None and not is_symlink:
os.chmod(temporary_path, mode)
# If we don't wait for the directory watched, then if
# - a directory has just been created above
# - a download that doesn't yield (enough) for the create
                # directory event to have been processed
# once the IN_CREATE event for the directory is processed it
# would discover the file and re-upload
await wait_for_directory_watched(full_path)
try:
await flush_events(logger, full_path)
except FileNotFoundError:
# The folder doesn't exist, so moving into place will fail
return
if is_pull_blocked(full_path):
logger.debug('Recently changed locally, not changing: %s', full_path)
return
os.replace(temporary_path, full_path)
                # Record metadata in the same (str, str) form that upload_meta
                # later compares against
                meta[full_path] = (
                    ('x-amz-meta-mtime', str(modified)),
                    ('x-amz-meta-mode', str(mode)),
                )
# Ensure that once we move the file into place, subsequent
# renames will attempt to move the file
ensure_file_in_tree_cache(full_path)
finally:
try:
os.remove(temporary_path)
except FileNotFoundError:
pass
etags[full_path] = response.headers['etag']
download_job_queue.put_nowait((logger, download))
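    # Lists the bucket with ListObjectsV2 (list-type=2), following
    # NextContinuationToken pagination, and yields (key relative to the prefix,
    # ETag) pairs, skipping keys that match exclude_remote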
async def list_keys_relative_to_prefix(logger):
async def _list(extra_query_items=()):
query = (
('max-keys', '1000'),
('list-type', '2'),
('prefix', prefix),
) + extra_query_items
response = await client.request(b'GET', bucket_url, params=query, auth=auth)
if response.status_code != 200:
raise Exception(response.status_code, response.content)
namespace = '{http://s3.amazonaws.com/doc/2006-03-01/}'
root = ET.fromstring(response.content)
next_token = ''
keys_relative = []
for element in root:
if element.tag == f'{namespace}Contents':
key = first_child_text(element, f'{namespace}Key')
key_relative = key[len(prefix):]
if key_relative == '':
# Don't include the prefix itself, if it exists
continue
if exclude_remote.match(key_relative):
logger.info('Excluding: %s', key_relative)
continue
etag = first_child_text(element, f'{namespace}ETag')
keys_relative.append((key_relative, etag))
if element.tag == f'{namespace}NextContinuationToken':
next_token = element.text
return (next_token, keys_relative)
async def list_first_page():
return await _list()
async def list_later_page(token):
return await _list((('continuation-token', token),))
def first_child_text(element, tag):
for child in element:
if child.tag == tag:
return child.text
return None
token, keys_page = await list_first_page()
for key in keys_page:
yield key
while token:
token, keys_page = await list_later_page(token)
for key in keys_page:
yield key
parent_locals = locals()
return start, stop
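# A dict whose entries are removed automatically `seconds` after being set,
# implemented with loop.call_later; ExpiringSet builds on it and both are used
# for the recent-local-modification and download-cookie records above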
class ExpiringDict:
def __init__(self, loop, seconds):
self._loop = loop
self._seconds = seconds
self._store = {}
def __getitem__(self, key):
return self._store[key][0]
def __setitem__(self, key, value):
def delete():
del self._store[key]
if key in self._store:
self._store[key][1].cancel()
del self._store[key]
delete_handle = self._loop.call_later(self._seconds, delete)
self._store[key] = (value, delete_handle)
def __contains__(self, key):
return key in self._store
class ExpiringSet:
def __init__(self, loop, seconds):
self._loop = loop
self._store = ExpiringDict(loop, seconds)
def add(self, item):
self._store[item] = True
def __contains__(self, item):
return item in self._store
async def async_main(syncer_args):
start, stop = Syncer(**syncer_args)
await start()
return stop
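# Command line entry point. A purely illustrative invocation (bucket name,
# paths and region are made up):
#
#     mobius3 /home/me/data my-bucket \
#         https://s3-eu-west-2.amazonaws.com/{}/ eu-west-2 \
#         --prefix my-folder/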
def main():
parser = argparse.ArgumentParser(prog='mobius3', formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument(
'directory',
metavar='directory',
help='Path of the directory to sync, without a trailing slash\ne.g. /path/to/dir')
parser.add_argument(
'bucket',
metavar='bucket',
        help='Name of the remote bucket\n'
             'e.g. my-bucket-name')
parser.add_argument(
'endpoint',
metavar='endpoint',
help='Pattern which is filled with the bucket name\n'
'e.g. https://s3-eu-west-2.amazonaws.com/{}/')
parser.add_argument(
'region',
metavar='region',
help='The region of the bucket\ne.g. eu-west-2')
parser.add_argument(
'--credentials-source',
metavar='credentials-source',
        default='environment-variables',
nargs='?',
choices=['environment-variables', 'ecs-container-endpoint'],
        help='Where to pick up AWS credentials',
)
parser.add_argument(
'--prefix',
metavar='prefix',
default='',
nargs='?',
help='Prefix of keys in the bucket, often with a trailing slash\n'
'e.g. my-folder/')
parser.add_argument(
'--exclude-remote',
metavar='exclude-remote',
default='^$',
nargs='?',
help='Regex of keys to not be downloaded')
parser.add_argument(
'--exclude-local',
metavar='exclude-local',
default='^$',
nargs='?',
help='Regex of paths to not be uploaded')
parser.add_argument(
'--upload-on-create',
metavar='upload-on-create',
default='^$',
nargs='?',
help='Regex of paths to upload as soon as they have been created')
parser.add_argument(
'--cloudwatch-monitoring-endpoint',
metavar='cloudwatch-monitoring-endpoint',
default='',
nargs='?',
        help='The endpoint for CloudWatch monitoring metrics to be posted to')
parser.add_argument(
'--cloudwatch-monitoring-region',
metavar='cloudwatch-monitoring-region',
default='',
nargs='?',
        help='The region for CloudWatch monitoring metrics to be posted to')
parser.add_argument(
'--cloudwatch-monitoring-namespace',
metavar='cloudwatch-monitoring-namespace',
default='',
nargs='?',
help='The namespace for CloudWatch monitoring metrics')
parser.add_argument(
'--disable-ssl-verification',
metavar='',
nargs='?', const=True, default=False)
parser.add_argument(
'--log-level',
metavar='',
nargs='?', const=True, default='WARNING')
parser.add_argument(
'--force-ipv4',
metavar='',
        nargs='?', const=True, default=False)
parsed_args = parser.parse_args()
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(parsed_args.log_level)
logger = logging.getLogger('mobius3')
logger.setLevel(parsed_args.log_level)
logger.addHandler(stdout_handler)
    def get_ssl_context_without_verification():
ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
return ssl_context
pool_args = {
'force_ipv4': parsed_args.force_ipv4,
**({
            'get_ssl_context': get_ssl_context_without_verification,
} if parsed_args.disable_ssl_verification else {}),
}
creds_source = parsed_args.credentials_source
syncer_args = {
'directory': parsed_args.directory,
'bucket': parsed_args.bucket,
'endpoint': parsed_args.endpoint,
'prefix': parsed_args.prefix,
'region': parsed_args.region,
'exclude_remote': parsed_args.exclude_remote,
'exclude_local': parsed_args.exclude_local,
'upload_on_create': parsed_args.upload_on_create,
'cloudwatch_monitoring_endpoint': parsed_args.cloudwatch_monitoring_endpoint,
'cloudwatch_monitoring_region': parsed_args.cloudwatch_monitoring_region,
'cloudwatch_monitoring_namespace': parsed_args.cloudwatch_monitoring_namespace,
'get_pool': lambda: Pool(**pool_args),
'get_credentials':
            get_credentials_from_environment if creds_source == 'environment-variables' else
get_credentials_from_ecs_endpoint()
}
if os.environ.get("SENTRY_DSN") is not None:
sentry_sdk.init(
dsn=os.environ["SENTRY_DSN"],
integrations=[HttpxIntegration()],
environment=os.environ.get("SENTRY_ENVIRONMENT"),
)
loop = asyncio.new_event_loop()
cleanup = loop.run_until_complete(async_main(syncer_args))
async def cleanup_then_stop():
await cleanup()
loop.stop()
def run_cleanup_then_stop():
loop.create_task(cleanup_then_stop())
loop.add_signal_handler(signal.SIGINT, run_cleanup_then_stop)
loop.add_signal_handler(signal.SIGTERM, run_cleanup_then_stop)
loop.run_forever()
if __name__ == '__main__':
main()