guibranco/Sankhya-SDK-dotnet

View on GitHub
Src/Sankhya/RequestWrappers/PagedRequestWrapper.cs

Summary

Maintainability
D
2 days
Test Coverage
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using CrispyWaffle.Cache;
using CrispyWaffle.Composition;
using CrispyWaffle.Infrastructure;
using CrispyWaffle.Log;
using Sankhya.Enums;
using Sankhya.GoodPractices;
using Sankhya.Helpers;
using Sankhya.Properties;
using Sankhya.Service;
using Sankhya.Transport;
using Sankhya.ValueObjects;

namespace Sankhya.RequestWrappers;

/// <summary>
/// An erp managed service request.
/// </summary>
internal sealed class PagedRequestWrapper
{
    /// <summary>
    /// all pages loaded.
    /// </summary>
    private readonly AutoResetEvent _allPagesLoaded = new(false);

    /// <summary>
    /// The on demand tasks.
    /// </summary>
    private readonly List<Task> _onDemandTasks = new();

    /// <summary>
    /// The Sankhya context.
    /// </summary>
    private readonly SankhyaContext _context;

    /// <summary>
    /// The token identifying the session on SankhyaContext.
    /// </summary>
    private readonly Guid _token;

    /// <summary>
    /// The results loaded.
    /// </summary>
    private int _resultsLoaded;

    /// <summary>
    /// true if set.
    /// </summary>
    private bool _set;

    /// <summary>
    /// The entity name.
    /// </summary>
    private readonly string _entityName;

    /// <summary>
    /// The maximum results.
    /// </summary>
    private readonly int _maxResults;

    /// <summary>
    /// Collection of responses.
    /// </summary>
    private readonly Queue<EntityDynamicSerialization> _items;

    /// <summary>
    /// The request.
    /// </summary>
    private readonly ServiceRequest _request;

    /// <summary>
    /// The cache key.
    /// </summary>
    private readonly string _cacheKey;

    /// <summary>
    /// The type.
    /// </summary>
    private readonly Type _type;

    /// <summary>
    /// Delegate PageProcessedEventHandler.
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="e">The <see cref="PagedRequestEventArgs"/> instance containing the event data.</param>
    public delegate void PageProcessedEventHandler(object sender, PagedRequestEventArgs e);

    /// <summary>
    /// Delegate PageLoadedSuccessfullyEventHandler.
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="e">The <see cref="PagedRequestEventArgs"/> instance containing the event data.</param>
    public delegate void PageLoadedSuccessfullyEventHandler(object sender, PagedRequestEventArgs e);

    /// <summary>
    /// Delegate PageNotLoadedEventHandler.
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="e">The <see cref="PagedRequestEventArgs"/> instance containing the event data.</param>
    public delegate void PageNotLoadedEventHandler(object sender, PagedRequestEventArgs e);

    /// <summary>
    /// Occurs when [page loaded successfully].
    /// </summary>
    public event PageLoadedSuccessfullyEventHandler PageLoadedSuccessfully;

    /// <summary>
    /// Occurs when [page processed].
    /// </summary>
    public static event PageProcessedEventHandler PageProcessed;

    /// <summary>
    /// Occurs when [page loaded error].
    /// </summary>
    public event PageNotLoadedEventHandler PageLoadedError;

    /// <summary>
    /// Initializes a new instance of the <see cref="PagedRequestWrapper"/> class.
    /// </summary>
    /// <param name="type">The type.</param>
    /// <param name="request">The request.</param>
    /// <param name="maxResults">The maximum results.</param>
    private PagedRequestWrapper(Type type, ServiceRequest request, int maxResults)
    {
        _context = ServiceLocator.Resolve<SankhyaContext>();
        _entityName =
            request.RequestBody.Entity?.Name
            ?? request.RequestBody.Entity?.RootEntity
            ?? request.RequestBody.DataSet.RootEntity;
        _items = new();
        _maxResults = maxResults;
        _request = request;
        _resultsLoaded = 0;
        _token = _context.AcquireNewSession(ServiceRequestType.PagedCrud);
        _cacheKey = $@"PagedRequestWrapper_{_entityName}_{_context.UserName}";
        _type = type;
    }

    /// <summary>
    /// Called when [load page successfully].
    /// </summary>
    /// <param name="quantityLoaded">The quantity loaded.</param>
    /// <param name="currentPageNumber">The current page number.</param>
    /// <param name="totalPages">The total pages.</param>
    private void OnLoadPageSuccessfully(int quantityLoaded, int currentPageNumber, int totalPages)
    {
        if (PageLoadedSuccessfully == null && PageProcessed == null)
        {
            return;
        }

        LogConsumer.Trace(
            Resources.PagedRequestWrapper_Signaling,
            _entityName,
            Resources.PageLoaded
        );
        var eventArgs = new PagedRequestEventArgs(
            _type,
            quantityLoaded,
            _resultsLoaded,
            currentPageNumber,
            totalPages
        );
        PageLoadedSuccessfully?.Invoke(this, eventArgs);
        PageProcessed?.Invoke(this, eventArgs);
    }

    /// <summary>
    /// Called when [load page error].
    /// </summary>
    /// <param name="currentPageNumber">The current page number.</param>
    /// <param name="exception">The exception.</param>
    private void OnLoadPageError(int currentPageNumber, ServiceRequestGeneralException exception)
    {
        var ex = new PagedRequestException(_request, exception);

        if (PageLoadedError == null && PageProcessed == null)
        {
            LogConsumer.Handle(ex);
            return;
        }

        LogConsumer.Trace(Resources.PagedRequestWrapper_Signaling, _entityName, Resources.Error);

        var eventArgs = new PagedRequestEventArgs(
            _type,
            currentPageNumber,
            _resultsLoaded,
            exception
        );

        PageLoadedError?.Invoke(this, eventArgs);

        PageProcessed?.Invoke(this, eventArgs);
    }

    /// <summary>
    /// Loads the page.
    /// </summary>
    /// <param name="page">The page.</param>
    /// <param name="quantityLoaded">The quantity loaded.</param>
    /// <param name="totalPages">The total pages.</param>
    /// <returns>Boolean.</returns>
    private bool LoadPage(int page, out int quantityLoaded, out int totalPages)
    {
        quantityLoaded = -1;

        totalPages = 0;

        try
        {
            if (page > 1)
            {
                _request.RequestBody.DataSet.PageNumber = page;
            }

            var response = SankhyaContext.ServiceInvoker(_request, _token);

            _request.RequestBody.DataSet.PagerId = response
                .ResponseBody
                .CrudServiceProviderEntities
                .PagerId;

            quantityLoaded = response.ResponseBody.CrudServiceProviderEntities.Total;

            totalPages = response.ResponseBody.CrudServiceProviderEntities.TotalPages;

            Interlocked.Add(ref _resultsLoaded, quantityLoaded);

            foreach (
                EntityDynamicSerialization ds in response
                    .ResponseBody
                    .CrudServiceProviderEntities
                    .Entities
            )
            {
                _items.Enqueue(ds);
            }

            OnLoadPageSuccessfully(quantityLoaded, page, totalPages);

            return true;
        }
        catch (ServiceRequestGeneralException e)
        {
            OnLoadPageError(page, e);
        }

        return false;
    }

    /// <summary>
    /// Loads the response.
    /// </summary>
    /// <param name="token">The token.</param>
    /// <exception cref="ServiceRequestRepeatedException">Repeated request detected.</exception>
    private void LoadResponse(CancellationToken token)
    {
        if (_set)
        {
            throw new ServiceRequestRepeatedException(_request);
        }

        var pageNumber = 1;

        try
        {
            LoadResponseInternal(token, ref pageNumber);
        }
        catch (ServiceRequestGeneralException e)
        {
            OnLoadPageError(pageNumber, e);
        }
    }

    /// <summary>
    /// Loads the response internal.
    /// </summary>
    /// <param name="token">The cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
    /// <param name="pageNumber">The page number.</param>
    private void LoadResponseInternal(CancellationToken token, ref int pageNumber)
    {
        const int lockMinutes = 3;

        WaitAnotherLoad();

        CacheManager.Set(EnvironmentHelper.ProcessId, _cacheKey, new TimeSpan(0, lockMinutes, 0));

        var firstPage = LoadPage(pageNumber++, out var quantityLoaded, out var totalPages);

        var maxResultsReached = _maxResults != -1 && _maxResults <= quantityLoaded;

        if (
            !firstPage
            || quantityLoaded != 150
            || token.IsCancellationRequested
            || maxResultsReached
        )
        {
            Close();
            return;
        }

        while (true)
        {
            if (totalPages <= 0)
            {
                totalPages = 1 + pageNumber;
            }

            CacheManager.Set(
                EnvironmentHelper.ProcessId,
                _cacheKey,
                TimeSpan.FromMinutes(lockMinutes * (totalPages - pageNumber + 1))
            );

            var success = LoadPage(pageNumber++, out quantityLoaded, out totalPages);

            var shouldRequestNextPage = _maxResults != -1 && _maxResults > _resultsLoaded;

            if (
                success
                && quantityLoaded == 300
                && !token.IsCancellationRequested
                && shouldRequestNextPage
            )
            {
                continue;
            }

            Close();

            break;
        }
    }

    /// <summary>
    /// Waits another load.
    /// </summary>
    private void WaitAnotherLoad()
    {
        while (true)
        {
            if (
                !CacheManager.TryGet(_cacheKey, out int processId)
                || processId == EnvironmentHelper.ProcessId
            )
            {
                break;
            }

            var timeToWait = CacheManager.TTL(_cacheKey);

            if (timeToWait.Ticks == 0)
            {
                CacheManager.Remove(_cacheKey);
                break;
            }

            LogConsumer.Warning(
                Resources.PagedRequestWrapper_LoadResponse_Waiting,
                _entityName,
                timeToWait
            );

            Thread.Sleep(Math.Min(10000, (int)timeToWait.TotalMilliseconds));
        }
    }

    /// <summary>
    /// Closes this instance.
    /// </summary>
    private void Close()
    {
        if (_set)
        {
            return;
        }

        try
        {
            CacheManager.Remove(_cacheKey);
        }
        catch (Exception e)
        {
            LogConsumer.Handle(e);
        }

        LogConsumer.Trace(
            Resources.PagedRequestWrapper_Signaling,
            _entityName,
            Resources.AllPagesLoaded
        );
        LogConsumer.Info(
            Resources.PagedRequestWrapper_Close,
            _resultsLoaded,
            _resultsLoaded == 1 ? Resources.YSingularSuffix : Resources.YPluralSuffix,
            _entityName
        );
        _context.FinalizeSession(_token);
        _allPagesLoaded.Set();
        _set = true;
    }

    /// <summary>
    /// Gets the managed enumerator internal.
    /// </summary>
    /// <typeparam name="T">The type param.</typeparam>
    /// <param name="request">The request.</param>
    /// <param name="processOnDemandData">The process on demand data.</param>
    /// <param name="maxResults">The maximum results.</param>
    /// <param name="entityName">Name of the entity.</param>
    /// <param name="cts">The CTS.</param>
    /// <param name="stronglyTypedCollection">The strongly typed collection.</param>
    /// <returns>The exception.</returns>
    private static ServiceRequestGeneralException GetManagedEnumeratorInternal<T>(
        ServiceRequest request,
        Action<List<T>> processOnDemandData,
        int maxResults,
        string entityName,
        CancellationTokenSource cts,
        BlockingCollection<T> stronglyTypedCollection
    )
        where T : class, IEntity, new()
    {
        if (cts.IsCancellationRequested)
        {
            return null;
        }

        Thread.CurrentThread.Name = $@"PagedRequestWrapper of {entityName}";

        ServiceRequestGeneralException ex = null;

        var wrapper = new PagedRequestWrapper(typeof(T), request, maxResults);

        wrapper.PageLoadedError += (_, e) =>
            ex = HandlePageLoadedError(
                entityName,
                cts,
                stronglyTypedCollection,
                wrapper,
                e.Exception
            );
        wrapper.PageLoadedSuccessfully += (_, e) =>
            HandlePageLoaded(
                processOnDemandData,
                entityName,
                cts,
                stronglyTypedCollection,
                e.CurrentPage,
                e.TotalPages,
                e.QuantityLoaded,
                wrapper
            );

        try
        {
            wrapper.LoadResponse(cts.Token);
        }
        catch (ObjectDisposedException e)
        {
            LogConsumer.Trace(e);
        }

        wrapper._allPagesLoaded.WaitOne();

        Task.WaitAll(wrapper._onDemandTasks.ToArray());

        try
        {
            if (!cts.IsCancellationRequested)
            {
                stronglyTypedCollection.CompleteAdding();
            }

            wrapper.Dispose();
        }
        catch (ObjectDisposedException)
        {
            LogConsumer.Error(Resources.PagedRequestWrapper_GetManagedEnumeratorInternal);
        }

        return ex;
    }

    /// <summary>
    /// Handles the page loaded.
    /// </summary>
    /// <typeparam name="T">The type parameter.</typeparam>
    /// <param name="processOnDemandData">The process on demand data.</param>
    /// <param name="entityName">Name of the entity.</param>
    /// <param name="cts">The CTS.</param>
    /// <param name="stronglyTypedCollection">The strongly typed collection.</param>
    /// <param name="currentPageNumber">The current page number.</param>
    /// <param name="totalPages">The total pages.</param>
    /// <param name="quantityLoaded">The quantity loaded.</param>
    /// <param name="wrapper">The wrapper.</param>
    private static void HandlePageLoaded<T>(
        Action<List<T>> processOnDemandData,
        string entityName,
        CancellationTokenSource cts,
        BlockingCollection<T> stronglyTypedCollection,
        int currentPageNumber,
        int totalPages,
        int quantityLoaded,
        PagedRequestWrapper wrapper
    )
        where T : class, IEntity, new()
    {
        var ofTotal =
            totalPages > 0
                ? string.Format(CultureInfo.CurrentCulture, Resources.OfTotal, totalPages)
                : string.Empty;
        LogConsumer.Info(
            Resources.PagedRequestWrapper_GetManagedEnumerator_PageLoaded,
            currentPageNumber,
            ofTotal,
            entityName,
            quantityLoaded,
            quantityLoaded == 1 ? Resources.YSingularSuffix : Resources.YPluralSuffix
        );
        var temp = new List<T>();
        for (var i = 0; i < quantityLoaded; i++)
        {
            temp.Add(wrapper._items.Dequeue().ConvertToType<T>());
        }

        if (!temp.Any() || cts.IsCancellationRequested || stronglyTypedCollection.IsAddingCompleted)
        {
            return;
        }

        if (processOnDemandData == null)
        {
            temp.ForEach(stronglyTypedCollection.Add);
            return;
        }

        wrapper._onDemandTasks.Add(
            Task.Factory.StartNew(
                () => LoadOnDemandData(processOnDemandData, temp, stronglyTypedCollection, cts),
                cts.Token,
                TaskCreationOptions.AttachedToParent,
                TaskScheduler.Current
            )
        );
    }

    /// <summary>
    /// Loads the on demand data.
    /// </summary>
    /// <typeparam name="T">The type parameter.</typeparam>
    /// <param name="processOnDemandData">The process on demand data.</param>
    /// <param name="temp">The temporary.</param>
    /// <param name="stronglyTypedCollection">The strongly typed collection.</param>
    /// <param name="cts">The CTS.</param>
    private static void LoadOnDemandData<T>(
        Action<List<T>> processOnDemandData,
        List<T> temp,
        BlockingCollection<T> stronglyTypedCollection,
        CancellationTokenSource cts
    )
        where T : class, IEntity, new()
    {
        try
        {
            processOnDemandData(temp);
            if (!cts.IsCancellationRequested && !stronglyTypedCollection.IsAddingCompleted)
            {
                temp.ForEach(stronglyTypedCollection.Add);
            }
        }
        catch (Exception e)
        {
            cts.Cancel();
            LogConsumer.Handle(e);
        }
    }

    /// <summary>
    /// Handles the page loaded error.
    /// </summary>
    /// <typeparam name="T">The type parameter.</typeparam>
    /// <param name="entityName">Name of the entity.</param>
    /// <param name="cts">The CTS.</param>
    /// <param name="stronglyTypedCollection">The strongly typed collection.</param>
    /// <param name="wrapper">The wrapper.</param>
    /// <param name="exception">The exception.</param>
    /// <returns>Service request general exception.</returns>
    private static ServiceRequestGeneralException HandlePageLoadedError<T>(
        string entityName,
        CancellationTokenSource cts,
        BlockingCollection<T> stronglyTypedCollection,
        PagedRequestWrapper wrapper,
        Exception exception
    )
        where T : class, IEntity, new()
    {
        LogConsumer.Warning(
            Resources.PagedRequestWrapper_GetManagedEnumerator_ErrorOccured,
            entityName
        );

        wrapper.Close();

        var ex = exception;

        try
        {
            if (!cts.IsCancellationRequested)
            {
                stronglyTypedCollection.CompleteAdding();
            }
        }
        catch (Exception e)
        {
            cts.Cancel();
            LogConsumer.Handle(e);
        }

        return (ServiceRequestGeneralException)ex;
    }

    /// <summary>
    /// Handles the cancellation token cancelled.
    /// </summary>
    /// <param name="request">The request.</param>
    /// <param name="entityName">Name of the entity.</param>
    /// <param name="timeout">The timeout.</param>
    private static void HandleCancellationTokenCancelled(
        ServiceRequest request,
        string entityName,
        TimeSpan timeout
    ) =>
        LogConsumer.Error(
            string.Format(
                CultureInfo.CurrentCulture,
                Resources.PagedRequestWrapper_HandleCancellationTokenCancelled,
                entityName,
                timeout.TotalSeconds,
                request.ServiceInternal
            )
        );

    /// <summary>
    /// Gets the managed enumerator.
    /// </summary>
    /// <typeparam name="T">The type parameter.</typeparam>
    /// <param name="request">The request.</param>
    /// <param name="timeout">The timeout.</param>
    /// <param name="processOnDemandData">The process on demand data.</param>
    /// <param name="maxResults">The maximum results.</param>
    /// <returns>IEnumerable&lt;T&gt;.</returns>
    public static IEnumerable<T> GetManagedEnumerator<T>(
        ServiceRequest request,
        TimeSpan timeout,
        Action<List<T>> processOnDemandData = null,
        int maxResults = -1
    )
        where T : class, IEntity, new()
    {
        var entityName = typeof(T).GetEntityName();

        ServiceRequestGeneralException ex = null;

        using var cts = new CancellationTokenSource(timeout);

        cts.Token.Register(() => HandleCancellationTokenCancelled(request, entityName, timeout));

        using var stronglyTypedCollection = new BlockingCollection<T>();

        var timer = new Stopwatch();

        timer.Start();

        var task = Task.Run(
            () =>
                ex = GetManagedEnumeratorInternal(
                    request,
                    processOnDemandData,
                    maxResults,
                    entityName,
                    cts,
                    stronglyTypedCollection
                ),
            cts.Token
        );

        var counter = 0;

        foreach (var item in stronglyTypedCollection.GetConsumingEnumerable(cts.Token))
        {
            counter++;

            yield return item;

            if (stronglyTypedCollection.IsCompleted)
            {
                yield break;
            }
        }

        task.Wait(cts.Token);

        timer.Stop();

        LogConsumer.Trace(
            Resources.PagedRequestWrapper_GetManagedEnumerator,
            counter,
            entityName,
            timer.Elapsed
        );

        if (ex != null)
        {
            throw ex;
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting
    /// unmanaged resources.
    /// </summary>
    private void Dispose() => Close();
}