src/borg/cache.py
import configparser
import io
import os
import shutil
import stat
from collections import namedtuple
from datetime import datetime, timezone, timedelta
from time import perf_counter
from .logger import create_logger
logger = create_logger()
files_cache_logger = create_logger("borg.debug.files_cache")
from .constants import CACHE_README, FILES_CACHE_MODE_DISABLED, ROBJ_FILE_STREAM, TIME_DIFFERS2_NS
from .checksums import xxh64
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import Error
from .helpers import get_cache_dir, get_security_dir
from .helpers import hex_to_bin, bin_to_hex, parse_stringified_list
from .helpers import format_file_size, safe_encode
from .helpers import safe_ns
from .helpers import yes
from .helpers import ProgressIndicatorMessage
from .helpers import msgpack
from .helpers.msgpack import int_to_timestamp, timestamp_to_int
from .item import ChunkListEntry
from .crypto.key import PlaintextKey
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
from .manifest import Manifest
from .platform import SaveFile
from .remote import RemoteRepository
from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound, repo_lister
# chunks is a list of ChunkListEntry
FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
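# Illustrative sketch (not executed, just for orientation) of how FileCacheEntry values are
# (de)serialized throughout this module; t_ns stands for a nanosecond timestamp:
#
#   entry = FileCacheEntry(age=0, inode=ino, size=size, ctime=int_to_timestamp(t_ns),
#                          mtime=int_to_timestamp(t_ns), chunks=chunks)
#   packed = msgpack.packb(entry)                      # stored as the files cache value, ~240 bytes per file
#   entry = FileCacheEntry(*msgpack.unpackb(packed))   # round trip when reading the cache back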
class SecurityManager:
"""
Tracks repositories. Ensures that nothing bad happens (repository swaps,
replay attacks, unknown repositories etc.).
    This is complicated by the Cache initially having been used for this, while
    only some commands actually use the Cache, which meant that other commands
    did not perform these checks.
    Further complications arose because the Cache is a cache, so it can be
    legitimately deleted, after which Borg would no longer recognize
    repositories it had seen before.
Therefore a second location, the security database (see get_security_dir),
was introduced which stores this information. However, this means that
the code has to deal with a cache existing but no security DB entry,
or inconsistencies between the security DB and the cache which have to
be reconciled, and also with no cache existing but a security DB entry.
"""
def __init__(self, repository):
self.repository = repository
self.dir = get_security_dir(repository.id_str, legacy=(repository.version == 1))
self.cache_dir = cache_dir(repository)
self.key_type_file = os.path.join(self.dir, "key-type")
self.location_file = os.path.join(self.dir, "location")
self.manifest_ts_file = os.path.join(self.dir, "manifest-timestamp")
@staticmethod
def destroy(repository, path=None):
"""destroy the security dir for ``repository`` or at ``path``"""
path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1))
if os.path.exists(path):
shutil.rmtree(path)
def known(self):
return all(os.path.exists(f) for f in (self.key_type_file, self.location_file, self.manifest_ts_file))
def key_matches(self, key):
if not self.known():
return False
        try:
            with open(self.key_type_file) as fd:
                key_type = fd.read()
            return key_type == str(key.TYPE)
        except OSError as exc:
            logger.warning("Could not read/parse key type file: %s", exc)
            return False
def save(self, manifest, key):
logger.debug("security: saving state for %s to %s", self.repository.id_str, self.dir)
current_location = self.repository._location.canonical_path()
logger.debug("security: current location %s", current_location)
logger.debug("security: key type %s", str(key.TYPE))
logger.debug("security: manifest timestamp %s", manifest.timestamp)
with SaveFile(self.location_file) as fd:
fd.write(current_location)
with SaveFile(self.key_type_file) as fd:
fd.write(str(key.TYPE))
with SaveFile(self.manifest_ts_file) as fd:
fd.write(manifest.timestamp)
def assert_location_matches(self):
# Warn user before sending data to a relocated repository
try:
with open(self.location_file) as fd:
previous_location = fd.read()
logger.debug("security: read previous location %r", previous_location)
except FileNotFoundError:
logger.debug("security: previous location file %s not found", self.location_file)
previous_location = None
except OSError as exc:
logger.warning("Could not read previous location file: %s", exc)
previous_location = None
repository_location = self.repository._location.canonical_path()
if previous_location and previous_location != repository_location:
msg = (
"Warning: The repository at location {} was previously located at {}\n".format(
repository_location, previous_location
)
+ "Do you want to continue? [yN] "
)
if not yes(
msg,
false_msg="Aborting.",
invalid_msg="Invalid answer, aborting.",
retry=False,
env_var_override="BORG_RELOCATED_REPO_ACCESS_IS_OK",
):
raise Cache.RepositoryAccessAborted()
# adapt on-disk config immediately if the new location was accepted
logger.debug("security: updating location stored in security dir")
with SaveFile(self.location_file) as fd:
fd.write(repository_location)
def assert_no_manifest_replay(self, manifest, key):
try:
with open(self.manifest_ts_file) as fd:
timestamp = fd.read()
logger.debug("security: read manifest timestamp %r", timestamp)
except FileNotFoundError:
logger.debug("security: manifest timestamp file %s not found", self.manifest_ts_file)
timestamp = ""
except OSError as exc:
logger.warning("Could not read previous location file: %s", exc)
timestamp = ""
logger.debug("security: determined newest manifest timestamp as %s", timestamp)
        # If the repository is older than the cache or security dir, something fishy is going on.
if timestamp and timestamp > manifest.timestamp:
if isinstance(key, PlaintextKey):
raise Cache.RepositoryIDNotUnique()
else:
raise Cache.RepositoryReplay()
def assert_key_type(self, key):
# Make sure an encrypted repository has not been swapped for an unencrypted repository
if self.known() and not self.key_matches(key):
raise Cache.EncryptionMethodMismatch()
def assert_secure(self, manifest, key, *, warn_if_unencrypted=True):
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # This avoids asking about a repository that is currently being initialized.
self.assert_access_unknown(warn_if_unencrypted, manifest, key)
self._assert_secure(manifest, key)
logger.debug("security: repository checks ok, allowing access")
def _assert_secure(self, manifest, key):
self.assert_location_matches()
self.assert_key_type(key)
self.assert_no_manifest_replay(manifest, key)
if not self.known():
logger.debug("security: remembering previously unknown repository")
self.save(manifest, key)
def assert_access_unknown(self, warn_if_unencrypted, manifest, key):
        # warn_if_unencrypted=False is only used for initializing a new repository.
        # This avoids asking about a repository that is currently being initialized.
if not key.logically_encrypted and not self.known():
msg = (
"Warning: Attempting to access a previously unknown unencrypted repository!\n"
+ "Do you want to continue? [yN] "
)
allow_access = not warn_if_unencrypted or yes(
msg,
false_msg="Aborting.",
invalid_msg="Invalid answer, aborting.",
retry=False,
env_var_override="BORG_UNKNOWN_UNENCRYPTED_REPO_ACCESS_IS_OK",
)
if allow_access:
if warn_if_unencrypted:
logger.debug("security: remembering unknown unencrypted repository (explicitly allowed)")
else:
logger.debug("security: initializing unencrypted repository")
self.save(manifest, key)
else:
raise Cache.CacheInitAbortedError()
def assert_secure(repository, manifest):
sm = SecurityManager(repository)
sm.assert_secure(manifest, manifest.key)
def cache_dir(repository, path=None):
return path or os.path.join(get_cache_dir(), repository.id_str)
class CacheConfig:
def __init__(self, repository, path=None):
self.repository = repository
self.path = cache_dir(repository, path)
logger.debug("Using %s as cache", self.path)
self.config_path = os.path.join(self.path, "config")
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def exists(self):
return os.path.exists(self.config_path)
def create(self):
assert not self.exists()
config = configparser.ConfigParser(interpolation=None)
config.add_section("cache")
config.set("cache", "version", "1")
config.set("cache", "repository", self.repository.id_str)
config.set("cache", "manifest", "")
config.add_section("integrity")
config.set("integrity", "manifest", "")
with SaveFile(self.config_path) as fd:
config.write(fd)
def open(self):
self.load()
def load(self):
self._config = configparser.ConfigParser(interpolation=None)
with open(self.config_path) as fd:
self._config.read_file(fd)
self._check_upgrade(self.config_path)
self.id = self._config.get("cache", "repository")
self.manifest_id = hex_to_bin(self._config.get("cache", "manifest"))
self.ignored_features = set(parse_stringified_list(self._config.get("cache", "ignored_features", fallback="")))
self.mandatory_features = set(
parse_stringified_list(self._config.get("cache", "mandatory_features", fallback=""))
)
try:
self.integrity = dict(self._config.items("integrity"))
if self._config.get("cache", "manifest") != self.integrity.pop("manifest"):
                # The cache config file is updated in place (parsed with ConfigParser, the ConfigParser's
                # state is modified and then written out), not re-created.
                # Thus, older versions will leave our [integrity] section alone, making the section's data invalid.
                # Therefore, we also add the manifest ID to this section, so we can discern whether an older
                # version interfered by comparing the manifest IDs of this section and the main [cache] section.
self.integrity = {}
logger.warning("Cache integrity data not available: old Borg version modified the cache.")
except configparser.NoSectionError:
logger.debug("Cache integrity: No integrity data found (files, chunks). Cache is from old version.")
self.integrity = {}
def save(self, manifest=None):
if manifest:
self._config.set("cache", "manifest", manifest.id_str)
self._config.set("cache", "ignored_features", ",".join(self.ignored_features))
self._config.set("cache", "mandatory_features", ",".join(self.mandatory_features))
if not self._config.has_section("integrity"):
self._config.add_section("integrity")
for file, integrity_data in self.integrity.items():
self._config.set("integrity", file, integrity_data)
self._config.set("integrity", "manifest", manifest.id_str)
with SaveFile(self.config_path) as fd:
self._config.write(fd)
def close(self):
pass
def _check_upgrade(self, config_path):
try:
cache_version = self._config.getint("cache", "version")
wanted_version = 1
if cache_version != wanted_version:
self.close()
raise Exception(
"%s has unexpected cache version %d (wanted: %d)." % (config_path, cache_version, wanted_version)
)
except configparser.NoSectionError:
self.close()
raise Exception("%s does not look like a Borg cache." % config_path) from None
class Cache:
"""Client Side cache"""
class CacheInitAbortedError(Error):
"""Cache initialization aborted"""
exit_mcode = 60
class EncryptionMethodMismatch(Error):
"""Repository encryption method changed since last access, refusing to continue"""
exit_mcode = 61
class RepositoryAccessAborted(Error):
"""Repository access aborted"""
exit_mcode = 62
class RepositoryIDNotUnique(Error):
"""Cache is newer than repository - do you have multiple, independently updated repos with same ID?"""
exit_mcode = 63
class RepositoryReplay(Error):
"""Cache, or information obtained from the security directory is newer than repository - this is either an attack or unsafe (multiple repos with same ID)"""
exit_mcode = 64
@staticmethod
def break_lock(repository, path=None):
pass
@staticmethod
def destroy(repository, path=None):
"""destroy the cache for ``repository`` or at ``path``"""
path = path or os.path.join(get_cache_dir(), repository.id_str)
config = os.path.join(path, "config")
if os.path.exists(config):
os.remove(config) # kill config first
shutil.rmtree(path)
def __new__(
cls,
repository,
manifest,
path=None,
sync=True,
warn_if_unencrypted=True,
progress=False,
cache_mode=FILES_CACHE_MODE_DISABLED,
iec=False,
archive_name=None,
start_backup=None,
):
return AdHocWithFilesCache(
manifest=manifest,
path=path,
warn_if_unencrypted=warn_if_unencrypted,
progress=progress,
iec=iec,
cache_mode=cache_mode,
archive_name=archive_name,
start_backup=start_backup,
)
class FilesCacheMixin:
"""
Massively accelerate processing of unchanged files.
We read the "files cache" (either from cache directory or from previous archive
in repo) that has metadata for all "already stored" files, like size, ctime/mtime,
inode number and chunks id/size list.
When finding a file on disk, we use the metadata to determine if the file is unchanged.
If so, we use the cached chunks list and skip reading/chunking the file contents.
"""
FILES_CACHE_NAME = "files"
def __init__(self, cache_mode, archive_name=None, start_backup=None):
self.archive_name = archive_name # ideally a SERIES name
assert not ("c" in cache_mode and "m" in cache_mode)
assert "d" in cache_mode or "c" in cache_mode or "m" in cache_mode
self.cache_mode = cache_mode
self._files = None
self._newest_cmtime = 0
self._newest_path_hashes = set()
self.start_backup = start_backup
@property
def files(self):
if self._files is None:
self._files = self._read_files_cache() # try loading from cache dir
if self._files is None:
self._files = self._build_files_cache() # try loading from repository
if self._files is None:
self._files = {} # start from scratch
return self._files
def _build_files_cache(self):
"""rebuild the files cache by reading previous archive from repository"""
if "d" in self.cache_mode: # d(isabled)
return
if not self.archive_name:
return
from .archive import Archive
# get the latest archive with the IDENTICAL name, supporting archive series:
archives = self.manifest.archives.list(match=[self.archive_name], sort_by=["ts"], last=1)
if not archives:
# nothing found
return
prev_archive = archives[0]
files = {}
logger.debug(
f"Building files cache from {prev_archive.name} {prev_archive.ts} {bin_to_hex(prev_archive.id)} ..."
)
files_cache_logger.debug("FILES-CACHE-BUILD: starting...")
archive = Archive(self.manifest, prev_archive.id)
for item in archive.iter_items(preload=False):
# only put regular files' infos into the files cache:
if stat.S_ISREG(item.mode):
path_hash = self.key.id_hash(safe_encode(item.path))
# keep track of the key(s) for the most recent timestamp(s):
ctime_ns = item.ctime
if ctime_ns > self._newest_cmtime:
self._newest_cmtime = ctime_ns
self._newest_path_hashes = {path_hash}
elif ctime_ns == self._newest_cmtime:
self._newest_path_hashes.add(path_hash)
mtime_ns = item.mtime
if mtime_ns > self._newest_cmtime:
self._newest_cmtime = mtime_ns
self._newest_path_hashes = {path_hash}
elif mtime_ns == self._newest_cmtime:
self._newest_path_hashes.add(path_hash)
# add the file to the in-memory files cache
entry = FileCacheEntry(
age=0,
inode=item.get("inode", 0),
size=item.size,
ctime=int_to_timestamp(ctime_ns),
mtime=int_to_timestamp(mtime_ns),
chunks=item.chunks,
)
files[path_hash] = msgpack.packb(entry) # takes about 240 Bytes per file
# deal with special snapshot / timestamp granularity case, see FAQ:
for path_hash in self._newest_path_hashes:
del files[path_hash]
files_cache_logger.debug("FILES-CACHE-BUILD: finished, %d entries loaded.", len(files))
return files
def files_cache_name(self):
suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
# when using archive series, we automatically make up a separate cache file per series.
# when not, the user may manually do that by using the env var.
if not suffix:
# avoid issues with too complex or long archive_name by hashing it:
suffix = bin_to_hex(xxh64(self.archive_name.encode()))
return self.FILES_CACHE_NAME + "." + suffix
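    # Example (assumed values): with no BORG_FILES_CACHE_SUFFIX set, an archive (series) name of
    # "myhost-home" would yield "files." + bin_to_hex(xxh64(b"myhost-home")), i.e. "files." followed
    # by 16 hex digits, since xxh64 produces an 8-byte digest.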
def discover_files_cache_names(self, path):
return [fn for fn in os.listdir(path) if fn.startswith(self.FILES_CACHE_NAME + ".")]
def _read_files_cache(self):
"""read files cache from cache directory"""
if "d" in self.cache_mode: # d(isabled)
return
files = {}
logger.debug("Reading files cache ...")
files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
msg = None
try:
with IntegrityCheckedFile(
path=os.path.join(self.path, self.files_cache_name()),
write=False,
integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
) as fd:
u = msgpack.Unpacker(use_list=True)
while True:
data = fd.read(64 * 1024)
if not data:
break
u.feed(data)
try:
for path_hash, item in u:
entry = FileCacheEntry(*item)
# in the end, this takes about 240 Bytes per file
files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
except (TypeError, ValueError) as exc:
msg = "The files cache seems invalid. [%s]" % str(exc)
break
except OSError as exc:
msg = "The files cache can't be read. [%s]" % str(exc)
except FileIntegrityError as fie:
msg = "The files cache is corrupted. [%s]" % str(fie)
if msg is not None:
logger.debug(msg)
files = None
files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(files or {}))
return files
def _write_files_cache(self, files):
"""write files cache to cache directory"""
max_time_ns = 2**63 - 1 # nanoseconds, good until y2262
        # self._newest_cmtime is 0 if it was never set (no files were added or modified in this run);
        # in that case, do not limit by it, only by the backup start time below.
        newest_cmtime = self._newest_cmtime if self._newest_cmtime else max_time_ns
start_backup_time = self.start_backup - TIME_DIFFERS2_NS if self.start_backup is not None else max_time_ns
# we don't want to persist files cache entries of potentially problematic files:
discard_after = min(newest_cmtime, start_backup_time)
ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 2))
files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
# TODO: use something like SaveFile here, but that didn't work due to SyncFile missing .seek().
with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
entries = 0
age_discarded = 0
race_discarded = 0
for path_hash, item in files.items():
entry = FileCacheEntry(*msgpack.unpackb(item))
if entry.age == 0: # current entries
if max(timestamp_to_int(entry.ctime), timestamp_to_int(entry.mtime)) < discard_after:
                        # Only keep files seen in this backup that are old enough not to suffer from race conditions
                        # relating to filesystem snapshots and ctime/mtime granularity, or from being modified while we read them.
keep = True
else:
keep = False
race_discarded += 1
else: # old entries
if entry.age < ttl:
# Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
keep = True
else:
keep = False
age_discarded += 1
if keep:
msgpack.pack((path_hash, entry), fd)
entries += 1
files_cache_logger.debug(f"FILES-CACHE-KILL: removed {age_discarded} entries with age >= TTL [{ttl}]")
t_str = datetime.fromtimestamp(discard_after / 1e9, timezone.utc).isoformat()
files_cache_logger.debug(f"FILES-CACHE-KILL: removed {race_discarded} entries with ctime/mtime >= {t_str}")
files_cache_logger.debug(f"FILES-CACHE-SAVE: finished, {entries} remaining entries saved.")
return fd.integrity_data
def file_known_and_unchanged(self, hashed_path, path_hash, st):
"""
        Check if we know the file that has this path_hash (known == it is in our files cache) and
        whether it is unchanged (i.e. size / inode number / ctime / mtime are the same, for whatever this cache_mode checks).
:param hashed_path: the file's path as we gave it to hash(hashed_path)
:param path_hash: hash(hashed_path), to save some memory in the files cache
:param st: the file's stat() result
:return: known, chunks (known is True if we have infos about this file in the cache,
chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
"""
if not stat.S_ISREG(st.st_mode):
return False, None
cache_mode = self.cache_mode
if "d" in cache_mode: # d(isabled)
files_cache_logger.debug("UNKNOWN: files cache disabled")
return False, None
        # note: r(echunk) does not need the files cache in this method, but the files cache will
        # be updated and saved to disk to memorize the files. To preserve previous generations in
        # the cache, it therefore also needs to be loaded from disk first.
if "r" in cache_mode: # r(echunk)
files_cache_logger.debug("UNKNOWN: rechunking enforced")
return False, None
entry = self.files.get(path_hash)
if not entry:
files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
return False, None
# we know the file!
entry = FileCacheEntry(*msgpack.unpackb(entry))
if "s" in cache_mode and entry.size != st.st_size:
files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
return True, None
if "i" in cache_mode and entry.inode != st.st_ino:
files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
return True, None
ctime = int_to_timestamp(safe_ns(st.st_ctime_ns))
if "c" in cache_mode and entry.ctime != ctime:
files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
return True, None
mtime = int_to_timestamp(safe_ns(st.st_mtime_ns))
if "m" in cache_mode and entry.mtime != mtime:
files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
return True, None
# V = any of the inode number, mtime, ctime values.
# we ignored V in the comparison above or it is still the same value.
# if it is still the same, replacing it in the tuple doesn't change it.
# if we ignored it, a reason for doing that is that files were moved/copied to
# a new disk / new fs (so a one-time change of V is expected) and we wanted
# to avoid everything getting chunked again. to be able to re-enable the
# V comparison in a future backup run (and avoid chunking everything again at
# that time), we need to update V in the cache with what we see in the filesystem.
self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, ctime=ctime, mtime=mtime, age=0))
chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks] # convert to list of namedtuple
return True, chunks
def memorize_file(self, hashed_path, path_hash, st, chunks):
if not stat.S_ISREG(st.st_mode):
return
# note: r(echunk) modes will update the files cache, d(isabled) mode won't
if "d" in self.cache_mode:
files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
return
ctime_ns = safe_ns(st.st_ctime_ns)
mtime_ns = safe_ns(st.st_mtime_ns)
entry = FileCacheEntry(
age=0,
inode=st.st_ino,
size=st.st_size,
ctime=int_to_timestamp(ctime_ns),
mtime=int_to_timestamp(mtime_ns),
chunks=chunks,
)
self.files[path_hash] = msgpack.packb(entry)
self._newest_cmtime = max(self._newest_cmtime or 0, ctime_ns)
self._newest_cmtime = max(self._newest_cmtime or 0, mtime_ns)
files_cache_logger.debug(
"FILES-CACHE-UPDATE: put %r <- %r", entry._replace(chunks="[%d entries]" % len(entry.chunks)), hashed_path
)
def load_chunks_hash(repository) -> bytes:
    try:
        chunks_hash = repository.store_load("cache/chunks_hash")
        logger.debug(f"cache/chunks_hash is '{bin_to_hex(chunks_hash)}'.")
    except (Repository.ObjectNotFound, StoreObjectNotFound):
        # TODO: ^ it seems like RemoteRepository raises Repository.ONF instead of StoreONF
        chunks_hash = b""
        logger.debug("cache/chunks_hash missing!")
    return chunks_hash
def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
cached_hash = load_chunks_hash(repository)
if compact:
# if we don't need the in-memory chunks index anymore:
chunks.compact() # vacuum the hash table
with io.BytesIO() as f:
chunks.write(f)
data = f.getvalue()
if clear:
# if we don't need the in-memory chunks index anymore:
chunks.clear() # free memory, immediately
new_hash = xxh64(data)
if force_write or new_hash != cached_hash:
# when an updated chunks index is stored into the cache, we also store its hash into the cache.
# when a client is loading the chunks index from a cache, it has to compare its xxh64
# hash against cache/chunks_hash in the repository. if it is the same, the cache
# is valid. If it is different, the cache is either corrupted or out of date and
# has to be discarded.
# when some functionality is DELETING chunks from the repository, it has to either update
# both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
# so that all clients will discard any client-local chunks index caches.
logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
repository.store_store("cache/chunks", data)
repository.store_store("cache/chunks_hash", new_hash)
return new_hash
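# Illustrative sketch (hypothetical deleter code, see the comments above): code that removes chunks from the
# repository must either refresh both objects, e.g. via
#   write_chunkindex_to_repo_cache(repository, chunks, force_write=True)
# or delete both "cache/chunks" and "cache/chunks_hash", so that all clients discard their local chunks index caches.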
def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
chunks = None
# first, try to load a pre-computed and centrally cached chunks index:
if not disable_caches:
wanted_hash = load_chunks_hash(repository)
logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
try:
chunks_data = repository.store_load("cache/chunks")
except (Repository.ObjectNotFound, StoreObjectNotFound):
            # TODO: ^ it seems like RemoteRepository raises Repository.ONF instead of StoreONF
logger.debug("cache/chunks not found in the repository.")
else:
if xxh64(chunks_data) == wanted_hash:
logger.debug("cache/chunks is valid.")
with io.BytesIO(chunks_data) as f:
chunks = ChunkIndex.read(f)
return chunks
else:
logger.debug("cache/chunks is invalid.")
# if we didn't get anything from the cache, compute the ChunkIndex the slow way:
logger.debug("querying the chunk IDs list from the repo...")
chunks = ChunkIndex()
t0 = perf_counter()
num_chunks = 0
# The repo says it has these chunks, so we assume they are referenced chunks.
# We do not care for refcounting anymore, so we just set refcount = MAX_VALUE.
# We do not know the plaintext size (!= stored_size), thus we set size = 0.
init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
for id, stored_size in repo_lister(repository, limit=LIST_SCAN_LIMIT):
num_chunks += 1
chunks[id] = init_entry
# Cache does not contain the manifest.
if not isinstance(repository, (Repository, RemoteRepository)):
del chunks[Manifest.MANIFEST_ID]
duration = perf_counter() - t0 or 0.001
# Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
# Protocol overhead is neglected in this calculation.
speed = format_file_size(num_chunks * 34 / duration)
logger.debug(f"queried {num_chunks} chunk IDs in {duration} s, ~{speed}/s")
if cache_immediately:
# immediately update cache/chunks, so we only rarely have to do it the slow way:
write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
return chunks
class ChunksMixin:
"""
Chunks index related code for misc. Cache implementations.
"""
def __init__(self):
self._chunks = None
self.last_refresh_dt = datetime.now(timezone.utc)
self.refresh_td = timedelta(seconds=60)
@property
def chunks(self):
if self._chunks is None:
self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
return self._chunks
def seen_chunk(self, id, size=None):
entry = self.chunks.get(id, ChunkIndexEntry(0, None))
if entry.refcount and size is not None:
assert isinstance(entry.size, int)
if not entry.size:
# AdHocWithFilesCache:
# Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
self.chunks[id] = entry._replace(size=size)
return entry.refcount != 0
def reuse_chunk(self, id, size, stats):
assert isinstance(size, int) and size > 0
stats.update(size, False)
return ChunkListEntry(id, size)
def add_chunk(
self,
id,
meta,
data,
*,
stats,
wait=True,
compress=True,
size=None,
ctype=None,
clevel=None,
ro_type=ROBJ_FILE_STREAM,
):
assert ro_type is not None
if size is None:
if compress:
size = len(data) # data is still uncompressed
else:
raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
now = datetime.now(timezone.utc)
exists = self.seen_chunk(id, size)
if exists:
            # if borg create is processing lots of unchanged files (no content and no metadata changes),
            # there could be a long time without any repository operations and the repo lock would get stale.
self.refresh_lock(now)
return self.reuse_chunk(id, size, stats)
cdata = self.repo_objs.format(
id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
)
self.repository.put(id, cdata, wait=wait)
self.last_refresh_dt = now # .put also refreshed the lock
self.chunks.add(id, ChunkIndex.MAX_VALUE, size)
stats.update(size, not exists)
return ChunkListEntry(id, size)
def _write_chunks_cache(self, chunks):
# this is called from .close, so we can clear/compact here:
write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
self._chunks = None # nothing there (cleared!)
def refresh_lock(self, now):
if now > self.last_refresh_dt + self.refresh_td:
# the repository lock needs to get refreshed regularly, or it will be killed as stale.
# refreshing the lock is not part of the repository API, so we do it indirectly via repository.info.
self.repository.info()
self.last_refresh_dt = now
class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
"""
    An ad-hoc chunks and files cache.
    Chunks: it does not maintain accurate reference counts.
    Chunks that were not added during the current lifetime won't have a correct size set (0 bytes)
    and will have an infinite reference count (MAX_VALUE).
    Files: if an archive_name is given, an in-memory files cache is built ad hoc from the most
    recent archive with that name (series).
"""
def __init__(
self,
manifest,
path=None,
warn_if_unencrypted=True,
progress=False,
cache_mode=FILES_CACHE_MODE_DISABLED,
iec=False,
archive_name=None,
start_backup=None,
):
"""
:param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
:param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
"""
FilesCacheMixin.__init__(self, cache_mode, archive_name, start_backup)
ChunksMixin.__init__(self)
assert isinstance(manifest, Manifest)
self.manifest = manifest
self.repository = manifest.repository
self.key = manifest.key
self.repo_objs = manifest.repo_objs
self.progress = progress
self.path = cache_dir(self.repository, path)
self.security_manager = SecurityManager(self.repository)
self.cache_config = CacheConfig(self.repository, self.path)
        # Warn the user before sending data to a never-before-seen unencrypted repository
if not os.path.exists(self.path):
self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
self.create()
self.open()
try:
self.security_manager.assert_secure(manifest, self.key)
if not self.check_cache_compatibility():
self.wipe_cache()
self.update_compatibility()
except: # noqa
self.close()
raise
def __enter__(self):
self._chunks = None
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
self._chunks = None
def create(self):
"""Create a new empty cache at `self.path`"""
os.makedirs(self.path)
with open(os.path.join(self.path, "README"), "w") as fd:
fd.write(CACHE_README)
self.cache_config.create()
def open(self):
if not os.path.isdir(self.path):
raise Exception("%s Does not look like a Borg cache" % self.path)
self.cache_config.open()
self.cache_config.load()
def close(self):
self.security_manager.save(self.manifest, self.key)
pi = ProgressIndicatorMessage(msgid="cache.close")
if self._files is not None:
pi.output("Saving files cache")
integrity_data = self._write_files_cache(self._files)
self.cache_config.integrity[self.files_cache_name()] = integrity_data
if self._chunks is not None:
pi.output("Saving chunks cache")
self._write_chunks_cache(self._chunks) # cache/chunks in repo has a different integrity mechanism
pi.output("Saving cache config")
self.cache_config.save(self.manifest)
self.cache_config.close()
pi.finish()
self.cache_config = None
def check_cache_compatibility(self):
my_features = Manifest.SUPPORTED_REPO_FEATURES
if self.cache_config.ignored_features & my_features:
            # The cache might not contain references to chunks that need a feature which is mandatory for some
            # operation and which this version supports. To avoid corruption while executing that operation, force a rebuild.
return False
if not self.cache_config.mandatory_features <= my_features:
            # The cache was built with consideration to at least one feature that this version does not understand.
            # This client might misinterpret the cache. Thus, force a rebuild.
return False
return True
def wipe_cache(self):
logger.warning("Discarding incompatible cache and forcing a cache rebuild")
self._chunks = ChunkIndex()
self.cache_config.manifest_id = ""
self.cache_config._config.set("cache", "manifest", "")
self.cache_config.ignored_features = set()
self.cache_config.mandatory_features = set()
def update_compatibility(self):
operation_to_features_map = self.manifest.get_all_mandatory_features()
my_features = Manifest.SUPPORTED_REPO_FEATURES
repo_features = set()
for operation, features in operation_to_features_map.items():
repo_features.update(features)
self.cache_config.ignored_features.update(repo_features - my_features)
self.cache_config.mandatory_features.update(repo_features & my_features)