src/collectors/systemd-journal.plugin/systemd-journal-watcher.c
// SPDX-License-Identifier: GPL-3.0-or-later
#include "systemd-internals.h"
#include <sys/inotify.h>
#define EVENT_SIZE (sizeof(struct inotify_event))
#define INITIAL_WATCHES 256
#define WATCH_FOR (IN_CREATE | IN_MODIFY | IN_DELETE | IN_DELETE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_UNMOUNT)
// One slot of Watcher.watchList: either a live inotify watch or an entry on the free list.
struct watch_entry {
    int slot;                   // index of this entry inside Watcher.watchList
    int wd;                     // inotify watch descriptor; -1 while the slot is unused
    char *path;                 // heap-allocated copy of the watched directory path (NULL when unused)
    struct watch_entry *next;   // link used while the slot sits on the free list
};

typedef struct watch_entry WatchEntry;
// State of one watcher session: the slot array, its free list and the queue of changed files.
typedef struct {
    WatchEntry *watchList, *freeList;   // slot array / chain of released slots inside that array
    int watchCount, watchListSize;      // slots handed out from the array so far / allocated capacity
    size_t errors;                      // directories that exist but could not be watched
    DICTIONARY *pending;                // paths of .journal files waiting for process_pending()
} Watcher;
// Hand out an unused WatchEntry: a recycled one from the free list if available,
// otherwise a fresh one appended to watchList (growing the array when it is full).
// The returned entry has wd == -1, path == NULL and next == NULL.
static WatchEntry *get_slot(Watcher *watcher) {
    // prefer recycling a slot released by free_slot()
    if (watcher->freeList != NULL) {
        WatchEntry *t = watcher->freeList;
        watcher->freeList = t->next;
        t->next = NULL;
        return t;
    }

    // Grow the array when full. Doubling a zero capacity would leave it at zero and the
    // write below would overflow, so a zero-capacity watcher starts from INITIAL_WATCHES.
    // Moving the array is safe here: the free list is empty, so no free-list pointer
    // refers to the old storage.
    if (watcher->watchCount >= watcher->watchListSize) {
        watcher->watchListSize = (watcher->watchListSize > 0) ? watcher->watchListSize * 2 : INITIAL_WATCHES;
        watcher->watchList = reallocz(watcher->watchList, watcher->watchListSize * sizeof(WatchEntry));
    }

    WatchEntry *t = &watcher->watchList[watcher->watchCount];
    *t = (WatchEntry){
        .slot = watcher->watchCount,
        .wd = -1,
        .path = NULL,
        .next = NULL,
    };
    watcher->watchCount++;
    return t;
}
// Release a slot: drop its path, mark it unused and push it on the head of the free list
// so that get_slot() hands it out again before growing the array.
static void free_slot(Watcher *watcher, WatchEntry *t) {
    freez(t->path);
    t->path = NULL;
    t->wd = -1;

    WatchEntry *previous_head = watcher->freeList;
    t->next = previous_head;
    watcher->freeList = t;
}
// Add an inotify watch for `path` and record it in a slot.
// Returns the watch descriptor, or -1 when inotify_add_watch() failed; in that case the slot
// is released again and watcher->errors is incremented if `path` is an existing directory.
static int add_watch(Watcher *watcher, int inotifyFd, const char *path) {
    WatchEntry *entry = get_slot(watcher);
    int wd = inotify_add_watch(inotifyFd, path, WATCH_FOR);
    entry->wd = wd;

    if (wd != -1) {
        // the slot keeps its own copy of the path
        entry->path = strdupz(path);
        nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
               "JOURNAL WATCHER: watching directory: '%s'",
               path);
        return entry->wd;
    }

    nd_log(NDLS_COLLECTORS, NDLP_ERR,
           "JOURNAL WATCHER: cannot watch directory: '%s'",
           path);
    free_slot(watcher, entry);

    // only count it as an error when the directory really exists
    struct stat st;
    if (stat(path, &st) == 0 && S_ISDIR(st.st_mode))
        watcher->errors++;

    return entry->wd;
}
// Remove the first slot whose watch descriptor is `wd`: drop the inotify watch and release
// the slot. Logs a warning when no slot carries that descriptor.
static void remove_watch(Watcher *watcher, int inotifyFd, int wd) {
    int idx = 0;
    while (idx < watcher->watchCount && watcher->watchList[idx].wd != wd)
        idx++;

    if (idx == watcher->watchCount) {
        nd_log(NDLS_COLLECTORS, NDLP_WARNING,
               "JOURNAL WATCHER: cannot find directory watch %d to remove.",
               wd);
        return;
    }

    WatchEntry *entry = &watcher->watchList[idx];
    nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
           "JOURNAL WATCHER: removing watch from directory: '%s'",
           entry->path);
    inotify_rm_watch(inotifyFd, entry->wd);
    free_slot(watcher, entry);
}
// Tear down a watcher: remove every live inotify watch, free all slot paths, the slot array
// and the pending dictionary, and leave the Watcher in an empty state.
// inotifyFd may be negative (inotify_init() failed); then no watch removal is attempted.
static void free_watches(Watcher *watcher, int inotifyFd) {
    for (int i = 0; i < watcher->watchCount; ++i) {
        WatchEntry *t = &watcher->watchList[i];

        if (t->wd != -1) {
            if (inotifyFd >= 0)
                inotify_rm_watch(inotifyFd, t->wd);
            t->wd = -1;
        }

        // slots on the free list already have path == NULL
        freez(t->path);
        t->path = NULL;
    }

    freez(watcher->watchList);
    watcher->watchList = NULL;

    // the free list pointed into watchList - do not leave it dangling
    watcher->freeList = NULL;
    watcher->watchCount = 0;
    watcher->watchListSize = 0;

    dictionary_destroy(watcher->pending);
    watcher->pending = NULL;
}
// Return the path recorded in the first slot whose watch descriptor is `wd`, or NULL.
// The returned string is owned by the slot.
static char* get_path_from_wd(Watcher *watcher, int wd) {
    int idx = 0;
    while (idx < watcher->watchCount) {
        WatchEntry *entry = &watcher->watchList[idx++];
        if (entry->wd == wd)
            return entry->path;
    }

    // no slot holds this watch descriptor
    return NULL;
}
// Tell whether some live slot (wd != -1) already watches exactly `path`.
static bool is_directory_watched(Watcher *watcher, const char *path) {
    const WatchEntry *entry = watcher->watchList;

    for (int remaining = watcher->watchCount; remaining > 0; remaining--, entry++) {
        // released slots have wd == -1 and a NULL path, so check wd before strcmp()
        bool in_use = (entry->wd != -1);
        if (in_use && !strcmp(entry->path, path))
            return true;
    }

    return false;
}
// Watch basePath and the directories found below it by journal_directory_scan_recursively(),
// skipping the ones that already have a live watch.
static void watch_directory_and_subdirectories(Watcher *watcher, int inotifyFd, const char *basePath) {
    DICTIONARY *found_dirs = dictionary_create(DICT_OPTION_SINGLE_THREADED | DICT_OPTION_DONT_OVERWRITE_VALUE);
    journal_directory_scan_recursively(NULL, found_dirs, basePath, 0);

    void *dir_value;
    dfe_start_read(found_dirs, dir_value) {
        const char *dir_path = dir_value_dfe.name;
        bool already_watched = is_directory_watched(watcher, dir_path);
        if (!already_watched)
            add_watch(watcher, inotifyFd, dir_path);
    }
    dfe_done(dir_value);

    dictionary_destroy(found_dirs);
}
// Return true when `subpath` is `path` itself or lies below it, comparing on whole path
// components ("/a/b" is below "/a", "/ab" is not). A `path` that already ends with '/'
// (such as "/" or "/dir/") matches any `subpath` that starts with it.
static bool is_subpath(const char *path, const char *subpath) {
    size_t path_len = strlen(path);

    if (strncmp(path, subpath, path_len) != 0)
        return false;

    // the separator is part of the prefix already - the prefix match is enough
    if (path_len > 0 && path[path_len - 1] == '/')
        return true;

    // otherwise the match must end exactly at a component boundary
    char next_char = subpath[path_len];
    return next_char == '/' || next_char == '\0';
}
// A directory was deleted or moved away: stop watching it and every directory below it,
// and drop from journal_files_registry the journal files located under it.
void remove_directory_watch(Watcher *watcher, int inotifyFd, const char *dirPath) {
    for (int i = 0; i < watcher->watchCount; ++i) {
        WatchEntry *t = &watcher->watchList[i];
        // is_subpath(parent, child): t->path must be dirPath or a descendant of it
        // (the reversed order matched the ancestors of dirPath instead)
        if (t->wd != -1 && is_subpath(dirPath, t->path)) {
            inotify_rm_watch(inotifyFd, t->wd);
            free_slot(watcher, t);
        }
    }

    struct journal_file *jf;
    dfe_start_write(journal_files_registry, jf) {
        if(is_subpath(dirPath, jf->filename))
            dictionary_del(journal_files_registry, jf->filename);
    }
    dfe_done(jf);

    dictionary_garbage_collect(journal_files_registry);
}
// Handle one inotify event:
//  - IN_DELETE_SELF: release the slot of the directory that vanished;
//  - directory created/moved in: watch it recursively;
//  - directory deleted/moved out: drop its watches and its journal files;
//  - a *.journal file changed: queue its full path in watcher->pending.
void process_event(Watcher *watcher, int inotifyFd, struct inotify_event *event) {
    // Events about a watched directory itself carry no name (len == 0), so IN_DELETE_SELF
    // must be handled before the len check below. Otherwise it never gets here and the stale
    // slot keeps the deleted path marked as watched, so a re-created directory is not watched again.
    if(event->mask & IN_DELETE_SELF) {
        // the slot may already have been released (e.g. by remove_directory_watch()),
        // in which case there is nothing left to do
        if(get_path_from_wd(watcher, event->wd))
            remove_watch(watcher, inotifyFd, event->wd);
        return;
    }

    if(!event->len) {
        // with len == 0 there is no name after the event header - event->name must not be read
        nd_log(NDLS_COLLECTORS, NDLP_NOTICE,
               "JOURNAL WATCHER: received event with mask %u and len %u (this is zero) for watch descriptor %d - ignoring it.",
               event->mask, event->len, event->wd);
        return;
    }

    char *dirPath = get_path_from_wd(watcher, event->wd);
    if(!dirPath) {
        nd_log(NDLS_COLLECTORS, NDLP_NOTICE,
               "JOURNAL WATCHER: received event with mask %u and len %u for path: '%s' - "
               "but we can't find its watch descriptor - ignoring it."
               , event->mask, event->len, event->name);
        return;
    }

    // fullPath contains the full path to the file or directory the event is about
    static __thread char fullPath[PATH_MAX];
    snprintfz(fullPath, sizeof(fullPath), "%s/%s", dirPath, event->name);

    size_t len = strlen(event->name);

    if(event->mask & IN_ISDIR) {
        if (event->mask & (IN_DELETE | IN_MOVED_FROM)) {
            // A directory is deleted or moved out
            nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
                   "JOURNAL WATCHER: Directory deleted or moved out: '%s'",
                   fullPath);

            // stop watching it (and its subdirectories) and forget its journal files
            remove_directory_watch(watcher, inotifyFd, fullPath);
        }
        else if (event->mask & (IN_CREATE | IN_MOVED_TO)) {
            // A new directory is created or moved in
            nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
                   "JOURNAL WATCHER: New directory created or moved in: '%s'",
                   fullPath);

            // Start watching the new directory - recursive watch
            watch_directory_and_subdirectories(watcher, inotifyFd, fullPath);
        }
        else
            nd_log(NDLS_COLLECTORS, NDLP_WARNING,
                   "JOURNAL WATCHER: Received unhandled event with mask %u for directory '%s'",
                   event->mask, fullPath);
    }
    else if(len > sizeof(".journal") - 1 && strcmp(&event->name[len - (sizeof(".journal") - 1)], ".journal") == 0) {
        // It is a file that ends in .journal
        // add it to our pending list
        dictionary_set(watcher->pending, fullPath, NULL, 0);
    }
    else
        nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
               "JOURNAL WATCHER: ignoring event with mask %u for file '%s'",
               event->mask, fullPath);
}
// Flush the pending queue: every queued path is stat()ed. Missing files are removed from
// journal_files_registry; regular files are (re)inserted with fresh mtime/size and their
// header refreshed. Each path is then removed from the queue.
static void process_pending(Watcher *watcher) {
    void *pending_value;

    dfe_start_write(watcher->pending, pending_value) {
        const char *path = pending_value_dfe.name;
        struct stat st;

        if(stat(path, &st) == 0) {
            // anything that is not a regular file is just dropped from the queue below
            if(S_ISREG(st.st_mode)) {
                nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
                       "JOURNAL WATCHER: file '%s' has been added/updated, updating the registry",
                       path);

                struct journal_file fresh = {
                    .file_last_modified_ut = st.st_mtim.tv_sec * USEC_PER_SEC +
                                             st.st_mtim.tv_nsec / NSEC_PER_USEC,
                    .last_scan_monotonic_ut = now_monotonic_usec(),
                    .size = st.st_size,
                    .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
                };
                struct journal_file *registered = dictionary_set(journal_files_registry, path, &fresh, sizeof(fresh));
                journal_file_update_header(registered->filename, registered);
            }
        }
        else {
            nd_log(NDLS_COLLECTORS, NDLP_DEBUG,
                   "JOURNAL WATCHER: file '%s' no longer exists, removing it from the registry",
                   path);
            dictionary_del(journal_files_registry, path);
        }

        // handled either way
        dictionary_del(watcher->pending, path);
    }
    dfe_done(pending_value);

    dictionary_garbage_collect(watcher->pending);
}
// Session requested from journal_watcher_main(); it rebuilds its watches whenever this changes.
size_t journal_watcher_wanted_session_id = 0;

// Ask the watcher thread to tear down and re-create all its inotify watches.
void journal_watcher_restart(void) {
    (void)__atomic_fetch_add(&journal_watcher_wanted_session_id, 1, __ATOMIC_RELAXED);
}
// Watcher thread. Each session:
//  1. creates an inotify instance and watches all configured journal directories recursively;
//  2. reads and dispatches inotify events, flushing the pending queue periodically;
//  3. ends on a restart request, a read error, an unmount or failed watch setup;
//  4. then tears everything down, rescans the registry and starts over after 2 seconds.
// Returns NULL only if inotify_init() fails.
void *journal_watcher_main(void *arg __maybe_unused) {
    while(1) {
        // journal_watcher_restart() bumps the wanted id; a mismatch ends this session
        size_t journal_watcher_session_id = __atomic_load_n(&journal_watcher_wanted_session_id, __ATOMIC_RELAXED);

        Watcher watcher = {
            .watchList = mallocz(INITIAL_WATCHES * sizeof(WatchEntry)),
            .freeList = NULL,
            .watchCount = 0,
            .watchListSize = INITIAL_WATCHES,
            .pending = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE|DICT_OPTION_SINGLE_THREADED),
            .errors = 0,
        };

        int inotifyFd = inotify_init();
        if (inotifyFd < 0) {
            nd_log(NDLS_COLLECTORS, NDLP_ERR, "inotify_init() failed.");
            free_watches(&watcher, inotifyFd);
            return NULL;
        }

        for (unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES; i++) {
            if (!journal_directories[i].path) break;
            watch_directory_and_subdirectories(&watcher, inotifyFd, string2str(journal_directories[i].path));
        }

        usec_t last_headers_update_ut = now_monotonic_usec();

        // The reader lives on the stack: start it empty, otherwise its read_len/pos are
        // indeterminate on the first read (the loop below resets the same fields after use).
        struct buffered_reader reader;
        reader.read_buffer[0] = '\0';
        reader.read_len = 0;
        reader.pos = 0;

        while (journal_watcher_session_id == __atomic_load_n(&journal_watcher_wanted_session_id, __ATOMIC_RELAXED)) {
            buffered_reader_ret_t rc = buffered_reader_read_timeout(
                    &reader, inotifyFd, SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS, false);

            if (rc != BUFFERED_READER_READ_OK && rc != BUFFERED_READER_READ_POLL_TIMEOUT) {
                nd_log(NDLS_COLLECTORS, NDLP_CRIT,
                       "JOURNAL WATCHER: cannot read inotify events, buffered_reader_read_timeout() returned %d - "
                       "restarting the watcher.",
                       rc);
                break;
            }

            if(rc == BUFFERED_READER_READ_OK) {
                bool unmount_event = false;

                // the buffer holds consecutive variable-length inotify events
                ssize_t i = 0;
                while (i < reader.read_len) {
                    struct inotify_event *event = (struct inotify_event *) &reader.read_buffer[i];

                    if(event->mask & IN_UNMOUNT) {
                        unmount_event = true;
                        break;
                    }

                    process_event(&watcher, inotifyFd, event);
                    i += (ssize_t)EVENT_SIZE + event->len;
                }

                // all events consumed - empty the reader for the next read
                reader.read_buffer[0] = '\0';
                reader.read_len = 0;
                reader.pos = 0;

                if(unmount_event)
                    break;
            }

            usec_t ut = now_monotonic_usec();
            if (dictionary_entries(watcher.pending) && (rc == BUFFERED_READER_READ_POLL_TIMEOUT ||
                last_headers_update_ut + (SYSTEMD_JOURNAL_EXECUTE_WATCHER_PENDING_EVERY_MS * USEC_PER_MS) <= ut)) {
                process_pending(&watcher);
                last_headers_update_ut = ut;
            }

            if(watcher.errors) {
                nd_log(NDLS_COLLECTORS, NDLP_NOTICE,
                       "JOURNAL WATCHER: there were errors in setting up inotify watches - restarting the watcher.");
                // actually restart, as the message says - without this the session kept running
                // and the notice was repeated on every iteration
                break;
            }
        }

        // Remove the watches while the inotify fd is still open: once closed, its number may be
        // reused by another thread and inotify_rm_watch() would act on the wrong descriptor.
        free_watches(&watcher, inotifyFd);
        close(inotifyFd);

        // this will scan the directories and cleanup the registry
        journal_files_registry_update();

        sleep_usec(2 * USEC_PER_SEC);
    }

    return NULL;
}