seancarroll/fdb-java-es

View on GitHub
src/main/java/com/eventfully/foundationdb/eventstore/EventStoreLayer.java

Summary

Maintainability
F
3 days
Test Coverage
package com.eventfully.foundationdb.eventstore;

import com.apple.foundationdb.Database;
import com.apple.foundationdb.KeySelector;
import com.apple.foundationdb.KeyValue;
import com.apple.foundationdb.MutationType;
import com.apple.foundationdb.ReadTransaction;
import com.apple.foundationdb.StreamingMode;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.directory.DirectoryLayer;
import com.apple.foundationdb.directory.DirectorySubspace;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.Versionstamp;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static java.util.concurrent.CompletableFuture.completedFuture;

// TODO: for append, should we be starting a transaction that encompasses the read + write just to make sure nothing
// else can write to our expected version?
// internal methods would need to take transaction

// TODO: run vs runAsync
// https://forums.foundationdb.org/t/fdbdatabase-usage-from-java-api/593/2
// run synchronously commit your transaction
// he runAsync retry loop will only call commit after the returned future has completed (so you could do something like read a key from database, make some write based on your read, and then commit).

//To define ranges that extend from the beginning the database, you can use the empty string '':
//
//    for k, v in tr.get_range('', 'm'):
//    print(k, v)
//    Likewise, to define ranges that extend to the end of the database, you can use the key '\xFF':
//
//    for k, v in tr.get_range('m', '\xFF'):
//    print(k, v)

// TODO: for multi-tenant that is using the shared cluster we need to allow passing in a directory
// so that we can separate tenants

/**
 *
 */
public class EventStoreLayer implements EventStore {

    private static final Logger LOG = LoggerFactory.getLogger(EventStoreLayer.class);

    public static final int MAX_READ_SIZE = 4096;

    private final Database database;
    private final DirectorySubspace esSubspace;

    /**
     *
     * @param database the foundationDB database
     * @param subspace the directory you wish use to store events for your event store. TODO: does this need to allow manual subspaces?
     */
    public EventStoreLayer(Database database, DirectorySubspace subspace) {
        this.database = database;
        this.esSubspace = subspace;
    }

    /**
     * Default factory method
     * @param database the foundationDB database
     * @return an EventStoreLayer with a under an "es" Directory
     * @see EventStoreLayer#getDefaultDirectorySubspace
     */
    public static CompletableFuture<EventStoreLayer> getDefault(Database database) {
        return getDefaultDirectorySubspace(database)
            .thenCompose(esSubspace -> completedFuture(new EventStoreLayer(database, esSubspace)));
    }

    /**
     *
     * @param database
     * @return
     */
    public static CompletableFuture<DirectorySubspace> getDefaultDirectorySubspace(Database database) {
        return DirectoryLayer.getDefault().createOrOpen(database, Collections.singletonList("es"));
    }

    @Override
    public CompletableFuture<AppendResult> appendToStream(String streamId, long expectedVersion, NewStreamMessage... messages) {
        Preconditions.checkNotNull(streamId);
        if (messages == null || messages.length == 0) {
            throw new IllegalArgumentException("messages must not be null or empty");
        }

        StreamId stream = new StreamId(streamId);
        if (expectedVersion == ExpectedVersion.ANY) {
            return appendToStreamExpectedVersionAny(stream, messages);
        }
        if (expectedVersion == ExpectedVersion.NO_STREAM) {
            return appendToStreamExpectedVersionNoStream(stream, messages);
        }
        return appendToStreamExpectedVersion(stream, expectedVersion, messages);
    }

    // TODO: write test for appendToStreamExpectedVersionAny where stream does not exist and verify starting position
    private CompletableFuture<AppendResult> appendToStreamExpectedVersionAny(StreamId streamId, NewStreamMessage[] messages) {
        final AtomicLong latestStreamVersion = new AtomicLong(0);
        return database.runAsync(tr -> {
            CompletableFuture<ReadEventResult> readEventResultFuture = readEventInternal(tr, streamId, StreamPosition.END);
            return readEventResultFuture.thenCompose(readEventResult -> {
                latestStreamVersion.set(readEventResult.getEventNumber());
                return write(tr, streamId, latestStreamVersion, messages);
            });
        })
            .thenCompose(Function.identity())
            .thenApply(Versionstamp::complete)
            .thenApply(completedVersion -> new AppendResult(latestStreamVersion.get(), completedVersion));
    }

    // TODO: clean up
    private CompletableFuture<AppendResult> appendToStreamExpectedVersionNoStream(StreamId streamId, NewStreamMessage[] messages) {
        AtomicLong latestStreamVersion = new AtomicLong(-1);
        return database.runAsync(tr -> {
            CompletableFuture<ReadStreamSlice> backwardSliceFuture = readStreamBackwardsInternal(tr, streamId, StreamPosition.END, 1);
            return backwardSliceFuture.thenApplyAsync(backwardSlice -> {
                if (SliceReadStatus.STREAM_NOT_FOUND != backwardSlice.getStatus()) {
                    throw new WrongExpectedVersionException(streamId.getOriginalId(), StreamVersion.NONE);
                }
                return completedFuture(null);
            }).thenComposeAsync(ignore -> write(tr, streamId, latestStreamVersion, messages));
        })
            .thenCompose(Function.identity())
            .thenApply(Versionstamp::complete)
            .thenApply(completedVersion -> new AppendResult(latestStreamVersion.get(), completedVersion));
    }

    private CompletableFuture<AppendResult> appendToStreamExpectedVersion(StreamId streamId, long expectedVersion, NewStreamMessage[] messages) {
        final AtomicLong latestStreamVersion = new AtomicLong(0);
        return database.runAsync(tr -> {
            CompletableFuture<ReadEventResult> readEventResultFuture = readEventInternal(tr, streamId, StreamPosition.END);
            // TODO: do we need to do any version/event number checking?
            return readEventResultFuture.thenCompose(readEventResult -> {
                if (!Objects.equals(expectedVersion, readEventResult.getEventNumber())) {
                    throw new WrongExpectedVersionException(streamId.getOriginalId(), expectedVersion, readEventResult.getEventNumber());
                }
                return completedFuture(readEventResult);
            }).thenCompose(readEventResult -> {
                latestStreamVersion.set(readEventResult.getEventNumber());
                return write(tr, streamId, latestStreamVersion, messages);
            });
        })
            .thenCompose(Function.identity())
            .thenApply(Versionstamp::complete)
            .thenApply(completedVersion -> new AppendResult(latestStreamVersion.get(), completedVersion));
    }

    private CompletableFuture<CompletableFuture<byte[]>> write(Transaction tr,
                                                               StreamId streamId,
                                                               AtomicLong latestStreamVersion,
                                                               NewStreamMessage[] messages) {
        Subspace globalSubspace = getGlobalSubspace();
        Subspace streamSubspace = getStreamSubspace(streamId);

        for (int i = 0; i < messages.length; i++) {
            long eventNumber = latestStreamVersion.incrementAndGet();
            Versionstamp versionstamp = Versionstamp.incomplete(i);
            Tuple streamSubspaceValue = packStreamSubspaceValue(streamId, messages[i], eventNumber, versionstamp);
            tr.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, streamSubspace.subspace(Tuple.from(latestStreamVersion)).pack(), streamSubspaceValue.packWithVersionstamp());
            Tuple globalSubspaceValue = Tuple.from(streamId.getOriginalId(), eventNumber);
            // we cant encode additional values (stream / event) into the key as for some reason it causes issues with
            // getRange and inclusive end
            tr.mutate(MutationType.SET_VERSIONSTAMPED_KEY, globalSubspace.packWithVersionstamp(Tuple.from(versionstamp)), globalSubspaceValue.pack());
        }

        return completedFuture(tr.getVersionstamp());
    }

    @Override
    public void deleteStream(String streamId) {
        Preconditions.checkNotNull(streamId);
        // TODO: how to handle?
        // We can clear the stream subspace via clear(Range) but how to delete from global subspace?
        // would we need a scavenger process? something else?
        // add to a delete job scavenger process which contains the stream id/hash to delete
        // database.run(tr -> {);
        throw new RuntimeException("Not implemented exception");
    }

    @Override
    public SetStreamMetadataResult setStreamMetadata(String streamId,
                                                     long expectedStreamMetadataVersion,
                                                     Integer maxAge,
                                                     Integer maxCount,
                                                     String metadataJson) {
        // TODO: implement
        return null;
    }

    @Override
    public CompletableFuture<ReadAllSlice> readAllForwards(Versionstamp fromPositionInclusive, int maxCount) {
        Preconditions.checkNotNull(fromPositionInclusive);
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        Preconditions.checkArgument(maxCount <= MAX_READ_SIZE, "maxCount should be less than %d", MAX_READ_SIZE);

        return database.readAsync(tr -> readAllForwardInternal(tr, fromPositionInclusive, maxCount));
    }

    @Override
    public CompletableFuture<ReadAllSlice> readAllBackwards(Versionstamp fromPositionInclusive, int maxCount) {
        Preconditions.checkNotNull(fromPositionInclusive);
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        Preconditions.checkArgument(maxCount <= MAX_READ_SIZE, "maxCount should be less than %d", MAX_READ_SIZE);

        return database.readAsync(tr -> readAllBackwardInternal(tr, fromPositionInclusive, maxCount));
    }

    private CompletableFuture<ReadAllSlice> readAllForwardInternal(ReadTransaction tr, Versionstamp fromPositionInclusive, int maxCount) {
        Subspace globalSubspace = getGlobalSubspace();

        // add one so we can determine if we are at the end of the stream
        int rangeCount = maxCount + 1;

        KeySelector begin = Objects.equals(fromPositionInclusive, Position.END)
            ? KeySelector.lastLessOrEqual(globalSubspace.range().end)
            : KeySelector.firstGreaterOrEqual(globalSubspace.pack(fromPositionInclusive));

        CompletableFuture<List<KeyValue>> kvs = tr.getRange(
            begin,
            KeySelector.firstGreaterThan(globalSubspace.range().end),
            rangeCount,
            false,
            StreamingMode.WANT_ALL
        ).asList();

        ReadNextAllSlice readNext = (Versionstamp nextPosition)
            -> database.readAsync(readTransaction -> readAllForwardInternal(readTransaction, nextPosition, maxCount));

        return kvs.thenCompose(keyValues -> {
            if (keyValues.isEmpty()) {
                return completedFuture(new ReadAllSlice(
                    fromPositionInclusive,
                    fromPositionInclusive,
                    true,
                    ReadDirection.FORWARD,
                    readNext,
                    Empty.STREAM_MESSAGES)
                );
            }

            int limit = Math.min(maxCount, keyValues.size());
            List<CompletableFuture<ReadEventResult>> completableFutures = new ArrayList<>(limit);
            for (int i = 0; i < limit; i++) {
                KeyValue kv = keyValues.get(i);
                Tuple t = Tuple.fromBytes(kv.getValue());
                completableFutures.add(readEvent(t.getString(0), t.getLong(1)));
            }

            // allof doesnt work with lists however if we make completablesFutures an array we run into a
            // different issue with the `.map(ReadEventResult::getEvent)` as the array needs to be bound to ?
            return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
                .thenCompose(ignore -> {
                    StreamMessage[] messages = completableFutures.stream()
                        .map(CompletableFuture::join)
                        .map(ReadEventResult::getEvent)
                        .toArray(size -> new StreamMessage[completableFutures.size()]);


                    // null represents end and used to mean not known
                    // TODO: see what EventStore does
                    Versionstamp nextPosition = maxCount >= keyValues.size()
                        ? null
                        : globalSubspace.unpack(keyValues.get(keyValues.size() - 1).getKey()).getVersionstamp(0);

                    return completedFuture(new ReadAllSlice(
                        fromPositionInclusive,
                        nextPosition,
                        maxCount >= keyValues.size(),
                        ReadDirection.FORWARD,
                        readNext,
                        messages)
                    );
                });
        });
    }

    private CompletableFuture<ReadAllSlice> readAllBackwardInternal(ReadTransaction tr, Versionstamp fromPositionInclusive, int maxCount) {
        Subspace globalSubspace = getGlobalSubspace();

        // add one so we can determine if we are at the end of the stream
        int rangeCount = maxCount + 1;

        final KeySelector end;
        if (Objects.equals(fromPositionInclusive, Position.START)) {
            // TODO: check this...I feel like its wrong
            // firstGreaterThan (+1) doesn't work when attempting to get start position
            // Seems like range queries don't work when begin has firstGreaterOrEqual and end with firstGreaterThan or firstGreaterOrEqual
            // so will bump the offset by 2
            end = new KeySelector(globalSubspace.range().begin, false, 2);
        } else if (Objects.equals(fromPositionInclusive, Position.END)) {
            end = KeySelector.firstGreaterThan(globalSubspace.range().end);
        } else {
            end = KeySelector.firstGreaterThan(globalSubspace.pack(Tuple.from(fromPositionInclusive)));
        }

        CompletableFuture<List<KeyValue>> kvs = tr.getRange(
            KeySelector.firstGreaterOrEqual(globalSubspace.range().begin),
            end,
            rangeCount,
            true,
            StreamingMode.WANT_ALL
        ).asList();

        ReadNextAllSlice readNext = (Versionstamp nextPosition)
            -> database.readAsync(readTransaction -> readAllBackwardInternal(readTransaction, nextPosition, maxCount));

        return kvs.thenCompose(keyValues -> {
            if (keyValues.isEmpty()) {
                return completedFuture(new ReadAllSlice(
                    fromPositionInclusive,
                    fromPositionInclusive,
                    true,
                    ReadDirection.BACKWARD,
                    readNext,
                    Empty.STREAM_MESSAGES)
                );
            }

            int limit = Math.min(maxCount, keyValues.size());
            List<CompletableFuture<ReadEventResult>> completableFutures = new ArrayList<>(limit);
            for (int i = 0; i < limit; i++) {
                KeyValue kv = keyValues.get(i);
                Tuple t = Tuple.fromBytes(kv.getValue());
                completableFutures.add(readEvent(t.getString(0), t.getLong(1)));
            }

            return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
                .thenCompose(ignore -> {
                    StreamMessage[] messages = completableFutures.stream()
                        .map(CompletableFuture::join)
                        .map(ReadEventResult::getEvent)
                        .toArray(size -> new StreamMessage[completableFutures.size()]);

                    // TODO: improve this so its not so obtuse
                    Versionstamp nextPosition = maxCount >= keyValues.size()
                        ? globalSubspace.unpack(keyValues.get(Math.max(0, keyValues.size() - 1)).getKey()).getVersionstamp(0)
                        : globalSubspace.unpack(keyValues.get(keyValues.size() - 1).getKey()).getVersionstamp(0);

                    return completedFuture(new ReadAllSlice(
                        fromPositionInclusive,
                        nextPosition,
                        maxCount >= keyValues.size(),
                        ReadDirection.BACKWARD,
                        readNext,
                        messages)
                    );
                });
        });

    }

    @Override
    public CompletableFuture<ReadStreamSlice> readStreamForwards(String streamId, long fromVersionInclusive, int maxCount) {
        Preconditions.checkNotNull(streamId);
        Preconditions.checkArgument(fromVersionInclusive >= -1, "fromVersionInclusive must greater than -1");
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        Preconditions.checkArgument(maxCount <= MAX_READ_SIZE, "maxCount should be less than %d", MAX_READ_SIZE);

        return database.readAsync(tr -> readStreamForwardsInternal(tr, new StreamId(streamId), fromVersionInclusive, maxCount));
    }

    private CompletableFuture<ReadStreamSlice> readStreamForwardsInternal(ReadTransaction tr,
                                                                          StreamId streamId,
                                                                          long fromVersionInclusive,
                                                                          int maxCount) {
        Subspace streamSubspace = getStreamSubspace(streamId);

        // add one so we can determine if we are at the end of the stream
        int rangeCount = maxCount + 1;

        KeySelector begin = fromVersionInclusive == StreamPosition.END
            ? KeySelector.lastLessOrEqual(streamSubspace.range().end)
            : KeySelector.firstGreaterOrEqual(streamSubspace.pack(fromVersionInclusive));

        CompletableFuture<List<KeyValue>> kvs = tr.getRange(
            begin,
            KeySelector.firstGreaterOrEqual(streamSubspace.range().end),
            rangeCount,
            false,
            StreamingMode.WANT_ALL
        ).asList();

        ReadNextStreamSlice readNext = (long nextPosition)
            -> database.readAsync(readTransaction -> readStreamForwardsInternal(readTransaction, streamId, nextPosition, maxCount));

        return kvs.thenCompose(keyValues -> {
            if (keyValues.isEmpty()) {
                return completedFuture(ReadStreamSlice.notFound(streamId, fromVersionInclusive, ReadDirection.FORWARD, readNext));
            }

            int limit = Math.min(maxCount, keyValues.size());
            StreamMessage[] messages = new StreamMessage[limit];
            for (int i = 0; i < limit; i++) {
                StreamMessage message = unpackByteTupleToStreamMessage(streamId, keyValues.get(i).getValue());
                messages[i] = message;
            }

            // TODO: review this. What should next position be if at end and when not at end?
            Tuple nextPositionValue = Tuple.fromBytes(keyValues.get(limit - 1).getValue());
            long nextPosition = nextPositionValue.getLong(5) + 1;

            return completedFuture(new ReadStreamSlice(
                streamId.getOriginalId(),
                SliceReadStatus.SUCCESS,
                fromVersionInclusive,
                nextPosition,
                0, // TODO: fix
                0L, // TODO: fix
                ReadDirection.FORWARD,
                maxCount >= keyValues.size(),
                readNext,
                messages)
            );
        });
    }

    @Override
    public CompletableFuture<ReadStreamSlice> readStreamBackwards(String streamId, long fromVersionInclusive, int maxCount) {
        Preconditions.checkNotNull(streamId);
        Preconditions.checkArgument(fromVersionInclusive >= -1, "fromVersionInclusive must greater than -1");
        Preconditions.checkArgument(maxCount > 0, "maxCount must be greater than 0");
        Preconditions.checkArgument(maxCount <= MAX_READ_SIZE, "maxCount should be less than %d", MAX_READ_SIZE);

        return database.readAsync(tr -> readStreamBackwardsInternal(tr, new StreamId(streamId), fromVersionInclusive, maxCount));
    }

    private CompletableFuture<ReadStreamSlice> readStreamBackwardsInternal(ReadTransaction tr,
                                                                           StreamId streamId,
                                                                           long fromVersionInclusive,
                                                                           int maxCount) {
        Subspace streamSubspace = getStreamSubspace(streamId);

        int rangeCount = maxCount + 1;
        CompletableFuture<List<KeyValue>> kvs = tr.getRange(
            streamSubspace.pack(Tuple.from(fromVersionInclusive - maxCount)),
            // adding one because readTransaction.getRange's end range is exclusive
            streamSubspace.pack(Tuple.from(fromVersionInclusive == StreamPosition.END ? Long.MAX_VALUE : fromVersionInclusive + 1)),
            rangeCount,
            true,
            StreamingMode.WANT_ALL
        ).asList();

        ReadNextStreamSlice readNext = (long nextPosition)
            -> database.readAsync(readTransaction -> readStreamBackwardsInternal(readTransaction, streamId, nextPosition, maxCount));

        return kvs.thenCompose(keyValues -> {
            if (keyValues.isEmpty()) {
                return completedFuture(ReadStreamSlice.notFound(streamId, fromVersionInclusive, ReadDirection.BACKWARD, readNext));
            }

            int limit = Math.min(maxCount, keyValues.size());
            StreamMessage[] messages = new StreamMessage[limit];
            for (int i = 0; i < limit; i++) {
                StreamMessage message = unpackByteTupleToStreamMessage(streamId, keyValues.get(i).getValue());
                messages[i] = message;
            }

            // TODO: review this. What should next position be if at end and when not at end?
            Tuple nextPositionValue = Tuple.fromBytes(keyValues.get(limit - 1).getValue());
            long nextPosition = nextPositionValue.getLong(5) - 1;

            return completedFuture(new ReadStreamSlice(
                streamId.getOriginalId(),
                SliceReadStatus.SUCCESS,
                fromVersionInclusive,
                nextPosition,
                0, // TODO: fix
                0L, // TODO: fix
                ReadDirection.BACKWARD,
                maxCount >= keyValues.size(),
                readNext,
                messages)
            );
        });
    }

    @Override
    public CompletableFuture<Versionstamp> readHeadPosition() {
        Subspace globalSubspace = getGlobalSubspace();

        // TODO: just call readAllBackwards by 1?
        return database.readAsync(tr -> tr.getKey(KeySelector.lastLessThan(globalSubspace.range().end)))
            .thenCompose(k -> {
                if (ByteBuffer.wrap(k).compareTo(ByteBuffer.wrap(globalSubspace.range().begin)) < 0) {
                    return completedFuture(null);
                }

                return completedFuture(globalSubspace.unpack(k).getVersionstamp(0));
            });
    }

    @Override
    public StreamMetadataResult getStreamMetadata(String streamId) {
        // TODO: implement
        return null;
    }

    @Override
    public CompletableFuture<ReadEventResult> readEvent(String stream, long eventNumber) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(stream));
        Preconditions.checkArgument(eventNumber >= -1);
        return database.readAsync(readTransaction -> readEventInternal(readTransaction, new StreamId(stream), eventNumber));
    }

    private CompletableFuture<ReadEventResult> readEventInternal(ReadTransaction tr, StreamId streamId, long eventNumber) {
        Subspace streamSubspace = getStreamSubspace(streamId);

        if (Objects.equals(eventNumber, StreamPosition.END)) {
            CompletableFuture<ReadStreamSlice> readFuture = readStreamBackwardsInternal(tr, streamId, StreamPosition.END, 1);
            return readFuture.thenCompose(read -> {
                if (read.getStatus() == SliceReadStatus.STREAM_NOT_FOUND) {
                    return completedFuture(new ReadEventResult(ReadEventStatus.NOT_FOUND, streamId.getOriginalId(), eventNumber, null));
                }

                return completedFuture(new ReadEventResult(ReadEventStatus.SUCCESS, streamId.getOriginalId(), read.getMessages()[0].getStreamVersion(), read.getMessages()[0]));
            });

        } else {
            CompletableFuture<byte[]> valueBytesFuture = tr.get(streamSubspace.pack(Tuple.from(eventNumber)));
            return valueBytesFuture.thenCompose(valueBytes -> {
                if (valueBytes == null) {
                    return completedFuture(new ReadEventResult(ReadEventStatus.NOT_FOUND, streamId.getOriginalId(), eventNumber, null));
                }

                StreamMessage message = unpackByteTupleToStreamMessage(streamId, valueBytes);
                return completedFuture(new ReadEventResult(ReadEventStatus.SUCCESS, streamId.getOriginalId(), eventNumber, message));
            });
        }
    }

    private static Tuple packStreamSubspaceValue(StreamId streamId, NewStreamMessage message, long eventNumber, Versionstamp versionstamp) {
        return Tuple.from(
            message.getMessageId(),
            streamId.getOriginalId(),
            message.getType(),
            message.getData(),
            message.getMetadata(),
            eventNumber,
            Instant.now().toEpochMilli(),
            versionstamp
        );
    }

    private static StreamMessage unpackByteTupleToStreamMessage(StreamId streamId, byte[] bytes) {
        Tuple value = Tuple.fromBytes(bytes);
        return new StreamMessage(
            streamId.getOriginalId(),
            value.getUUID(0),
            value.getLong(5),
            value.getVersionstamp(7),
            value.getLong(6),
            value.getString(2),
            value.getBytes(4),
            value.getBytes(3)
        );
    }

    private Subspace getGlobalSubspace() {
        return esSubspace.subspace(Tuple.from(EventStoreSubspaces.GLOBAL.getValue()));
    }

    private Subspace getStreamSubspace(StreamId streamId) {
        return esSubspace.subspace(Tuple.from(EventStoreSubspaces.STREAM.getValue(), streamId.getHash()));
    }

}