Chocobozzz/PeerTube

View on GitHub
server/core/lib/views/shared/video-viewer-stats.ts

Summary

Maintainability
B
4 hrs
Test Coverage
import { VideoViewEvent } from '@peertube/peertube-models'
import { isTestOrDevInstance } from '@peertube/peertube-node-utils'
import { GeoIP } from '@server/helpers/geo-ip.js'
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEWER_SYNC_REDIS, VIEW_LIFETIME } from '@server/initializers/constants.js'
import { sequelizeTypescript } from '@server/initializers/database.js'
import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js'
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js'
import { Redis } from '@server/lib/redis.js'
import { VideoModel } from '@server/models/video/video.js'
import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section.js'
import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer.js'
import { MVideo, MVideoImmutable } from '@server/types/models/index.js'
import { Transaction } from 'sequelize'

const lTags = loggerTagsFactory('views')

type LocalViewerStats = {
  firstUpdated: number // Date.getTime()
  lastUpdated: number // Date.getTime()

  watchSections: {
    start: number
    end: number
  }[]

  watchTime: number

  country: string
  subdivisionName: string

  videoId: number
}

export class VideoViewerStats {
  private processingViewersStats = false
  private processingRedisWrites = false

  private readonly viewerCache = new Map<string, LocalViewerStats>()
  private readonly redisPendingWrites = new Map<string, { sessionId: string, videoId: number, stats: LocalViewerStats }>()

  constructor () {
    setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
    setInterval(() => this.syncRedisWrites(), VIEWER_SYNC_REDIS)
  }

  // ---------------------------------------------------------------------------

  async addLocalViewer (options: {
    video: MVideoImmutable
    currentTime: number
    ip: string
    sessionId: string
    viewEvent?: VideoViewEvent
  }) {
    const { video, ip, viewEvent, currentTime, sessionId } = options

    logger.debug(
      'Adding local viewer to video stats %s.', video.uuid,
      { currentTime, viewEvent, sessionId, ...lTags(video.uuid) }
    )

    const nowMs = new Date().getTime()

    let stats: LocalViewerStats = await this.getLocalVideoViewer({ sessionId, videoId: video.id })

    if (stats && stats.watchSections.length >= MAX_LOCAL_VIEWER_WATCH_SECTIONS) {
      logger.warn(
        'Too much watch section to store for a viewer, skipping this one',
        { sessionId, currentTime, viewEvent, ...lTags(video.uuid) }
      )
      return
    }

    if (!stats) {
      const { country, subdivisionName } = await GeoIP.Instance.safeIPISOLookup(ip)

      stats = {
        firstUpdated: nowMs,
        lastUpdated: nowMs,

        watchSections: [],

        watchTime: 0,

        country,
        subdivisionName,

        videoId: video.id
      }
    }

    stats.lastUpdated = nowMs

    if (viewEvent === 'seek' || stats.watchSections.length === 0) {
      stats.watchSections.push({
        start: currentTime,
        end: currentTime
      })
    } else {
      const lastSection = stats.watchSections[stats.watchSections.length - 1]

      if (lastSection.start > currentTime) {
        logger.debug('Invalid end watch section %d. Last start record was at %d. Starting a new section.', currentTime, lastSection.start)

        stats.watchSections.push({
          start: currentTime,
          end: currentTime
        })
      } else {
        lastSection.end = currentTime
      }
    }

    stats.watchTime = this.buildWatchTimeFromSections(stats.watchSections)

    logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })

    this.setLocalVideoViewer(sessionId, video.id, stats)
  }

  // ---------------------------------------------------------------------------

  async getWatchTime (videoId: number, sessionId: string) {
    const stats: LocalViewerStats = await this.getLocalVideoViewer({ sessionId, videoId })

    return stats?.watchTime || 0
  }

  // ---------------------------------------------------------------------------

  async processViewerStats () {
    if (this.processingViewersStats) return
    this.processingViewersStats = true

    if (!isTestOrDevInstance()) logger.info('Processing viewer statistics.', lTags())

    const now = new Date().getTime()

    try {
      await this.syncRedisWrites()

      const allKeys = await Redis.Instance.listLocalVideoViewerKeys()

      for (const key of allKeys) {
        const stats: LocalViewerStats = await this.getLocalVideoViewerByKey(key)

        // Process expired stats
        if (stats.lastUpdated > now - VIEW_LIFETIME.VIEWER_STATS) {
          continue
        }

        try {
          await sequelizeTypescript.transaction(async t => {
            const video = await VideoModel.load(stats.videoId, t)
            if (!video) return

            const statsModel = await this.saveViewerStats(video, stats, t)

            if (statsModel && video.remote) {
              await sendCreateWatchAction(statsModel, t)
            }
          })

          await this.deleteLocalVideoViewersKeys(key)
        } catch (err) {
          logger.error('Cannot process viewer stats for Redis key %s.', key, { err, stats, ...lTags() })
        }
      }
    } catch (err) {
      logger.error('Error in video save viewers stats scheduler.', { err, ...lTags() })
    }

    this.processingViewersStats = false
  }

  private async saveViewerStats (video: MVideo, stats: LocalViewerStats, transaction: Transaction) {
    if (stats.watchTime === 0) return

    const statsModel = new LocalVideoViewerModel({
      startDate: new Date(stats.firstUpdated),
      endDate: new Date(stats.lastUpdated),
      watchTime: stats.watchTime,
      country: stats.country,
      subdivisionName: stats.subdivisionName,
      videoId: video.id
    })

    statsModel.url = getLocalVideoViewerActivityPubUrl(statsModel)
    statsModel.Video = video as VideoModel

    await statsModel.save({ transaction })

    statsModel.WatchSections = await LocalVideoViewerWatchSectionModel.bulkCreateSections({
      localVideoViewerId: statsModel.id,
      watchSections: stats.watchSections,
      transaction
    })

    return statsModel
  }

  private buildWatchTimeFromSections (sections: { start: number, end: number }[]) {
    return sections.reduce((p, current) => p + (current.end - current.start), 0)
  }

  /**
   *
   *  Redis calls can be expensive so try to cache things in front of it
   *
   */

  private getLocalVideoViewer (options: {
    sessionId: string
    videoId: number
  }): Promise<LocalViewerStats> {
    const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(options.sessionId, options.videoId)

    return this.getLocalVideoViewerByKey(viewerKey)
  }

  private getLocalVideoViewerByKey (key: string): Promise<LocalViewerStats> {
    const viewer = this.viewerCache.get(key)
    if (viewer) return Promise.resolve(viewer)

    return Redis.Instance.getLocalVideoViewer({ key })
  }

  private setLocalVideoViewer (sessionId: string, videoId: number, stats: LocalViewerStats) {
    const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(sessionId, videoId)
    this.viewerCache.set(viewerKey, stats)

    this.redisPendingWrites.set(viewerKey, { sessionId, videoId, stats })
  }

  private deleteLocalVideoViewersKeys (key: string) {
    this.viewerCache.delete(key)

    return Redis.Instance.deleteLocalVideoViewersKeys(key)
  }

  private async syncRedisWrites () {
    if (this.processingRedisWrites) return

    this.processingRedisWrites = true

    for (const [ key, pendingWrite ] of this.redisPendingWrites) {
      const { sessionId, videoId, stats } = pendingWrite
      this.redisPendingWrites.delete(key)

      try {
        await Redis.Instance.setLocalVideoViewer(sessionId, videoId, stats)
      } catch (err) {
        logger.error('Cannot write viewer into redis', { sessionId, videoId, stats, err })
      }
    }

    this.processingRedisWrites = false
  }
}