wikimedia/mediawiki-core

View on GitHub
includes/libs/WRStats/WRStatsReader.php

Summary

Maintainability
C
1 day
Test Coverage
<?php

namespace Wikimedia\WRStats;

/**
 * Readers gather a batch of read operations, returning
 * promises. The batch is executed when the first promise is resolved.
 *
 * getRate() and getRates() only queue storage keys and hand back RatePromise
 * objects. The store is queried lazily by fetch(), which internalGetCount()
 * (the resolution utility for RatePromise) calls before reading any value.
 *
 * @since 1.39
 */
class WRStatsReader {
    /** @var StatsStore */
    private $store;
    /** @var array<string,MetricSpec> */
    private $metricSpecs;
    /** @var string[] */
    private $prefixComponents;
    /** @var float|int|null The UNIX timestamp used for the current time */
    private $now;
    /** @var bool[] Storage keys ready to be fetched */
    private $queuedKeys = [];
    /** @var int[] Unscaled integers returned by the store, indexed by key */
    private $cachedValues = [];

    /**
     * @internal Use WRStatsFactory::createReader instead
     * @param StatsStore $store
     * @param array<string,array> $specs Metric specification arrays indexed by
     *   metric name; each one is wrapped in a MetricSpec.
     * @param string|string[] $prefix A single prefix component or a non-empty
     *   list of prefix components passed to the store when building keys.
     * @throws WRStatsError If $prefix is an empty array
     */
    public function __construct( StatsStore $store, $specs, $prefix ) {
        $this->store = $store;
        $this->metricSpecs = [];
        foreach ( $specs as $name => $spec ) {
            $this->metricSpecs[$name] = new MetricSpec( $spec );
        }
        $this->prefixComponents = is_array( $prefix ) ? $prefix : [ $prefix ];
        if ( !count( $this->prefixComponents ) ) {
            throw new WRStatsError( __METHOD__ .
                ': there must be at least one prefix component' );
        }
    }

    /**
     * Get a TimeRange for some period ending at the current time. Note that
     * this will use the same value of the current time for subsequent calls
     * until resetCurrentTime() is called.
     *
     * @param int|float $numSeconds
     * @return TimeRange
     */
    public function latest( $numSeconds ) {
        $now = $this->now();
        return new TimeRange( $now - $numSeconds, $now );
    }

    /**
     * Get a specified time range
     *
     * @param int|float $start The UNIX time of the start of the range
     * @param int|float $end The UNIX time of the end of the range
     * @return TimeRange
     */
    public function timeRange( $start, $end ) {
        return new TimeRange( $start, $end );
    }

    /**
     * Queue a fetch operation.
     *
     * No storage access happens here: the keys of all buckets touched by the
     * range are added to the queue (unless already cached) and a promise is
     * returned.
     *
     * @param string $metricName The metric name, the key into $specs.
     * @param EntityKey|null $entity Additional storage key components.
     *   A LocalEntityKey is used when null is given.
     * @param TimeRange $range The time range to fetch
     * @return RatePromise
     * @throws WRStatsError If the metric is unknown or has no sequences
     */
    public function getRate( $metricName, ?EntityKey $entity, TimeRange $range ) {
        $metricSpec = $this->metricSpecs[$metricName] ?? null;
        if ( $metricSpec === null ) {
            throw new WRStatsError( "Unrecognised metric \"$metricName\"" );
        }
        $entity ??= new LocalEntityKey;
        $now = $this->now();

        // Pick the first sequence whose soft expiry window reaches back to
        // the start of the range. If none does, the loop runs to completion
        // and the last sequence in the list is used.
        $seqSpec = null;
        foreach ( $metricSpec->sequences as $seqSpec ) {
            $seqStart = $now - $seqSpec->softExpiry;
            if ( $seqStart <= $range->start ) {
                break;
            }
        }
        if ( $seqSpec === null ) {
            // An empty sequence list would otherwise surface below as a
            // property read on null followed by a division by zero.
            throw new WRStatsError( "Metric \"$metricName\" has no sequences" );
        }

        $timeStep = $seqSpec->timeStep;
        $firstBucket = (int)( $range->start / $timeStep );
        // ceil() may queue one bucket more than internalGetCount() reads
        // (it truncates instead); the extra key is simply fetched and ignored.
        $lastBucket = (int)ceil( $range->end / $timeStep );
        for ( $bucket = $firstBucket; $bucket <= $lastBucket; $bucket++ ) {
            $key = $this->makeBucketKey( $metricName, $seqSpec->name, $bucket, $entity );
            if ( !isset( $this->cachedValues[$key] ) ) {
                $this->queuedKeys[$key] = true;
            }
        }
        return new RatePromise( $this, $metricName, $entity, $metricSpec, $seqSpec, $range );
    }

    /**
     * Queue a batch of fetch operations for different metrics with the same
     * time range.
     *
     * @param string[] $metricNames
     * @param EntityKey|null $entity
     * @param TimeRange $range
     * @return RatePromise[] Promises indexed by metric name
     * @throws WRStatsError If any metric is unknown or has no sequences
     */
    public function getRates( $metricNames, ?EntityKey $entity, TimeRange $range ) {
        $rates = [];
        foreach ( $metricNames as $name ) {
            $rates[$name] = $this->getRate( $name, $entity, $range );
        }
        return $rates;
    }

    /**
     * Perform any queued fetch operations.
     *
     * All queued keys are sent to the store in a single query() call. Values
     * already present in the cache are kept (array union), and the queue is
     * emptied afterwards.
     */
    public function fetch() {
        if ( !$this->queuedKeys ) {
            return;
        }
        $this->cachedValues += $this->store->query( array_keys( $this->queuedKeys ) );
        $this->queuedKeys = [];
    }

    /**
     * Set the current time to be used in latest() etc.
     *
     * @param int|float $now
     */
    public function setCurrentTime( $now ) {
        $this->now = $now;
    }

    /**
     * Clear the current time so that it will be filled with the real current
     * time on the next call.
     */
    public function resetCurrentTime() {
        $this->now = null;
    }

    /**
     * Get the current time, lazily initialising it from microtime() and then
     * keeping that value until it is overridden or reset.
     *
     * @return float|int
     */
    private function now() {
        $this->now ??= microtime( true );
        return $this->now;
    }

    /**
     * Build the storage key for one time bucket of a metric sequence.
     *
     * @param string $metricName
     * @param mixed $seqName The "name" property of the sequence spec
     * @param int $bucket Bucket index, i.e. time divided by the time step
     * @param EntityKey $entity
     * @return string|int Whatever StatsStore::makeKey() returns; it is used
     *   as an array index by this class.
     */
    private function makeBucketKey( $metricName, $seqName, $bucket, EntityKey $entity ) {
        return $this->store->makeKey(
            $this->prefixComponents,
            [ $metricName, $seqName, $bucket ],
            $entity
        );
    }

    /**
     * Sum the cached bucket values overlapping a time range, interpolating
     * the partially covered first and last buckets, then scale the result by
     * the metric resolution.
     *
     * Any queued keys are fetched first. Buckets with no cached value (or a
     * value of zero) contribute nothing.
     *
     * @internal Utility for resolution in RatePromise
     * @param string $metricName
     * @param EntityKey $entity
     * @param MetricSpec $metricSpec
     * @param SequenceSpec $seqSpec
     * @param TimeRange $range
     * @return float|int An int when the metric resolution is an int,
     *   otherwise a float.
     */
    public function internalGetCount(
        $metricName,
        EntityKey $entity,
        MetricSpec $metricSpec,
        SequenceSpec $seqSpec,
        TimeRange $range
    ) {
        $this->fetch();
        $timeStep = $seqSpec->timeStep;
        $firstBucket = (int)( $range->start / $timeStep );
        $lastBucket = (int)( $range->end / $timeStep );
        $now = $this->now();
        $total = 0;
        for ( $bucket = $firstBucket; $bucket <= $lastBucket; $bucket++ ) {
            $key = $this->makeBucketKey( $metricName, $seqSpec->name, $bucket, $entity );
            $value = $this->cachedValues[$key] ?? 0;
            if ( !$value ) {
                continue;
            } elseif ( $bucket === $firstBucket ) {
                if ( $bucket === $lastBucket ) {
                    // The whole range lies within a single bucket: scale the
                    // bucket's rate to the duration of the range.
                    // It can be assumed that there are zero events in the future
                    $bucketStartTime = $bucket * $timeStep;
                    $rateInterpolationEndTime = min( $bucketStartTime + $timeStep, $now );
                    $interpolationDuration = $rateInterpolationEndTime - $bucketStartTime;
                    if ( $interpolationDuration > 0 ) {
                        $total += $value * $range->getDuration() / $interpolationDuration;
                    }
                } else {
                    // First of several buckets: count only the fraction of
                    // the bucket that lies after the start of the range.
                    $overlapDuration = max( ( $bucket + 1 ) * $timeStep - $range->start, 0 );
                    $total += $value * $overlapDuration / $timeStep;
                }
            } elseif ( $bucket === $lastBucket ) {
                // Last of several buckets: count the fraction of the observed
                // part of the bucket that lies before the end of the range.
                // It can be assumed that there are zero events in the future
                $bucketStartTime = $bucket * $timeStep;
                $rateInterpolationEndTime = min( $bucketStartTime + $timeStep, $now );
                $overlapDuration = max( $range->end - $bucketStartTime, 0 );
                $interpolationDuration = $rateInterpolationEndTime - $bucketStartTime;
                if ( $overlapDuration === $interpolationDuration ) {
                    // Special case for 0/0 -- current time exactly on boundary.
                    $total += $value;
                } elseif ( $interpolationDuration > 0 ) {
                    $total += $value * $overlapDuration / $interpolationDuration;
                }
            } else {
                // Interior bucket, fully covered by the range
                $total += $value;
            }
        }
        // Round to nearest resolution step for nicer display
        $rounded = round( $total ) * $metricSpec->resolution;
        // Convert to integer if integer is expected
        if ( is_int( $metricSpec->resolution ) ) {
            $rounded = (int)$rounded;
        }
        return $rounded;
    }

    /**
     * Resolve a batch of RatePromise objects, returning their counter totals,
     * indexed as in the input array.
     *
     * @param array<mixed,RatePromise> $rates
     * @return array<mixed,float|int>
     */
    public function total( $rates ) {
        $result = [];
        foreach ( $rates as $key => $rate ) {
            $result[$key] = $rate->total();
        }
        return $result;
    }

    /**
     * Resolve a batch of RatePromise objects, returning their per-second rates,
     * indexed as in the input array.
     *
     * @param array<mixed,RatePromise> $rates
     * @return array<mixed,float>
     */
    public function perSecond( $rates ) {
        $result = [];
        foreach ( $rates as $key => $rate ) {
            $result[$key] = $rate->perSecond();
        }
        return $result;
    }

    /**
     * Resolve a batch of RatePromise objects, returning their per-minute rates,
     * indexed as in the input array.
     *
     * @param array<mixed,RatePromise> $rates
     * @return array<mixed,float>
     */
    public function perMinute( $rates ) {
        $result = [];
        foreach ( $rates as $key => $rate ) {
            $result[$key] = $rate->perMinute();
        }
        return $result;
    }

    /**
     * Resolve a batch of RatePromise objects, returning their per-hour rates,
     * indexed as in the input array.
     *
     * @param array<mixed,RatePromise> $rates
     * @return array<mixed,float>
     */
    public function perHour( $rates ) {
        $result = [];
        foreach ( $rates as $key => $rate ) {
            $result[$key] = $rate->perHour();
        }
        return $result;
    }

    /**
     * Resolve a batch of RatePromise objects, returning their per-day rates,
     * indexed as in the input array.
     *
     * @param array<mixed,RatePromise> $rates
     * @return array<mixed,float>
     */
    public function perDay( $rates ) {
        $result = [];
        foreach ( $rates as $key => $rate ) {
            $result[$key] = $rate->perDay();
        }
        return $result;
    }
}