CycloneTechnology/ChaMP

View on GitHub
champ-wsman/src/main/scala/com/cyclone/wsman/WSMan.scala

Summary

Maintainability
A
0 mins
Test Coverage
package com.cyclone.wsman

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import com.cyclone.akka.{ActorSystemComponent, MaterializerComponent}
import com.cyclone.command.{OperationDeadline, TimeoutContext}
import com.cyclone.util.concurrent.Futures
import com.cyclone.util.net._
import com.cyclone.wsman.WSManError.WSManErrorOr
import com.cyclone.wsman.command.WSManCommands.CommandExecutor
import com.cyclone.wsman.command._
import com.cyclone.wsman.impl._
import com.cyclone.wsman.impl.http.settings.ConfigHttpSettingsComponent
import com.cyclone.wsman.impl.http.{DefaultAsyncHttpClientComponent, DefaultWSManConnectionFactoryComponent}
import com.cyclone.wsman.impl.model.OperationsReferenceResolverComponent
import com.cyclone.wsman.impl.subscription._
import com.cyclone.wsman.impl.subscription.pull.PullDeliveryHandler
import com.cyclone.wsman.impl.subscription.push._
import com.cyclone.wsman.subscription.{PushDeliveryConfig, SubscriptionExecutor, SubscriptionId, WSManSubscriptionDefn}
import scalaz.EitherT._
import scalaz.Scalaz._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}

object WSMan {

  // Depending on the uri, sometimes need to append the the CIM namespace before the
  // class name and sometimes need to use a __cimselector selector element. Leave the
  // value of the uri to the user (as per openwsman wsman client) but specify namespace for CIM operations:
  val defaultCimNamespace: Option[String] = None

  private def port(ssl: Boolean): Int =
    if (ssl) WSManTarget.defaultSslPort else WSManTarget.defaultNonSslPort

  def httpUrlFor(hostAndPort: HostAndPort, ssl: Boolean): HttpUrl =
    HttpUrl.fromParts(hostAndPort.withDefaultPort(port(ssl)), "wsman", ssl)

  def httpUrlFor(host: String, ssl: Boolean): HttpUrl =
    HttpUrl.fromParts(HostAndPort.fromParts(host, port(ssl)), "wsman", ssl)

  /**
    * Creates a [[WSMan]].
    *
    * The resulting object can be used for multiple command executions etc.
    *
    * @param system an actor system
    */
  def create(implicit system: ActorSystem): WSMan =
    doCreate(PushDeliveryConfig.Dummy)

  /**
    * Creates a [[WSMan]] with the capability to perform push delivery of subscription events.
    *
    * The resulting object can be used for multiple command executions etc.
    *
    * @param pushDeliveryConfig links the web service endpoint at the
    *                           push delivery address to the subscriptions
    * @param system an actor system
    */
  def create(pushDeliveryConfig: PushDeliveryConfig)(implicit system: ActorSystem): WSMan =
    doCreate(pushDeliveryConfig)

  private def doCreate(pushDeliveryConfig: PushDeliveryConfig)(
    implicit system: ActorSystem
  ): WSMan = {
    val component: WSManComponent = new DefaultWSManComponent with DefaultWSManContextFactoryComponent
    with PushDeliveryRouterComponent with OperationsReferenceResolverComponent
    with DefaultWSManConnectionFactoryComponent with DefaultAsyncHttpClientComponent with ConfigHttpSettingsComponent
    with KerberosTokenCacheComponent with ActorSystemComponent with MaterializerComponent with Dns4sDnsLookupComponent
    with DnsConfigSourceComponent with ConfigDnsConfigSourceComponent {
      lazy val actorSystem: ActorSystem = system
      lazy val materializer: Materializer = ActorMaterializer()
      lazy val pushDeliveryRouter: PushDeliveryRouter = pushDeliveryConfig.pushDeliveryRouter
      lazy val kerberosTokenCache: KerberosTokenCache = pushDeliveryConfig.kerberosTokenCache
    }

    component.wsman
  }
}

/**
  * WS-Management API.
  *
  * @author Jeremy.Stone
  */
trait WSMan {

  /**
    * Executes a [[WSManCommand]]
    *
    * Errors are converted to failed futures.
    */
  def executeCommand[Command <: WSManCommand, Result <: WSManCommandResult](
    target: WSManTarget,
    command: Command
  )(
    implicit timeoutContext: TimeoutContext,
    executor: CommandExecutor[Command, Result]
  ): Future[Result] = {
    val raw = executeCommandOrError(target, command)

    Futures.disjunctionToFailedFuture(raw)(WSManError.toThrowable)
  }

  /**
    * Executes a [[WSManCommand]] in a context.
    *
    * Errors are converted to failed futures.
    */
  def executeCommand[Command <: WSManCommand, Result <: WSManCommandResult](command: Command)(
    implicit executor: CommandExecutor[Command, Result],
    ctx: WSManOperationContext
  ): Future[Result] = {
    val raw = executeCommandOrError(command)

    Futures.disjunctionToFailedFuture(raw)(WSManError.toThrowable)
  }

  /**
    * Executes a [[WSManCommand]] returning the result as a disjunction
    */
  def executeCommandOrError[Command <: WSManCommand, Result <: WSManCommandResult](
    target: WSManTarget,
    command: Command
  )(
    implicit timeoutContext: TimeoutContext,
    executor: CommandExecutor[Command, Result]
  ): Future[WSManErrorOr[Result]]

  /**
    * Executes a [[WSManCommand]] in a context returning the result as a disjunction
    */
  def executeCommandOrError[Command <: WSManCommand, Result <: WSManCommandResult](
    command: Command
  )(
    implicit executor: CommandExecutor[Command, Result],
    ctx: WSManOperationContext
  ): Future[WSManErrorOr[Result]] = {
    executor.execute(command)
  }

  /**
    * Utility to perform some operation (e.q. sequence of commands) with the same context.
    */
  def withContextOrError[T](target: WSManTarget)(
    operation: WSManOperationContext => Future[WSManErrorOr[T]]
  )(implicit timeoutContext: TimeoutContext): Future[WSManErrorOr[T]]

  /**
    * Utility to perform some operation (e.q. sequence of commands) with the same context.
    */
  def withContext[T](target: WSManTarget)(operation: WSManOperationContext => Future[T])(
    implicit timeoutContext: TimeoutContext
  ): Future[T]

  /**
    * Subscribes to events
    */
  def subscribe[S <: WSManSubscriptionDefn](
    target: WSManTarget,
    subscriptionDefn: S,
    deliveryHandler: DeliveryHandler = PullDeliveryHandler()
  )(implicit executor: SubscriptionExecutor[S]): Source[SubscriptionItem, SubscriptionId]

  def testConnection(
    target: WSManTarget,
    timeout: FiniteDuration = 1.minute
  ): Future[WSManErrorOr[Unit]]
}

trait WSManComponent {
  def wsman: WSMan
}

trait DefaultWSManComponent extends WSManComponent {
  self: WSManOperationContextFactoryComponent =>

  lazy val wsman: WSMan = new WSMan {

    def testConnection(target: WSManTarget, timeout: FiniteDuration): Future[WSManErrorOr[Unit]] = {
      implicit val context: WSManOperationContext =
        wsmanOperationContextFactory.wsmanContextFor(
          target,
          OperationDeadline.reusableTimeout(timeout)
        )

      def testPossibleAvailability: Future[WSManErrorOr[Unit]] =
        WSManOperations
          .determineAvailability(context.operationDeadline)
          .map { avail =>
            if (avail.possibilyAvailable) ().right
            else WSManAvailabilityTestError(avail).left
          }

      val result = for {
        _ <- eitherT(testPossibleAvailability)
        _ <- eitherT(WSManOperations.identify(context.operationDeadline))
      } yield ()

      result.run
    }

    def executeCommandOrError[Command <: WSManCommand, Result <: WSManCommandResult](
      target: WSManTarget,
      command: Command
    )(
      implicit timeoutContext: TimeoutContext,
      executor: CommandExecutor[Command, Result]
    ): Future[WSManErrorOr[Result]] = {
      implicit val context: WSManOperationContext =
        wsmanOperationContextFactory.wsmanContextFor(target, timeoutContext.deadline)

      executor.execute(command)
    }

    private def contextForSubscription(target: WSManTarget) =
      // TODO pass in a timeout for the actual subscription/unsubscription??
      // Use reasonable value for now...
      wsmanOperationContextFactory.wsmanContextFor(target, OperationDeadline.reusableTimeout(5.seconds))

    def subscribe[S <: WSManSubscriptionDefn](
      target: WSManTarget,
      subscriptionDefn: S,
      deliveryHandler: DeliveryHandler
    )(implicit executor: SubscriptionExecutor[S]): Source[SubscriptionItem, SubscriptionId] = {
      implicit val context: WSManOperationContext = contextForSubscription(target)

      executor.source(subscriptionDefn, deliveryHandler)
    }

    def withContextOrError[T](target: WSManTarget)(
      operation: WSManOperationContext => Future[WSManErrorOr[T]]
    )(implicit timeoutContext: TimeoutContext): Future[WSManErrorOr[T]] = {

      withContext(target)(operation)
    }

    def withContext[T](target: WSManTarget)(
      operation: WSManOperationContext => Future[T]
    )(implicit timeoutContext: TimeoutContext): Future[T] = {
      val context: WSManOperationContext =
        wsmanOperationContextFactory.wsmanContextFor(target, timeoutContext.deadline)

      operation(context)
    }

  }
}