
View on GitHub


Test Coverage
 * Copyright 2018 Vizor Games LLC
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.See the
 * License for the specific language governing permissions and limitations
 * under the License.
#include "RpcClient.h"

#include "InfraworldRuntime.h"
#include "RpcClientWorker.h"
#include "GrpcUriValidator.h"

#include "Containers/Ticker.h"
#include "Misc/CoreDelegates.h"

#include "Misc/DefaultValueHelper.h"
#include "HAL/RunnableThread.h"
#include "Kismet/KismetStringLibrary.h"

// ============ RpcClient implementation ===========

bool URpcClient::Init(const FString& URI, UChannelCredentials* ChannelCredentials)
    if (bCanSendRequests)
        UE_LOG(LogInfraworldRuntime, Error, TEXT("You're trying to initialize an RPC Client more than once"));
        return true;
    FString ErrorMessage;
    if (!FGrpcUriValidator::Validate(URI, ErrorMessage))
        UE_LOG(LogInfraworldRuntime, Error, TEXT("%s Unable to validate URI: %s"), *(GetClass()->GetName()), *ErrorMessage);

    // Do it if and only if the thread is not yet created.
    if (Thread == nullptr)
        UE_LOG(LogInfraworldRuntime, Log, TEXT("RpcClient at [%p], Thread == nullptr, initializing"), this);

        // Launch 'chaining' hierarchical init, which will init a superclass (a concrete implementation).

        UE_LOG(LogInfraworldRuntime, Log, TEXT("RpcClient at [%p], finished HierarchicalInit"), this);

        // Retrieve and set an Error Message Queue
        if (InnerWorker)
            UE_LOG(LogInfraworldRuntime, Log, TEXT("RpcClient at [%p], InnerWorker = %p"), this, InnerWorker.Get());
            InnerWorker->URI = URI;
            InnerWorker->ChannelCredentials = ChannelCredentials;

            InnerWorker->ErrorMessageQueue = &ErrorMessageQueue;

            const FString ThreadName(FString::Printf(TEXT("RPC Client Thread %s %d"), *(GetClass()->GetName()), FMath::RandRange(0, TNumericLimits<int32>::Max())));
            Thread = FRunnableThread::Create(InnerWorker.Get(), *ThreadName);

            bCanSendRequests = true;
            UE_LOG(LogInfraworldRuntime, Log, TEXT("Just made a thread: %s, address %p"), *ThreadName, InnerWorker.Get());
            UE_LOG(LogInfraworldRuntime, Fatal, TEXT("An inner worker of %s wasn't initialized"), *(GetClass()->GetName()));

    if (CanSendRequests())
        TickDelegateHandle = FTicker::GetCoreTicker().AddTicker(FTickerDelegate::CreateLambda([this](float)
            if (!ErrorMessageQueue.IsEmpty())
                FRpcError ReceivedError;
                EventRpcError.Broadcast(this, ReceivedError);

                // No need to call URpcClient::HierarchicalUpdate() if got any errors (Errors first)

            return true;

    return bCanSendRequests;

URpcClient::URpcClient() : InnerWorker(nullptr), TickDelegateHandle()

    UE_LOG(LogInfraworldRuntime, Verbose, TEXT("An instance of RPC Client has been destroyed. Still can send requests: %s"),

void URpcClient::Update()
    // Occasionally left blank

bool URpcClient::CanSendRequests() const
    return bCanSendRequests;

URpcClient* URpcClient::CreateRpcClient(TSubclassOf<URpcClient> Class, FRpcClientInstantiationParameters InstantiationParameters, UObject* Outer)
    const FString& URI = FString::Printf(TEXT("%s:%d"), *(InstantiationParameters.Ip), InstantiationParameters.Port);
    return CreateRpcClientUri(Class, URI, InstantiationParameters.ChannelCredentials, Outer);

URpcClient* URpcClient::CreateRpcClientUri(TSubclassOf<URpcClient> Class, const FString& URI, UChannelCredentials* ChannelCredentials, UObject* Outer)
    UObject* const RealOuter = Outer ? Outer : (UObject*)GetTransientPackage();
    if (URpcClient* const CreatedClient = NewObject<URpcClient>(RealOuter, *Class))
        UE_LOG(LogInfraworldRuntime, Log, TEXT("Created RpcClient at [%p] with outer [%p]"), CreatedClient, RealOuter);
        bool IsClientInitialized = CreatedClient->Init(URI, ChannelCredentials);
        if (!IsClientInitialized)
            UE_LOG(LogInfraworldRuntime, Error, TEXT("Unable to initialize an RPC client (%s::Init() failed"), *(Class->GetName()));
            return nullptr;
            UE_LOG(LogInfraworldRuntime, Verbose, TEXT("An instance of %s has been created and initialized"), *(Class->GetName()));
            return CreatedClient;
        UE_LOG(LogInfraworldRuntime, Fatal, TEXT("Unable to create an instance of RPC client (NewObject<%s>() failed)"), *(Class->GetName()));
        return nullptr;

void URpcClient::BeginDestroy()
    // Being called when GC'ed, should be called synchronously.
    if (CanSendRequests())


void URpcClient::Stop(bool bSynchronous)
    FRunnableThread* ThreadToStop = Thread.Exchange(nullptr);

    if (ThreadToStop)
        if (!InnerWorker->IsPendingStopped())

        bCanSendRequests = false;
        UE_LOG(LogInfraworldRuntime, Verbose, TEXT("Scheduled to stop %s via setting 'bCanSendRequests = false', address %p"), *(GetClass()->GetName()), InnerWorker.Get());

        // Should be synchronous in (almost) any case

        delete ThreadToStop;
        ThreadToStop = nullptr;
        UE_LOG(LogInfraworldRuntime, Error, TEXT("Can not call Stop() for an already stopped (or penfing asinchronously stopped) instance of '%s'"), *(GetClass()->GetName()));