CycloneTechnology/ChaMP

View on GitHub
champ-wsman/src/it/scala/com/cyclone/wsman/impl/subscription/push/WSManSubscriptionPushTest.scala

Summary

Maintainability
A
0 mins
Test Coverage
package com.cyclone.wsman.impl.subscription.push

import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink}
import com.cyclone.util.kerberos.TestKerberosDeployment
import com.cyclone.util.net.HttpUrl
import com.cyclone.wsman.RequiresRealWsman
import com.cyclone.wsman.impl.subscription.{WSManSubscriptionExpiryException, WSManSubscriptionTest}
import com.cyclone.wsman.subscription.DeliveryExpiryParams

import scala.concurrent.duration._
import scala.language.postfixOps

class WSManSubscriptionPushTest
    extends WSManSubscriptionTest
    with TestKerberosDeployment
    with AkkaHttpTestWebServerComponent
    with EventService
    with DefaultPushDeliveryResourceComponent
    with GuavaKerberosTokenCacheComponent
    with DefaultPushEventXmlParserComponent {

  def route = eventServiceRoute

  lazy val deliveryHandler: PushDeliveryHandler = {
    testWebServer.start()

    PushDeliveryHandler(
      HttpUrl.fromParts(hostAndPort = testWebServer.hostAndPort, resource = "/wsman/event_receiver/receive"),
      DeliveryExpiryParams.Expiring(10.seconds, 2)
    )
  }

  "push subscription handling" must {
    "error if no heartbeat within expiry" taggedAs RequiresRealWsman in {
      lazy val expiringHandler =
        deliveryHandler.copy(
          expiryParams = new DeliveryExpiryParams {
            val heartbeat = Some(20 seconds)
            val expiry = Some(1 seconds)
          }
        )

      val (subscribed, source) = doSubscribe(fileCreationSubsDefn, expiringHandler)
      val (kill, completion) = source.viaMat(KillSwitches.single)(Keep.right).toMat(Sink.ignore)(Keep.both).run

      subscribed.futureValue

      try completion.failed.futureValue shouldBe WSManSubscriptionExpiryException
      finally kill.shutdown()
    }

    "not expire subscription when no events" taggedAs RequiresRealWsman in {
      lazy val expiringHandler =
        deliveryHandler.copy(
          expiryParams = new DeliveryExpiryParams {
            val heartbeat = Some(1 seconds)
            val expiry = Some(2 seconds)
          }
        )

      val (subscribed, source) = doSubscribe(fileCreationSubsDefn, expiringHandler)
      val (kill, completion) = source.viaMat(KillSwitches.single)(Keep.right).toMat(Sink.ignore)(Keep.both).run

      subscribed.futureValue

      try {
        Thread.sleep(5000)

        completion.isCompleted shouldBe false
      } finally kill.shutdown()
    }

    // Test for NP-2377
    "remove token from cache on unsubscribe" taggedAs RequiresRealWsman in {
      val (subscribed, source) = doSubscribe(fileCreationSubsDefn)
      val ((localSubscriptionId, kill), completion) =
        source.viaMat(KillSwitches.single)(Keep.both).toMat(Sink.ignore)(Keep.both).run

      subscribed.futureValue

      kerberosTokenCache.getTokenFor(localSubscriptionId) shouldBe a[Some[_]]

      kill.shutdown()
      Thread.sleep(200)
      kerberosTokenCache.getTokenFor(localSubscriptionId) shouldBe None
    }
  }
}