Anapher/Strive

View on GitHub
src/Services/ConferenceManagement/Strive.Infrastructure/KeyValue/InMemory/InMemoryDatabaseTransaction.cs

Summary

Maintainability
A
2 hrs
Test Coverage
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Nito.Disposables;
using StackExchange.Redis;
using Strive.Core.Extensions;
using Strive.Infrastructure.KeyValue.Abstractions;
using Strive.Infrastructure.KeyValue.Redis.Scripts;

namespace Strive.Infrastructure.KeyValue.InMemory
{
    /// <summary>
    ///     In-memory implementation of <see cref="IKeyValueDatabaseTransaction" />. Every database operation invoked
    ///     on this object is queued instead of being executed immediately. <see cref="ExecuteAsync" /> runs all
    ///     queued operations in the order they were added while holding the writer lock of the underlying data.
    ///     The task returned by an operation completes once its queued step has run.
    /// </summary>
    public class InMemoryDatabaseTransaction : InMemoryDatabaseActions, IKeyValueDatabaseTransaction
    {
        private readonly InMemoryKeyValueData _data;

        // The queued operations, in the order they will be executed by ExecuteAsync()
        private readonly List<Func<ValueTask>> _transactionSteps = new();

        // Cancelled on Dispose() so that the result tasks of steps that never completed are cancelled
        private readonly CancellationTokenSource _cancellationTokenSource = new();

        // True only while ExecuteAsync() holds the writer lock and runs the queued steps
        private bool _isExecuting;

        private bool _isDisposed;

        /// <summary>
        ///     Creates a transaction that operates on the given in-memory data.
        /// </summary>
        /// <param name="data">The data store whose lock is acquired when the transaction is executed.</param>
        public InMemoryDatabaseTransaction(InMemoryKeyValueData data) : base(data.Data)
        {
            _data = data;
        }

        /// <summary>
        ///     Cancels the result tasks of all queued operations that have not completed yet and releases the
        ///     cancellation token source. Calling this method more than once is safe.
        /// </summary>
        public void Dispose()
        {
            // Cancel() on an already disposed CancellationTokenSource throws, so a repeated Dispose() must be a no-op
            if (_isDisposed) return;
            _isDisposed = true;

            _cancellationTokenSource.Cancel();
            _cancellationTokenSource.Dispose();
        }

        /// <summary>
        ///     Executes all queued operations in order while holding the writer lock of the data store.
        /// </summary>
        /// <returns>Always <c>true</c> if all steps completed without throwing.</returns>
        /// <exception cref="ObjectDisposedException">The transaction was already disposed.</exception>
        /// <remarks>
        ///     If a step throws, the exception propagates to the caller, the result task of the failing operation
        ///     is faulted with the same exception, and the remaining steps are not executed (their result tasks
        ///     stay pending until <see cref="Dispose" /> cancels them).
        /// </remarks>
        public async ValueTask<bool> ExecuteAsync()
        {
            if (_isDisposed)
                throw new ObjectDisposedException(nameof(InMemoryDatabaseTransaction));

            _cancellationTokenSource.Token.ThrowIfCancellationRequested();

            using (_data.Lock.WriterLock())
            {
                _isExecuting = true;

                try
                {
                    foreach (var step in _transactionSteps)
                    {
                        await step();
                    }
                }
                finally
                {
                    // The writer lock is about to be released; Lock() must not hand out no-op locks afterwards
                    _isExecuting = false;
                }
            }

            return true;
        }

        /// <summary>
        ///     Queues an operation for execution in <see cref="ExecuteAsync" />.
        /// </summary>
        /// <param name="operation">The operation to run when the transaction is executed.</param>
        /// <returns>
        ///     A task that completes with the result of the operation once the step has run, faults if the operation
        ///     throws, or is cancelled if the transaction is disposed before the step completed.
        /// </returns>
        private ValueTask<T> AddTransactionStep<T>(Func<ValueTask<T>> operation)
        {
            var waitHandler = new TaskCompletionSource<T>();

            async ValueTask Step()
            {
                T result;
                try
                {
                    result = await operation();
                }
                catch (Exception exception)
                {
                    // Without this the caller awaiting the result would hang until the transaction is disposed
                    waitHandler.TrySetException(exception);
                    throw;
                }

                // SetResult throws if the task was already completed (e.g. the step is executed a second time)
                waitHandler.SetResult(result);
            }

            _cancellationTokenSource.Token.Register(() => waitHandler.TrySetCanceled());
            _transactionSteps.Add(Step);

            return new ValueTask<T>(waitHandler.Task);
        }

        /// <summary>
        ///     Queues an operation without a result for execution in <see cref="ExecuteAsync" />.
        /// </summary>
        /// <param name="operation">The operation to run when the transaction is executed.</param>
        private async ValueTask AddTransactionStep(Func<ValueTask> operation)
        {
            // Reuse the generic overload by giving the operation a dummy result
            await AddTransactionStep(async () =>
            {
                await operation();
                return Unit.Value;
            });
        }

        /// <summary>
        ///     Returns a no-op lock, as the writer lock is already held by <see cref="ExecuteAsync" /> while the
        ///     queued steps are running.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        ///     The lock was requested while the transaction is not executing, which means the calling operation was
        ///     not queued using AddTransactionStep().
        /// </exception>
        protected override IDisposable Lock()
        {
            if (!_isExecuting)
                throw new InvalidOperationException(
                    "It seems like the method that is tries to lock is not wrapped in AddTransactionStep() " +
                    "(and therefor executing immediately instead of executing in the transaction). Please declare " +
                    $"the method as virtual in {nameof(InMemoryDatabaseActions)} and override it in this class");

            return NoopDisposable.Instance;
        }

        // All operations below are deferred: they only queue the call to the base implementation and return a task
        // that completes when the transaction is executed.

        /// <inheritdoc />
        public override ValueTask<bool> KeyDeleteAsync(string key)
        {
            return AddTransactionStep(() => base.KeyDeleteAsync(key));
        }

        /// <inheritdoc />
        public override ValueTask<string?> HashGetAsync(string key, string field)
        {
            return AddTransactionStep(() => base.HashGetAsync(key, field));
        }

        /// <inheritdoc />
        public override ValueTask HashSetAsync(string key, IEnumerable<KeyValuePair<string, string>> keyValuePairs)
        {
            return AddTransactionStep(() => base.HashSetAsync(key, keyValuePairs));
        }

        /// <inheritdoc />
        public override ValueTask HashSetAsync(string key, string field, string value)
        {
            // NOTE(review): this deliberately targets the enumerable overload of the base class, presumably so the
            // base implementation cannot dispatch back into an override of this class and queue a new step while
            // the transaction is executing - confirm against InMemoryDatabaseActions
            return AddTransactionStep(() =>
                base.HashSetAsync(key, new KeyValuePair<string, string>(field, value).Yield()));
        }

        /// <inheritdoc />
        public override ValueTask<bool> HashExistsAsync(string key, string field)
        {
            return AddTransactionStep(() => base.HashExistsAsync(key, field));
        }

        /// <inheritdoc />
        public override ValueTask<bool> HashDeleteAsync(string key, string field)
        {
            return AddTransactionStep(() => base.HashDeleteAsync(key, field));
        }

        /// <inheritdoc />
        public override ValueTask<IReadOnlyDictionary<string, string>> HashGetAllAsync(string key)
        {
            return AddTransactionStep(() => base.HashGetAllAsync(key));
        }

        /// <inheritdoc />
        public override ValueTask<string?> GetAsync(string key)
        {
            return AddTransactionStep(() => base.GetAsync(key));
        }

        /// <inheritdoc />
        public override ValueTask<string?> GetSetAsync(string key, string value)
        {
            return AddTransactionStep(() => base.GetSetAsync(key, value));
        }

        /// <inheritdoc />
        public override ValueTask SetAsync(string key, string value)
        {
            return AddTransactionStep(() => base.SetAsync(key, value));
        }

        /// <inheritdoc />
        public override ValueTask<RedisResult> ExecuteScriptAsync(RedisScript script, params string[] parameters)
        {
            return AddTransactionStep(() => base.ExecuteScriptAsync(script, parameters));
        }

        /// <inheritdoc />
        public override ValueTask ListRightPushAsync(string key, string item)
        {
            return AddTransactionStep(() => base.ListRightPushAsync(key, item));
        }

        /// <inheritdoc />
        public override ValueTask<int> ListLenAsync(string key)
        {
            return AddTransactionStep(() => base.ListLenAsync(key));
        }

        /// <inheritdoc />
        public override ValueTask<long> ListRemoveAsync(string key, string item)
        {
            return AddTransactionStep(() => base.ListRemoveAsync(key, item));
        }

        /// <inheritdoc />
        public override ValueTask<string?> ListLeftPopAsync(string key)
        {
            return AddTransactionStep(() => base.ListLeftPopAsync(key));
        }

        /// <inheritdoc />
        public override ValueTask<IReadOnlyList<string>> ListRangeAsync(string key, int start, int end)
        {
            return AddTransactionStep(() => base.ListRangeAsync(key, start, end));
        }

        /// <inheritdoc />
        public override ValueTask<bool> SetAddAsync(string key, string value)
        {
            return AddTransactionStep(() => base.SetAddAsync(key, value));
        }

        /// <inheritdoc />
        public override ValueTask<bool> SetRemoveAsync(string key, string value)
        {
            return AddTransactionStep(() => base.SetRemoveAsync(key, value));
        }

        /// <inheritdoc />
        public override ValueTask<IReadOnlyList<string>> SetMembersAsync(string key)
        {
            return AddTransactionStep(() => base.SetMembersAsync(key));
        }
    }
}