flaw-oil/Flaw/Oil/ServerRepo.hs

Summary

Maintainability
Test Coverage
{-|
Module: Flaw.Oil.ServerRepo
Description: Oil server repo.
License: MIT
-}

module Flaw.Oil.ServerRepo
  ( ServerRepo()
  , openServerRepo
  , serverRepoMaxRevision
  , syncServerRepo
  ) where

import Control.Exception
import Control.Monad
import qualified Data.ByteString as B
import Data.Int
import qualified Data.Text as T

import Flaw.Book
import Flaw.Data.Sqlite
import Flaw.Exception
import Flaw.Oil.Repo

data ServerRepo = ServerRepo
  { serverRepoDb :: !SqliteDb
  , serverRepoStmtGetMaxRevision  :: !SqliteStmt
  , serverRepoStmtClearLatest     :: !SqliteStmt
  , serverRepoStmtWrite           :: !SqliteStmt
  , serverRepoStmtPull            :: !SqliteStmt
  , serverRepoStmtGetWeakRevision :: !SqliteStmt
  , serverRepoStmtPullTotalSize   :: !SqliteStmt
  }

openServerRepo :: T.Text -> IO (ServerRepo, IO ())
openServerRepo fileName = describeException "failed to open oil server repo" $ withSpecialBook $ \bk -> do
  -- open db
  db <- book bk $ openRepoDb fileName serverRepoVersion

  -- ensure tables and indices exist

  -- revs table
  sqliteExec db $ T.pack
    "CREATE TABLE IF NOT EXISTS revs (\
    \rev INTEGER PRIMARY KEY AUTOINCREMENT, \
    \date INTEGER NOT NULL, \
    \user INTEGER NOT NULL, \
    \latest INTEGER NOT NULL, \
    \key BLOB NOT NULL, \
    \value BLOB NOT NULL)"
  -- revs_rev__latest_1 index
  sqliteExec db $ T.pack
    "CREATE UNIQUE INDEX IF NOT EXISTS revs_rev__latest_1 ON revs (rev) WHERE latest = 1"
  -- revs_key__latest_1 index
  sqliteExec db $ T.pack
    "CREATE UNIQUE INDEX IF NOT EXISTS revs_key__latest_1 ON revs (key) WHERE latest = 1"

  -- create statements
  let
    createStmt str = book bk $ sqliteStmt db $ T.pack str
  stmtGetMaxRevision  <- createStmt "SELECT MAX(rev) FROM revs"
  stmtClearLatest     <- createStmt "UPDATE revs SET latest = 0 WHERE key = ?1 AND latest = 1"
  stmtWrite           <- createStmt "INSERT INTO revs (date, user, latest, key, value) VALUES (strftime('%s','now'), ?1, 1, ?2, ?3)"
  stmtPull            <- createStmt "SELECT rev, key, value FROM revs WHERE rev > ?1 AND rev <= ?2 AND latest = 1 ORDER BY rev LIMIT ?3"
  stmtGetWeakRevision <- createStmt "SELECT rev FROM revs WHERE rev > ?1 AND rev <= ?2 AND latest = 1 ORDER BY rev LIMIT 1"
  stmtPullTotalSize   <- createStmt "SELECT COUNT(rev) FROM revs WHERE rev > ?1 AND latest = 1"

  return ServerRepo
    { serverRepoDb = db
    , serverRepoStmtGetMaxRevision  = stmtGetMaxRevision
    , serverRepoStmtClearLatest     = stmtClearLatest
    , serverRepoStmtWrite           = stmtWrite
    , serverRepoStmtPull            = stmtPull
    , serverRepoStmtGetWeakRevision = stmtGetWeakRevision
    , serverRepoStmtPullTotalSize   = stmtPullTotalSize
    }

serverRepoMaxRevision :: ServerRepo -> IO Int64
serverRepoMaxRevision ServerRepo
  { serverRepoStmtGetMaxRevision = stmtGetMaxRevision
  } =
  sqliteQuery stmtGetMaxRevision $ \query -> do
    r <- sqliteStep query
    unless r $ throwIO $ DescribeFirstException "failed to get server repo max revision"
    sqliteColumn query 0

-- | Sync operation.
-- Push limits are not checked.
syncServerRepo :: ServerRepo -> Manifest -> Push -> UserId -> IO Pull
syncServerRepo repo@ServerRepo
  { serverRepoDb = db
  , serverRepoStmtClearLatest     = stmtClearLatest
  , serverRepoStmtWrite           = stmtWrite
  , serverRepoStmtPull            = stmtPull
  , serverRepoStmtGetWeakRevision = stmtGetWeakRevision
  , serverRepoStmtPullTotalSize   = stmtPullTotalSize
  } Manifest
  { manifestMaxPullItemsCount = maxPullItemsCount
  , manifestMaxPullValuesTotalSize = maxPullValuesTotalSize
  } Push
  { pushClientRevision = clientRevision
  , pushClientUpperRevision = clientUpperRevisionUncorrected
  , pushItems = itemsToPush
  } userId = describeException "failed to sync server repo" $ sqliteTransaction db $ \commit -> do

  -- get pre-push revision
  prePushRevision <- serverRepoMaxRevision repo

  -- get corrected client upper revision
  let
    clientUpperRevision = if clientUpperRevisionUncorrected == 0 || clientUpperRevisionUncorrected > prePushRevision
      then prePushRevision
      else clientUpperRevisionUncorrected

  -- determine pull lag
  lag <- sqliteQuery stmtPullTotalSize $ \query -> do
    sqliteBind query 1 clientRevision
    r <- sqliteStep query
    unless r $ throwIO $ DescribeFirstException "failed to determine total pull size"
    sqliteColumn query 0

  -- loop for push items
  forM_ itemsToPush $ \(key, value) -> do
    -- clear latest flag for that key
    sqliteQuery stmtClearLatest $ \query -> do
      sqliteBind query 1 key
      sqliteFinalStep query
    -- write key-value pair
    sqliteQuery stmtWrite $ \query -> do
      sqliteBind query 1 userId
      sqliteBind query 2 key
      sqliteBind query 3 value
      sqliteFinalStep query

  -- get post-push revision
  postPushRevision <- serverRepoMaxRevision repo

  -- perform pull
  (itemsToPull, lastKnownClientRevision) <- sqliteQuery stmtPull $ \query -> do
    sqliteBind query 1 clientRevision
    sqliteBind query 2 clientUpperRevision
    sqliteBind query 3 (fromIntegral maxPullItemsCount :: Int64)

    let
      step valuesTotalSize lastKnownClientRevision = do
        -- get next row
        r <- sqliteStep query
        -- if there's row
        if r then do
          -- get value
          value <- sqliteColumn query 2
          -- if we breach total values limit by adding this row, stop
          let
            newValuesTotalSize = valuesTotalSize + B.length value
          if newValuesTotalSize > maxPullValuesTotalSize then return ([], lastKnownClientRevision)
          else do
            -- get revision and key
            revision <- sqliteColumn query 0
            key <- sqliteColumn query 1
            -- get rest items and return
            (restItemsToPull, newLastKnownClientRevision) <- step newValuesTotalSize revision
            return ((revision, key, value) : restItemsToPull, newLastKnownClientRevision)
        else return ([], lastKnownClientRevision)
      in step 0 clientRevision

  -- get new client revision
  newClientRevision <- sqliteQuery stmtGetWeakRevision $ \query -> do
    sqliteBind query 1 lastKnownClientRevision
    sqliteBind query 2 clientUpperRevision
    r <- sqliteStep query
    if r then (+ (-1)) <$> sqliteColumn query 0
    else return clientUpperRevision

  -- commit transaction
  commit

  -- return answer
  return Pull
    { pullLag = lag
    , pullPrePushRevision = prePushRevision
    , pullPostPushRevision = postPushRevision
    , pullItems = itemsToPull
    , pullNewClientRevision = newClientRevision
    }

instance Repo ServerRepo where
  repoDb = serverRepoDb