bitslip6/bitfire

View on GitHub
firewall/src/db.php

Summary

Maintainability
B
4 hrs
Test Coverage
<?php declare(strict_types=1);
/**
 * functional MySQL database abstraction
 */

namespace ThreadFinDB;

use Attribute;
use Exception;
use mysqli;
use mysqli_result;
use ThreadFin\MaybeA;
use ThreadFin\MaybeStr;

use function ThreadFin\dbg;
use function ThreadFin\do_for_all_key;
use function ThreadFin\func_name;
use function ThreadFin\partial_right as BINDR;
use function ThreadFin\partial as BINDL;
use function ThreadFin\trace;
use function ThreadFin\utc_time;

const DB_FETCH_NUM_ROWS = 2;
const DB_FETCH_INSERT_ID = 4;
const DB_DUPLICATE_IGNORE = 8;
const DB_DUPLICATE_ERROR = 16;
const DB_DUPLICATE_UPDATE = 32;
const DB_FETCH_SUCCESS = 1;


/**
 * The property is a primary key and will not update on duplicate
 */
#[Attribute(Attribute::TARGET_CLASS_CONSTANT|Attribute::TARGET_PROPERTY)]
class NoUpdate { public function __construct() {} }
/** 
 * the attribute will not update on duplicate if the update would null it
 */
#[Attribute(Attribute::TARGET_CLASS_CONSTANT|Attribute::TARGET_PROPERTY)]
class NotNull { public function __construct() {} }
/**
 * The property should only be updated if the value is not null and null in the DB
 */
#[Attribute(Attribute::TARGET_CLASS_CONSTANT|Attribute::TARGET_PROPERTY)]
class IfSet { public function __construct() {} }


// set the error log file if running in cli mode
if (!defined("BitFire\WAF_ROOT")) {
    define("SQL_ERROR_FILE", "/tmp/php_sql_errors.log");
} else {
    define("SQL_ERROR_FILE", false);
}


class Credentials {
    public $username;
    public $password;
    public $prefix;
    public $db_name;
    public $host;

    /**
     * create database credentials
     * @return Credentials
     */
    public function __construct(string $user, string $pass, string $host, string $db_name, string $pre = "") {
        $this->username = $user;
        $this->password = $pass;
        $this->prefix = $pre;
        $this->host = $host;
        $this->db_name = $db_name;
    }
}

/** 
 * used to glue key values pairs together for SQL queries
 * if data key begins with ! then the value is not quoted
 * EG: UPDATE $table set " . glue(" = ", $data, ", ") .  where_clause($where);
 */
function glue(string $join, array $data, string $append_str = "") : string {
    $result = "";
    foreach ($data as $key => $value) {
        if ($result != '') { $result .= $append_str; }
        if ($key[0] === '!') { $key = substr($key, 1); $result .= "`{$key}` $join $value"; }
        else { $result .= "`{$key}`" . $join . quote($value); }
    }
    return $result;   
}

/**
 * add SQL quoting to a string, convert ints and bool to SQL types
 * @param mixed $input 
 * @return string 
 */
function quote($input) : string {
    if (is_null($input)) { return 'null'; }
    if (is_numeric($input)) { return strval($input); }
    if (is_string($input)) { return "'".addslashes($input)."'"; }
    if (is_bool($input)) { return $input ? 1 : 0; }
    return $input;
}

/**
 * create a where clause from an array of key value pairs
 * @param array $data 
 * @return string - the generated SQL where clause
 */
function where_clause(array $data) : string { 
    $result = " WHERE ";
    foreach ($data as $key => $value) {
        if (strlen($result) > 7) { $result .= " AND "; }
        if ($key[0] == '!') {
            $t = substr($key, 1);
            $result .= " `{$t}` = {$value} ";
        } else {
            $result .= " `{$key}` = " . quote($value);
        }
    }
    $x = trim($result, ",");
    return $x;
}


class DB {
    protected $_db;
    public $errors;
    public $logs = [];
    public $host;
    public $user;
    public $database;
    public $prefix;
    protected $_log_enabled = false;
    protected $_simulation = false;
    protected $_replay_enabled = false;
    protected $_err_filter_fn;
    protected $_replay_log = "";
    protected $_replay = [];

    /**
     * @param null|mysqli $db 
     * @return void 
     */
    protected function __construct(?\mysqli $db) {
        $this->_db = $db;
        $this->errors = array();
    }

    public function __destruct() {
        if ($this->_db) { $this->_db->close(); }
        if ($this->_replay_enabled && count($this->_replay) >= 1) { 
            file_put_contents($this->_replay_log, "\n".implode(";\n", $this->logs).";\n", FILE_APPEND);
        }
    }

    /**
     * disallow object POP chaining...
     */
    public function __wakeup() {
        trigger_error("No POP Chaining", E_USER_ERROR);
    }


    /**
     * @param null|mysqli $mysqli 
     * @return DB a new DB object, from existing connection
     */
    public static function from(?\mysqli $mysqli) : DB {
        trace("DB_FROM");
        return new DB($mysqli);
    }

    /**
     * @param bool $enable - enable or disable logging
     * @return DB 
     */
    public function enable_log(bool $enable = true) : DB {
        $this->_log_enabled = $enable;
        return $this;
    }

    /**
     * @param string $replay_file_name - the name of the replay log to record to
     * @return DB 
     */
    public function enable_replay(string $replay_file_name) : DB {
        $this->_replay_log = $replay_file_name;
        $this->_replay_enabled = true;
        return $this;
    }


    /**
     * @param bool $enable - enable or disable simulation (no queries, just the query log)
     * @return DB 
     */
    public function enable_simulation(bool $enable) : DB {
        $this->_simulation = $enable;
        $this->enable_log(true);
        return $this;
    }


    /**
     * @param null|Credentials $cred create a new DB connection
     * @return DB 
     */
    public static function cred_connect(?Credentials $cred) : DB {
        if ($cred == NULL) { return DB::from(NULL); }
        $db = DB::connect($cred->host, $cred->username, $cred->password, $cred->db_name);
        $db->prefix = $cred->prefix;
        return $db;
    }

    /**
     * @return bool - true if the database is connected
     */
    public function is_connected() : bool {
        return $this->_db != NULL;
    }

    /**
     * @todo: add support for retry.  busy network environments can cause connection failures
     * 
     * @param string $host 
     * @param string $user 
     * @param string $passwd 
     * @param string $db_name 
     * @return DB 
     */
    public static function connect(string $host, string $user, string $passwd, string $db_name) : DB {
        trace("DB_CONNECT");
        $db = mysqli_init();
        mysqli_options($db, MYSQLI_OPT_CONNECT_TIMEOUT, 3);
        if(mysqli_real_connect($db, $host, $user, $passwd, $db_name)) {
            $db = DB::from($db);
            $db->host = $host;
            $db->user = $user;
            $db->database = $db_name;
            return $db;
        } else {
            //debug("failed to connect to {$host} as {$user} on {$db_name}");
            debug("failed to connect to %s as %s on %s", $host, $user, $db_name);
        }
        return DB::from(NULL);
    }

    /**
     * @param string $sql raw SQL to run.  be careful!
     * @return bool 
     */
    public function unsafe_raw(string $sql) : bool {
        return (bool)$this->_qb($sql, DB_FETCH_SUCCESS);
    }

    /**
     * run SQL $sql return result as bool. errors stored tail($this->errors)
     */
    protected function _qb(string $sql, int $return_type = DB_FETCH_SUCCESS) : int {
        assert(!empty($this->_db), "database: {$this->database} is not connected [".gettype($this->_db)."]");
        $r = false;
        $errno = 0;
        try {
            if (!$this->_simulation) {
                $r = mysqli_query($this->_db, $sql); 
            }
        }
        // silently swallow exceptions, will catch them in next line
        catch (Exception $e) { $r = false; }
        if ($r === false) {
            $errno = mysqli_errno($this->_db);
            $err = "[$sql] errno($errno) " . mysqli_error($this->_db);
            $this->errors[] = $err;
        }
        $success = (bool)$r;
        if ($success) {
            if ($this->_replay_enabled || $this->_log_enabled) {
                $e = mysqli_affected_rows($this->_db);
                if ($this->_log_enabled) {
                    $msg = "# [$sql] errno($errno) affected rows($e)";
                    $this->logs[] = $msg;
                }
                if ($this->_replay_enabled) {
                    $this->_replay[] = $sql;
                }
            }
            if ($return_type == DB_FETCH_NUM_ROWS) {
                return intval($e);
            } else if ($return_type == DB_FETCH_INSERT_ID) {
                $id = intval(mysqli_insert_id($this->_db));
                return ($id == 0) ? -1 : $id;
            }
        }
        return ($success) ? 1 : 0;
    }

    /**
     * run SQL $sql return result as bool. errors stored tail($this->errors)
     */
    protected function _qr(string $sql, $mode = MYSQLI_ASSOC) : SQL {
        $r = false;
        $errno = 0;
        try {
            if (!$this->_simulation) {
                $r = mysqli_query($this->_db, $sql); 
            }
        }
        // silently swallow exceptions, will catch them in next line
        catch (Exception $e) { $r = false; }
        if ($r == false || !$r instanceof mysqli_result) {
            $errno = mysqli_errno($this->_db);
            $err = "[$sql] errno($errno) " . mysqli_error($this->_db);
            $this->errors[] = $err;
            return SQL::from(NULL, $sql);
        }
        else {
            if ($this->_log_enabled) {
                $e = mysqli_affected_rows($this->_db);
                // $this->logs[] = $sql;
                $msg = "# [$sql] errno($errno) selected rows($e)";
                $this->logs[] = $msg;
            }
        }

       return SQL::from(mysqli_fetch_all($r, $mode), $sql);
    }

    /**
     * build sql replacing {name} with values from $data[name] = value
     * auto quotes values,  use {!name} to not quote
     * @return SQL - SQL result abstraction
     */
    //public function fetch(string $sql, array|object $data = NULL, $mode = MYSQLI_ASSOC) : SQL {
    public function fetch(string $sql, $data = NULL, $mode = MYSQLI_ASSOC) : SQL {
        assert(!empty($this->_db), "database: {$this->database} is not connected [".gettype($this->_db)."]");

        $type = (is_array($data)) ? 'array' : ((is_object($data)) ? 'object' : 'scalar');
        // replace {} with named values from $data, or $this->_x
        $new_sql = preg_replace_callback("/{\w+}/", function ($x) use ($data, $type) {
            // strip { }
            $param = str_replace(array('{', '}'), '', $x[0]);
            // access the value of $param (by object or array)
            $data_param = ($type === 'array') ? $data[$param]??"_NO_KEY_$param" : $data->$param;
            // no quoting
            if ($param[0] === "!") {
                return $data_param;
            } 
            // replace with the value of the param
            return quote($data_param);
        }, $sql);

        return $this->_qr($new_sql, $mode);
    }

    /**
     * delete entries from $table where $data matches
     * @param string $table table name
     * @param array $where key value pairs of column names and values
     * @return bool 
     */
    public function delete(string $table, array $where) : int {
        $sql = "DELETE FROM $table " . where_clause($where);
        return $this->_qb($sql);
    }

    protected function insert_stmt(string $table, array $data, int $on_duplicate = DB_DUPLICATE_IGNORE, ?array $no_update = null, ?array $if_null = null) : string {
        
        $ignore = "";
        // ignore duplicates
        if ($on_duplicate === DB_DUPLICATE_IGNORE) {
            $ignore = "IGNORE";
        }

        $sql = "INSERT $ignore INTO `$table` (`" . join("`,`", array_keys($data)) . 
        "`) VALUES (" . join(",", array_map('\ThreadFinDB\quote', array_values($data))).")";

        // update on duplicate, exclude any PKS
        if ($on_duplicate === DB_DUPLICATE_UPDATE) {
            $update_data = array_diff_key($data, $no_update);
            // UGLY AF
            $suffix = "";
            foreach($update_data as $key => $value) {
                $q_value = quote($value);
                if (isset($if_null[$key])) {
                    $suffix .= "`$key` = IF(`$key` = '' OR `$key` IS NULL, $q_value, `$key`), ";
                } else {
                    $suffix .= "`$key` = $q_value, ";
                }
            }
            if (!empty($suffix)) {
                $sql .= " ON DUPLICATE KEY UPDATE " . substr($suffix, 0, -2);
            }
        }

        return $sql;
    }

    /**
     * insert $data into $table 
     * @param string $table 
     * @param array $kvp 
     * @param int $on_duplicate - DB_DUPLICATE_IGNORE, DB_DUPLICATE_UPDATE. 
     *   IMPORTANT! for update be sure auto incrementing PK is not in $data
     * @return bool 
     */
    public function insert(string $table, array $kvp, int $on_duplicate = DB_DUPLICATE_IGNORE) : int {
        $sql = $this->insert_stmt($table, $kvp, $on_duplicate);

        return $this->_qb($sql);
    }

    /**
     * return a function that will insert key value pairs into $table.  
     * keys are column names, values are data to insert.
     * @param string $table the table name
     * @param ?array $keys list of allowed key names from the passed $data
     * @return callable(array $data) insert $data into $table 
     */
    public function insert_fn(string $table, ?array $keys = null, $ignore_duplicate = true) : callable { 
        $t = $this;
        return function(array $data) use ($table, &$t, $keys, $ignore_duplicate) : int {
            if (!empty($keys)) {
                $data = array_filter($data, BINDR('in_array', $keys), ARRAY_FILTER_USE_KEY);
            }
            $ignore = ($ignore_duplicate) ? "IGNORE" : "";
            $sql = "INSERT $ignore INTO $table (" . join(",", array_keys($data)) . 
                ") VALUES (" . join(",", array_map('\ThreadFinDB\quote', array_values($data))) . ")";

            return $t->_qb($sql);
        };
    }

    /**
     * return a function that will insert key value pairs into $table.
     * does not support {} replacement
     * @param string $table the table name
     * @param array $columns column names in (col1, col2) or (col->data) format
     * @return callable(?array $data) that takes KVP ordered in $columns order. 
     *         pass null as KVP data to run the bulk query
     */
    public function bulk_fn(string $table, array $columns, bool $ignore_duplicate = true) : callable { 
        $t = $this;
        $ignore = ($ignore_duplicate) ? "IGNORE" : "";
        $sql = "INSERT $ignore INTO $table (" . join(",", array_keys($columns)) . ") VALUES ";
        return function(?array $data = null) use (&$sql) : bool {
            if ($data !== null) {
                $sql .= join(",", array_map('\ThreadFinDB\quote', array_values($data))) . ")";
                return false;
            }

            return $this->_qb($sql);
        };
    }



    /**
     * update $table and set $data where $where
     */
    public function update(string $table, array $data, array $where) : int {
        // unset all where keys in data. this makes no sense when where is a PK
        //do_for_all_key($where, function ($x) use (&$data) { unset($data[$x]); });
        do_for_all_key($where, function ($x) use (&$data) { unset($data[$x]); });

        $sql = "UPDATE $table set " . glue(" = ", $data, ", ") .  where_clause($where);
        return $this->_qb($sql);
    }

    /**
     * store object data into table.  data must have public members and have the 
     * same names as the table
     * @return bool true if the SQL write is successful
     */
    public function store(string $table, Object $data, int $on_duplicate = DB_DUPLICATE_IGNORE) : int {
        assert(is_resource($this->_db), "database not connected");

        // TODO: this should be it's own object to array function with tests
        $r = new \ReflectionClass($data);
        $props = $r->getProperties(\ReflectionProperty::IS_PUBLIC);
        $no_updates = [];
        $if_null = [];
        // turn the object into an array, update PKS for update list
        $kvp = array_reduce($props, function($kvp, $item) use ($data, $on_duplicate, &$no_updates, &$if_null) {
            $name = $item->name;
            $attrs = $item->getAttributes();

            foreach ($attrs as $attr) {
                $attribute = $attr->getName();
                switch($attribute) {
                    case "ThreadFinDB\NoUpdate":
                        $no_updates[$name] = true;
                        break;
                    case "ThreadFinDB\NotNull":
                        if (!isset($data->$name) || empty($data->$name)) {
                            return $kvp;
                        }
                        break;
                    case "ThreadFinDB\IfNull":
                        $if_null[$name] = true;
                }
            }

            if (isset($data->$name)) {
                $kvp[$name] = $data->$name;
            }
            return $kvp;
        }, []);

        $sql = $this->insert_stmt($table, $kvp, $on_duplicate, $no_updates, $if_null);
        return $this->_qb($sql, DB_FETCH_INSERT_ID);
    }

    public function close() : void {
        if ($this->_db) { mysqli_close($this->_db); $this->_db = NULL; }
        if (SQL_ERROR_FILE) {
            if (count($this->errors) > 0) {
                $errors = array_filter($this->errors, function($x) { return stripos($x, "Duplicate") != false; });
                file_put_contents(SQL_ERROR_FILE, print_r($errors, true), FILE_APPEND);
            }
        }
    }
}


/**
 * SQL result abstraction
 */
class SQL {
    protected $_x;
    protected $_data = NULL;
    protected $_idx = 0;
    protected $_errors;
    protected $_sql;
    protected $_len;

    /**
     * create a new SQL result abstraction from a SQL associative result
     * @param null|array $x 
     * @param string $sql the sql that generated the result
     * @return SQL 
     */
    public static function from(?array $x, string $in_sql="") : SQL { 
        $sql = new SQL();
        $sql->_x = $x;
        $sql->_len < (is_array($x)) ? count($x) : 0;
        $sql->_sql = $in_sql;
        return $sql; 
    }
    
    /**
     * set internal dataset to value of $name at current row index 
     */
    public function set_col(string $name) : SQL {
        if (isset($this->_x[$this->_idx])) {
            $this->_data = $this->_x[$this->_idx][$name]??NULL;
        } else {
            $this->_data = NULL;
        }

        return $this;
    }

    /**
     * set internal dataset to row  at current row index 
     */
    public function set_row(?int $idx = NULL) : SQL {
        $idx = ($idx !== NULL) ? $idx : $this->_idx;
        if (isset($this->_x[$idx])) {
            $this->_data = $this->_x[$idx];
        }
        return $this;
    }

    /**
     * @return MaybeStr of column $name at current row index
     */
    public function col(string $name) : MaybeSTR {
        if (isset($this->_x[$this->_idx])) {
            return MaybeStr::of($this->_x[$this->_idx][$name]??NULL);
        } 
        return MaybeStr::of(NULL);
    }

    /**
     * @return bool true if column name has a row with at least one value of $value 
     */
    public function in_set(string $name, string $value) : bool {
        return array_reduce($this->_x, function ($carry, $item) use ($name, $value) {
            return $carry || $item[$name] == $value;
        }, false);
    }

    /**
     * @return MaybeA of result row at $idx or current row indx
     */
    public function row(?int $idx = NULL) : MaybeA {
        $idx = ($idx !== NULL) ? $idx : $this->_idx;
        if (isset($this->_x[$idx])) {
            return MaybeA::of($this->_x[$idx]);
        }
        return MaybeA::of(NULL);
    }

    /**
     * increment row index
     */
    public function next() : void {
        $this->_idx++;
    }

    /**
     * return true if data has a row at index $idx
     */
    public function has_row(int $idx = 0) : bool {
        return isset($this->_x[$idx]);
    }

    /**
     * call $fn on current $this->_data (see set_row, set_col)
     * @param bool $spread if true, call $fn(...$this->_data)
     */
    public function ondata(callable $fn, bool $spread = false) : SQL {
        if (!empty($this->_data)) {
            $this->_data = 
                ($spread) ?
                $fn(...$this->_data) :
                $fn($this->_data);
        } else {
            $this->_errors[] = "wont call " . func_name($fn) . " on data : " . var_export($this->_data, true);
        }

        return $this;
    }

    /**
     * map $fn on each row in entire result (works on raw result, no set necessary)
     */
    public function map(callable $fn) : array {
        if (is_array($this->_x) && !empty($this->_x)) {
            return array_map($fn, $this->_x);
        } else {
            $this->_errors[] = "wont call " . func_name($fn) . " on data : " . var_export($this->_data, true);
        }
        return [];
    }

    /**
     * reduce $fn($carry, $item) on each row in entire result (works on raw result, no set necessary)
     * $fn may return any type, but should be a string in 99% cases
     * @return mixed return type of $fn, false if rows (_x) is empty
     */
    public function reduce(callable $fn, $initial = "") {
        if (is_array($this->_x) && !empty($this->_x)) {
            return array_reduce($this->_x, $fn, $initial);
        } else {
            $this->_errors[] = "wont call " . func_name($fn) . " on data : " . var_export($this->_data, true);
        }
        return false;
    }
    // run an a function that has external effect on current data
    public function effect(callable $fn) : SQL { 
        if (!empty($this->_data)) { $fn($this->_data); } return $this;
    }
    // set data to NULL if $fn returns false
    public function if(callable $fn) : SQL {
        if ($fn($this->_data) === false) { $this->_data = NULL; } return $this;
    }
    // set data to NULL if $fn returns true
    public function if_not(callable $fn) : SQL {
        if ($fn($this->_data) !== false) { $this->_data = NULL; } return $this;
    }
    // return true if we have an empty result set
    public function empty() : bool {
        return empty($this->_x);
    } 
    public function count() : int {
        return is_array($this->_x) ? count($this->_x) : 0;
    } 
    // get all errors
    public function errors() : array {
        return $this->_errors;
    }
    // size of result set
    public function size() : int {
        return is_array($this->_x) ? count($this->_x) : ((empty($this->_x)) ? 0 : 1);
    }
    public function data() : ?array {
        return $this->_x;
    }
    public function __toString() : string {
        return (string)$this->_data;
    }
}


/**
 * database backup checkpoint offset
 * @package ThreadFinDB
 */
class Offset {
    public $table;
    public $limit_sz = 0;
    public $offset = 0;
    const TABLE_COMPLETE = -1;

    public function __construct(string $table, int $limit_sz = 300) {
        $this->limit_sz = $limit_sz;
        $this->table = $table;
    }

    /**
     * update a table saved offset
     * @param string $table 
     * @param int $offset 
     * @param int $limit 
     * @return void 
     */
    public function set_check_point(int $offset) {
        $this->offset = $offset;
    }

    /**
     * @param string $table 
     * @return bool true if the table is completely dumped, false if not or incomplete
     */
    public function is_table_complete() : bool {
        return $this->offset == Offset::TABLE_COMPLETE;
    }
}

/**
 * function suitable for database dumping to gz compressed output file
 * this is equal to calling stream_output_fn($data, $stream, "gzwrite")
 * @param string $data 
 * @param mixed $stream 
 * @return int -1 on error, else total byte length written to stream across all writes
 */
function gz_output_fn(?string $data, $stream) : int {
    return stream_output_fn($data, $stream, "gzwrite");
}

/**
 * function suitable for database dumping to gz compressed output file
 * @param string $data 
 * @param mixed $stream 
 * @return int -1 on error, else total byte length written to stream across all writes
 */
function stream_output_fn(?string $data, $stream, $fn = "fwrite") : int {
    assert(is_resource($stream), "stream must be a resource");
    static $total_bytes = 0;

    if ($data && strlen($data) > 0) {
        $bytes = $fn($stream, $data);
        if (!$bytes) {
            return -1;
        }
        $total_bytes += $bytes;
    }
    return $total_bytes;
}


/**
 * dump a single SQL table 100 rows at a time
 * @param DB $db 
 * @param string $db_dump_file 
 * @param mixed $row 
 * @return int number of uncompressed bytes written
 */
function dump_table(DB $db, callable $write_fn, array $row) : ?Offset {
    $idx = 0;
    $limit = 300;
    $db_name = $db->database;
    $table = $row["Tables_in_$db_name"];
    $offset = new Offset($table);

    // find number of rows
    $num_rows = intval($db->fetch("SELECT count(*) as count FROM $table")->col("count")());
    // the create statement
    $create = $db->fetch("SHOW CREATE TABLE $table");
    // table header line
    $write_fn("# Export of $table\n# $num_rows rows in $table\n");
    // drop table if it exists
    $write_fn("DROP TABLE IF EXISTS `$table`;\n");
    // add create statement
    $write_fn($create->col("Create Table")() . ";\n\n");

    // insert $limit rows at a time
    while($idx < $num_rows) {
        $limit = min($limit, $num_rows - $idx);
        $rows = $db->fetch("SELECT * FROM $table LIMIT $limit OFFSET $idx", NULL, MYSQLI_NUM);
        // create the output string
        $result = $rows->reduce(function(string $carry, array $row) {
            return $carry . "(" . implode(",", array_map('\ThreadFinDB\quote', $row)) . "),\n";
        }, "INSERT IGNORE INTO $table VALUES");
        // write to the output stream
        $bytes_written = $write_fn(substr($result, 0, -2) . ";\n\n");
        if ($bytes_written < 0 || $bytes_written > 1048576*20) {
            return $offset;
        }

        // increment the offset by the limit
        $idx += $limit;
        $offset->set_check_point($idx);

        // let the database rest a second
        usleep(100000);
    }
    $offset->set_check_point(Offset::TABLE_COMPLETE);

    return $offset;
}

/**
 * dump all database tables to the function $write_fn
 * @param Credentials $cred Access credentials to the database
 * @param string $db_name the name of the database (eg, wordpress)
 * @param callable $write_fn a function that takes a string and 
 *      writes it to the output stream (fwrite, gzwrite, etc)
 * @return array of Offset objects. one for each table in $db_name
 */
function dump_database(Credentials $cred, callable $write_fn, int $max_bytes = 1024*1024*50) : array {
    $db_name = $cred->db_name;
    $header = "# Database export of ($db_name) began at UTC: " 
            . date(DATE_RFC3339) . "\n# UTC tv: " . utc_time() . "\n\n";
    $init_sql = "SET NAMES 'utf8'\n";

    $db = DB::cred_connect($cred);
    $db->unsafe_raw($init_sql);
    $tables = $db->fetch("SHOW TABLES");
    $write_fn($header . $init_sql);

    $t = BINDL('\ThreadFinDB\dump_table', $db, $write_fn);
    $data = $tables->map($t);
    //$data = $tables->data();
    return (!$data || empty($data) || !is_array($data)) ? [] : $data;
}