// axe312ger/sftp-cache - src/sftp.js
const { relative, join } = require('path')

// Create a remote directory including all missing parents (like `mkdir -p`)
async function sftpMkdirP({ sftp, dir }) {
  const dirs = dir.split('/')
  for (let depth = 1; depth <= dirs.length; depth++) {
    try {
      await new Promise((resolve, reject) => {
        // join() normalizes the accumulated path segment
        const dirToCreate = join(dirs.slice(0, depth).join('/'))
        sftp.mkdir(dirToCreate, function (err) {
          if (err) {
            return reject(err)
          }
          resolve()
        })
      })
    } catch (err) {
      // 4 = SSH_FX_FAILURE, which servers commonly return when the
      // directory already exists; anything else is a real error
      if (err.code !== 4) {
        throw err
      }
    }
  }
}
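
// Usage sketch (illustrative, not part of the module): `sftp` is assumed to
// be an ssh2 SFTPWrapper, e.g. obtained from a connected node-ssh instance.
// Host and credentials below are placeholders:
//
//   const { NodeSSH } = require('node-ssh')
//   const ssh = new NodeSSH()
//   await ssh.connect({ host: 'example.com', username: 'deploy', password: process.env.SSH_PASSWORD })
//   const sftp = await ssh.requestSFTP()
//   await sftpMkdirP({ sftp, dir: 'cache/assets/images' })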

// Read a remote directory listing; if the directory does not exist yet,
// create it and return an empty listing instead of failing
async function sftpReadDir({ sftp, dir }) {
  try {
    const files = await new Promise((resolve, reject) => {
      sftp.readdir(dir, function (err, list) {
        if (err) {
          return reject(err)
        }
        resolve(list)
      })
    })
    return files
  } catch (err) {
    // 2 = SSH_FX_NO_SUCH_FILE
    if (err.code === 2) {
      console.log(`Directory ${dir} does not exist. Creating it.`)

      await sftpMkdirP({ sftp, dir })
      return []
    }

    throw err
  }
}
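
// Each entry returned by ssh2's sftp.readdir() has roughly this shape
// (values illustrative):
//
//   {
//     filename: 'bundle.js',
//     longname: '-rw-r--r--   1 deploy  deploy  51200 Mar  3 12:00 bundle.js',
//     attrs: { mode: 33188, uid: 1000, gid: 1000, size: 51200, atime: 1583236800, mtime: 1583236800 }
//   }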

// Recursively walk folder on remote and gather files with stats
async function getRemoteFiles({ sftp, dir, remoteCacheDir }) {
  console.log(`Looking for remote files at ${dir}...`)

  const files = await sftpReadDir({ sftp, dir })

  let list = []

  for (const file of files) {
    const {
      filename,
      longname,
      attrs: { size, mtime }
    } = file
    const path = join(dir, filename)
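    // ls-style longnames start with 'd' for directories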
    if (longname[0] !== 'd') {
      list.push({
        name: filename,
        path: relative(remoteCacheDir, path),
        stats: {
          size,
          mtime
        }
      })
      continue
    }

    const subDirList = await getRemoteFiles({
      sftp,
      dir: path,
      remoteCacheDir
    })

    list = [...list, ...subDirList]
  }

  return list
}
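
// Illustrative result: with dir === remoteCacheDir === 'cache' and a single
// remote file cache/assets/logo.png, the call resolves to something like
//
//   [{ name: 'logo.png', path: 'assets/logo.png', stats: { size: 4096, mtime: 1583236800 } }]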

// Calculate the checksum of a remote file by running `md5sum` on the host
async function getRemoteMd5({ ssh, path }) {
  const md5sum = await ssh.exec('md5sum', [path])

  // md5sum prints '<hash>  <path>'; keep only the hash
  const md5 = md5sum.split(' ')[0].trim()

  return md5
}
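
// Illustrative: ssh.exec('md5sum', ['cache/assets/logo.png']) resolves to a
// string like 'd41d8cd98f00b204e9800998ecf8427e  cache/assets/logo.png'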

class TimeoutError extends Error {
  constructor(timeout, path, ...params) {
    const seconds = timeout / 1000

    super(
      `Network issue: Aborting upload of ${path} after ${seconds.toFixed(
        2
      )} seconds`,
      ...params
    )

    if (Error.captureStackTrace) {
      Error.captureStackTrace(this, TimeoutError)
    }

    this.name = 'TimeoutError'
    this.path = path
  }
}

async function cacheFile({
  ssh,
  sftp,
  path,
  stats,
  localCacheDir,
  remoteCacheDir
}) {
  const { mtime, size } = stats

  const localPath = join(localCacheDir, path)
  const remotePath = join(remoteCacheDir, path)

  // 10 seconds plus the theoretical upload time on a 5 Mbit/s connection
  const timeout = Math.ceil(
    10 * 1000 + ((size * 8) / (5 * Math.pow(10, 6))) * 1000
  )
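  // e.g. a 10 MB file: 10 s + (10e6 B * 8 bit/B) / (5e6 bit/s) = 26 s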
  let uploaded = false
  let timer = null

  const uploadFile = async () => {
    await ssh.putFile(localPath, remotePath, null, {
      // @todo add code that aborts stalled uploads, should add extra performance on laggy connections
      // step: (total_transferred, chunk, total) => {
      //     console.log(path, 'uploaded', total_transferred, 'of', total);
      // }
    })
    uploaded = true
  }

  try {
    await Promise.race([
      uploadFile(),
      new Promise((resolve, reject) => {
        timer = setTimeout(
          () => !uploaded && reject(new TimeoutError(timeout, path)),
          timeout
        )
      })
    ])
  } finally {
    // Always clear the pending timer so a completed upload does not leave a
    // dangling timeout behind
    clearTimeout(timer)
  }

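  // Set the uploaded file's atime/mtime on the remote to the local mtime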
  await new Promise((resolve, reject) => {
    sftp.setstat(
      remotePath,
      {
        mtime,
        atime: mtime
      },
      (err) => {
        if (err) {
          return reject(err)
        }
        resolve()
      }
    )
  })
  console.log(`Cached ${path}`)
}

module.exports = {
  sftpMkdirP,
  sftpReadDir,
  getRemoteFiles,
  getRemoteMd5,
  cacheFile,
  TimeoutError
}
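
// End-to-end usage sketch (illustrative, not executed by this module).
// Assumes a connected node-ssh instance; host, credentials and paths are
// placeholders:
//
//   const fs = require('fs')
//   const { NodeSSH } = require('node-ssh')
//   const { getRemoteFiles, cacheFile } = require('./sftp')
//
//   const ssh = new NodeSSH()
//   await ssh.connect({ host: 'example.com', username: 'deploy', password: process.env.SSH_PASSWORD })
//   const sftp = await ssh.requestSFTP()
//
//   const remoteCacheDir = '/var/cache/builds'
//   // Compare remoteFiles against local state to decide what to upload
//   const remoteFiles = await getRemoteFiles({ sftp, dir: remoteCacheDir, remoteCacheDir })
//
//   const st = fs.statSync('.cache/assets/logo.png')
//   await cacheFile({
//     ssh,
//     sftp,
//     path: 'assets/logo.png',
//     stats: { size: st.size, mtime: Math.floor(st.mtimeMs / 1000) },
//     localCacheDir: '.cache',
//     remoteCacheDir
//   })
//
//   ssh.dispose()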