Mordil/RediStack

View on GitHub
Sources/RediStackTestUtils/EmbeddedMockRedisServer.swift

Summary

Maintainability
A
0 mins
Test Coverage
//===----------------------------------------------------------------------===//
//
// This source file is part of the RediStack open source project
//
// Copyright (c) 2020-2022 RediStack project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of RediStack project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import RediStack
import XCTest
import NIOCore
import NIOEmbedded

/// Errors produced by the mock server in this file.
internal enum MockConnectionPoolError: Error {
    /// The mock server was handed a command it has no stubbed response for.
    case unexpectedMessage
}

// TODO #64 -- Mock Redis Server

/// A stand-in for a Redis server, used to test the connection pool.
///
/// Nothing here speaks to a real Redis instance: the type only hands out `EmbeddedChannel`s that share a single
/// `EmbeddedEventLoop` and answers the commands written to them, so that connection management can be stubbed out.
internal final class EmbeddedMockRedisServer {
    /// The channels handed out by `createConnectedChannel()` whose close future has not completed yet.
    var channels: ArraySlice<EmbeddedChannel> = []
    /// The event loop shared by every channel this server creates.
    var loop: EmbeddedEventLoop = EmbeddedEventLoop()

    /// Drives the fake server until there is no more work to do.
    ///
    /// Each pass runs the event loop and then pumps every tracked channel; passes repeat until one of them
    /// reads nothing from any channel.
    ///
    /// - Throws: Any error thrown while pumping a channel.
    func runWhileActive() throws {
        var sawTraffic = false

        repeat {
            loop.run()

            sawTraffic = false
            // Every channel is pumped on every pass, regardless of what earlier channels produced.
            for channel in channels {
                if try pumpChannel(channel) {
                    sawTraffic = true
                }
            }
        } while sawTraffic
    }

    /// Reads and processes every outbound command payload currently pending on `channel`.
    ///
    /// - Parameter channel: The channel whose outbound buffer should be drained.
    /// - Returns: `true` if at least one payload was read, otherwise `false`.
    /// - Throws: Any error thrown while reading from the channel or processing a payload.
    func pumpChannel(_ channel: EmbeddedChannel) throws -> Bool {
        var handledCount = 0

        while let payload = try channel.readOutbound(as: RedisCommandHandler.OutboundCommandPayload.self) {
            try processChannelRead(payload, channel)
            handledCount += 1
        }

        return handledCount > 0
    }

    /// Responds to a single command payload.
    ///
    /// A lone `QUIT` command is answered with the simple string `OK`. Anything else records a test failure and
    /// fails the payload's response promise with `MockConnectionPoolError.unexpectedMessage`.
    ///
    /// - Parameters:
    ///   - data: The payload carrying the command and the promise to complete.
    ///   - channel: The channel the payload was read from. It is not used by the current implementation.
    func processChannelRead(_ data: RedisCommandHandler.OutboundCommandPayload, _ channel: Channel) throws {
        guard case .array([RESPValue(from: "QUIT")]) = data.message else {
            XCTFail("Unexpected message: \(data.message)")
            data.responsePromise.fail(MockConnectionPoolError.unexpectedMessage)
            return
        }

        // QUIT is always allowed.
        let okReply: RESPValue = RESPValue.simpleString("OK".byteBuffer)
        data.responsePromise.succeed(okReply)
    }

    /// Creates a new embedded channel on the server's event loop, connects it, and starts tracking it.
    ///
    /// The channel is dropped from `channels` again once its close future completes.
    ///
    /// - Returns: The newly created channel.
    func createConnectedChannel() -> Channel {
        let newChannel = EmbeddedChannel(handler: GracefulShutdownToCloseHandler(), loop: loop)

        // Stop tracking the channel as soon as it has closed.
        newChannel.closeFuture.whenComplete { _ in
            self.channels.removeAll { $0 === newChannel }
        }

        // Connecting is what activates the channel; the address is only a placeholder path.
        let placeholderAddress = try! SocketAddress(unixDomainSocketPath: "/foo")
        newChannel.connect(to: placeholderAddress, promise: nil)

        channels.append(newChannel)
        return newChannel
    }

    /// Finishes any outstanding work and then closes the event loop.
    ///
    /// - Throws: Any error from `runWhileActive()` or from closing the loop.
    func shutdown() throws {
        try runWhileActive()
        try loop.close()
    }
}

/// An outbound `ChannelHandler` that turns a `RedisGracefulConnectionCloseEvent` into a channel close.
///
/// Every other user outbound event is passed along the pipeline unchanged.
private final class GracefulShutdownToCloseHandler: ChannelHandler, ChannelOutboundHandler {
    typealias OutboundIn = NIOAny

    /// Closes the channel for a graceful-close event and forwards any other event.
    ///
    /// - Parameters:
    ///   - context: The handler's context, used to close the channel or forward the event.
    ///   - event: The user outbound event that was triggered.
    ///   - promise: Passed through to whichever operation (close or forward) is performed.
    func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
        guard event is RedisGracefulConnectionCloseEvent else {
            // Not ours to handle: hand it to the next handler.
            context.triggerUserOutboundEvent(event, promise: promise)
            return
        }

        context.close(mode: .all, promise: promise)
    }
}