CycloneTechnology/ChaMP

View on GitHub
champ-ipmi/src/main/scala/com/cyclone/ipmi/protocol/sdr/SdrReader.scala

Summary

Maintainability
A
0 mins
Test Coverage
package com.cyclone.ipmi.protocol.sdr

import akka.util.ByteString
import com.cyclone.command.{RequestTimeouts, TimeoutContext}
import com.cyclone.ipmi.IpmiError._
import com.cyclone.ipmi.client.IpmiConnection
import com.cyclone.ipmi.command.GenericStatusCodeErrors
import com.cyclone.ipmi.command.sdrRepository.{GetSDR, ReserveSDRRepository}
import com.cyclone.ipmi.{IpmiError, IpmiOperationContext}
import com.typesafe.scalalogging.LazyLogging
import scalaz.EitherT._
import scalaz.Scalaz._
import scalaz._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._

/**
  * Utility for reading SDRs
  */
trait SdrReader {

  /**
    * Reads an SDR header with a specific record id
    */
  def readSdrHeader(
    recordId: SensorDataRecordId,
    reservationId: SdrReservationId = SdrReservationId.noReservation
  )(implicit ctx: IpmiOperationContext): Future[IpmiErrorOr[SdrHeader]]

  /**
    * Reads all SDR headers
    */
  def readAllSdrHeaders(reservationId: SdrReservationId = SdrReservationId.noReservation)(
    implicit ctx: IpmiOperationContext
  ): Future[IpmiErrorOr[Seq[SdrHeader]]]

  /**
    * Reads an SDR with a specific record id
    */
  def readSdr(recordId: SensorDataRecordId)(
    implicit ctx: IpmiOperationContext
  ): Future[IpmiErrorOr[SdrKeyAndBody]]

  /**
    * Reads all SDRs for a connection
    *
    * @return the SDRs
    */
  def readAllSdrs(implicit ctx: IpmiOperationContext): Future[IpmiErrorOr[Seq[SdrKeyAndBody]]]

}

trait SdrReaderComponent {
  def sdrReader: SdrReader
}

object SdrReaderComponent extends SdrReaderComponent with LazyLogging {

  private val chunkSize = 16

  lazy val sdrReader: SdrReader = new SdrReader {

    def readAllSdrs(implicit ctx: IpmiOperationContext): Future[IpmiError \/ Seq[SdrKeyAndBody]] = {
      val result = for {
        reservation <- eitherT(getReservation)
        records     <- eitherT(readAllSdrBodies(reservation.reservationId))
      } yield records

      result.run
    }

    def readSdrHeader(recordId: SensorDataRecordId, reservationId: SdrReservationId)(
      implicit ctx: IpmiOperationContext
    ): Future[IpmiError \/ SdrHeader] = {

      logger.debug(s"Reading SDR header for $recordId, $reservationId")

      val result = for {
        commandResult <- runGetSdr(
          recordId,
          reservationId,
          bytesToRead = SensorDataRecord.headerLength
        )
        sensorDataRecordHeader <- eitherT(SdrHeader.fromCommandResult(commandResult).point[Future])
      } yield sensorDataRecordHeader

      result.run
    }

    def readAllSdrHeaders(
      reservationId: SdrReservationId
    )(implicit ctx: IpmiOperationContext): Future[IpmiError \/ Seq[SdrHeader]] = {
      def loop(
        acc: Vector[SdrHeader],
        recordId: SensorDataRecordId
      ): FutureIpmiErrorOr[Seq[SdrHeader]] =
        if (recordId == SensorDataRecordId.last)
          rightT(acc.point[Future])
        else
          for {
            header <- eitherT(readSdrHeader(recordId, reservationId))

            _ = logger.debug(s"Read $header; next is ${header.nextRecordId}")
            headers <- loop(acc :+ header, header.nextRecordId)
          } yield headers

      loop(Vector.empty, SensorDataRecordId.first).run
    }

    def readSdr(
      recordId: SensorDataRecordId
    )(implicit ctx: IpmiOperationContext): Future[IpmiError \/ SdrKeyAndBody] = {
      val result = for {
        reservation         <- eitherT(getReservation)
        bodyAndNextRecordId <- eitherT(readSdrBody(recordId, reservation.reservationId))
      } yield bodyAndNextRecordId._1

      result.run
    }

    private def getReservation(implicit ctx: IpmiOperationContext) = {
      implicit val timeoutContext: TimeoutContext = ctx.timeoutContext
      import ctx._

      // Repeated reservation attempts can cause reservation to be cancelled
      // - so just do once with a long timeout...
      connection.executeCommandOrError(ReserveSDRRepository.Command)(
        timeoutContext = timeoutContext.withTimeouts(RequestTimeouts.simple(30.seconds, maxAttempts = 1)),
        codec = ReserveSDRRepository.Command.codec
      )
    }

    private def readAllSdrBodies(
      reservationId: SdrReservationId
    )(implicit ctx: IpmiOperationContext): Future[IpmiErrorOr[Seq[SdrKeyAndBody]]] = {

      def loop(
        acc: Vector[SdrKeyAndBody],
        recordId: SensorDataRecordId
      ): FutureIpmiErrorOr[Seq[SdrKeyAndBody]] =
        if (recordId == SensorDataRecordId.last)
          rightT(acc.point[Future])
        else
          for {
            bodyAndNextRecordId <- eitherT(readSdrBody(recordId, reservationId))
            (body, nextRecordId) = bodyAndNextRecordId

            _ = logger.debug(s"Read $body; next is $nextRecordId")
            bodies <- loop(acc :+ body, nextRecordId)
          } yield bodies

      loop(Vector.empty, SensorDataRecordId.first).run
    }

    private def readSdrBody(recordId: SensorDataRecordId, reservationId: SdrReservationId)(
      implicit ctx: IpmiOperationContext
    ): Future[IpmiErrorOr[(SdrKeyAndBody, SensorDataRecordId)]] = {
      import GenericStatusCodeErrors._

      def doReadSdrBody(
        remainingToChunkSize: Int => Int
      ): Future[IpmiErrorOr[(SdrKeyAndBody, SensorDataRecordId)]] = {

        def loop(
          acc: ByteString,
          totalLength: Int,
          offset: Int,
          bytesToRead: Int
        ): FutureIpmiErrorOr[ByteString] = {
          logger.debug(
            s"Getting SDR chunk from $offset to ${offset + bytesToRead} (of $totalLength)."
          )
          if (bytesToRead <= 0) rightT(acc.point[Future])
          else
            for {
              commandResult <- runGetSdr(recordId, reservationId, offset, bytesToRead)
              data = commandResult.recordData
              newAcc = acc ++ data
              remaining = totalLength - newAcc.length
              result <- loop(
                newAcc,
                totalLength,
                offset = offset + data.length,
                bytesToRead = remaining min chunkSize
              )
            } yield result
        }

        val result = for {
          header <- eitherT(readSdrHeader(recordId, reservationId))

          remaining = header.bodyLength

          _ = logger.debug(s"Got partial SDR. Reading remaining $remaining...")
          sensorDataRecordBytes <- loop(
            header.headerData,
            header.totalLength,
            SensorDataRecord.headerLength,
            remainingToChunkSize(remaining)
          )

          recordBody <- eitherT(decodeBody(sensorDataRecordBytes).point[Future])
        } yield (recordBody, header.nextRecordId)

        result.run
      }

      logger.debug(s"Reading SDR for $recordId, $reservationId")

      val inChunks = { remaining: Int =>
        remaining min chunkSize
      }
      val inOneGo = { remaining: Int =>
        remaining
      }

      // See sec 33.12 - use small chunks if this fails...
      doReadSdrBody(inOneGo).flatMap {
        case \/-(r) => r.right.point[Future]
        case -\/(e)
            if e == TooManyBytesRequested |
            e == UnspecifiedError =>
          doReadSdrBody(inChunks)
        case -\/(e) => e.left.point[Future]
      }
    }

    private def runGetSdr(
      recordId: SensorDataRecordId,
      reservationId: SdrReservationId,
      offset: Int = 0,
      bytesToRead: Int = 0xff
    )(implicit ctx: IpmiOperationContext) = {
      implicit val timeoutContext: TimeoutContext = ctx.timeoutContext
      import ctx._

      for {
        commandResult <- eitherT(
          connection
            .executeCommandOrError(GetSDR.Command(reservationId, recordId, offset, bytesToRead))
        )
        _ = logger.debug(
          s"Read SDR chunk from $offset for $bytesToRead bytes for $recordId: $commandResult"
        )
      } yield commandResult
    }

    private def decodeBody(sensorDataRecordBytes: ByteString): IpmiErrorOr[SdrKeyAndBody] =
      for {
        sensorDataRecord <- SensorDataRecord.decoder.handleExceptions.decode(sensorDataRecordBytes)
        recordBody       <- SdrKeyAndBody.decodeBody(sensorDataRecord)
      } yield recordBody

  }

}