
# Scratch Pad

TODO: Clean up! Add better description. Add details about subspace and tuple layout. Add sample code to write and read data    
TODO: javadocs - remove references to eventstore or sqlstreamstore outside of the readme  
TODO: do we want to wrap Versionstamp in a "(Global )Position" domain object?     

An Eventstore Layer built on top of FoundationDB

TODO: add high level information about design/implementation  
fdbStore maintains two subspaces:  
Global / [versionstamp] / contract / <- vs pointer  
Aggregate / id / version / contract /  

Global / [versionstamp] / stream message (id, type, content, message metadata, etc) / <- vs pointer  
Stream / id (stream hash) / version / stream message (id, type, content, message metadata, etc) /  
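Do we want contract (aka type) to be a subspace or part of the value tuple?  
A FoundationDB versionstamp on its own doesn't appear to work, likely because we only get one versionstamp per transaction, so every message written in the same transaction gets the same stamp.  
Could we use the 2-byte user/client portion? Take the index of each message within the transaction as the user bytes.  
If we did that, we could only support message arrays up to the 2-byte limit. The user version is an unsigned 16-bit value (0 to 65,535), so the cap is higher than a signed short (32,767).  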
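To make the user-bytes idea concrete, here is a minimal sketch that only models the byte layout with the JDK, without touching FoundationDB. It assumes a 12-byte stamp made of the 10-byte transaction versionstamp followed by a 2-byte big-endian user version holding the message index. `UserVersionSketch` and `stamp` are hypothetical names, not part of this project or of the FoundationDB bindings.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical model of a 12-byte versionstamp: 10 bytes assigned by the
// database per transaction, plus 2 user bytes we would fill with the index
// of each message inside the transaction.
final class UserVersionSketch {
    static final int TRANSACTION_VERSION_LENGTH = 10;
    static final int MAX_USER_VERSION = 0xFFFF; // unsigned 16-bit limit

    static byte[] stamp(byte[] transactionVersion, int userVersion) {
        if (transactionVersion.length != TRANSACTION_VERSION_LENGTH) {
            throw new IllegalArgumentException("transaction version must be 10 bytes");
        }
        if (userVersion < 0 || userVersion > MAX_USER_VERSION) {
            throw new IllegalArgumentException("user version must fit in 2 unsigned bytes");
        }
        // ByteBuffer is big-endian by default, so byte-wise ordering of the
        // result matches numeric ordering of the user version.
        return ByteBuffer.allocate(TRANSACTION_VERSION_LENGTH + 2)
                .put(transactionVersion)
                .putShort((short) userVersion)
                .array();
    }

    public static void main(String[] args) {
        byte[] tx = new byte[TRANSACTION_VERSION_LENGTH];
        tx[7] = 42; // pretend commit version
        byte[] first = stamp(tx, 0);
        byte[] second = stamp(tx, 1);
        // Keys in FoundationDB sort as unsigned bytes, so compare the same way.
        System.out.println(Arrays.compareUnsigned(first, second) < 0);
    }
}
```

Messages written in one transaction share the first 10 bytes and are ordered by their index, which is what the global subspace needs. In the real store the Java bindings' `Versionstamp` type would presumably carry the user version rather than hand-built bytes.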

TODO: add sample to write and read
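Until a real sample exists, the following in-memory sketch illustrates the intended write and read paths over the two subspaces described above. It is not the fdbStore API: `TwoSubspaceSketch`, `append`, `readStreamForward` and `readAllForward` are hypothetical names, a `long` counter stands in for the versionstamp, and it assumes both subspaces store the full message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory stand-in for the two subspaces:
//   Global / position         -> message
//   Stream / id / version     -> message
final class TwoSubspaceSketch {
    private final TreeMap<Long, String> global = new TreeMap<>();
    private final TreeMap<String, TreeMap<Integer, String>> streams = new TreeMap<>();
    private long nextPosition = 0; // assumption: replaces the versionstamp

    /** Appends messages to a stream and to the global log; returns the latest stream version. */
    int append(String streamId, List<String> messages) {
        TreeMap<Integer, String> stream =
                streams.computeIfAbsent(streamId, id -> new TreeMap<>());
        for (String message : messages) {
            stream.put(stream.size(), message); // versions are 0, 1, 2, ...
            global.put(nextPosition++, message);
        }
        return stream.size() - 1;
    }

    /** Reads up to count messages of one stream, starting at fromVersionInclusive. */
    List<String> readStreamForward(String streamId, int fromVersionInclusive, int count) {
        TreeMap<Integer, String> stream = streams.getOrDefault(streamId, new TreeMap<>());
        return take(stream.tailMap(fromVersionInclusive, true).values(), count);
    }

    /** Reads up to count messages across all streams, in write order. */
    List<String> readAllForward(long fromPositionInclusive, int count) {
        return take(global.tailMap(fromPositionInclusive, true).values(), count);
    }

    private static List<String> take(Iterable<String> values, int count) {
        List<String> out = new ArrayList<>();
        for (String value : values) {
            if (out.size() == count) break;
            out.add(value);
        }
        return out;
    }
}
```

The sorted maps play the role of FoundationDB range reads: a stream read is a range scan under one stream id, and an "all" read is a range scan over the global subspace.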

Task<StreamEventsSlice> ReadStreamEventsForwardAsync(string stream, long start, int count, bool resolveLinkTos)
The ReadStreamEventsForwardAsync method reads the requested number of events in the order in which they were originally written to the stream from a nominated starting point in the stream.
long start - The earliest event to read (inclusive). 
For the special case of the start of the stream, you should use the constant StreamPosition.Start.
var streamEvents = new List<ResolvedEvent>();

StreamEventsSlice currentSlice;
long nextSliceStart = StreamPosition.Start;
do
{
    // Read the next page of up to 200 events from the stream.
    currentSlice =
    _eventStoreConnection.ReadStreamEventsForwardAsync("myStream", nextSliceStart,
                                                  200, false)
                                                  .Result;

    nextSliceStart = currentSlice.NextEventNumber;

    streamEvents.AddRange(currentSlice.Events);
} while (!currentSlice.IsEndOfStream);

Task<AllEventsSlice> ReadAllEventsForwardAsync(Position position, int maxCount, bool resolveLinkTos);
Read all events
Event Store allows you to read events across all streams using the ReadAllEventsForwardAsync and ReadAllEventsBackwardsAsync methods. 
These work in the same way as the regular read methods, but use an instance of the global log file Position to reference events rather than the simple integer stream position described previously.
They also return an AllEventsSlice rather than a StreamEventsSlice which is the same except it uses global Positions rather than stream positions.
var allEvents = new List<ResolvedEvent>();

AllEventsSlice currentSlice;
var nextSliceStart = Position.Start;

do
{
    // Read the next page of up to 200 events across all streams.
    currentSlice =
        connection.ReadAllEventsForwardAsync(nextSliceStart, 200, false).Result;

    nextSliceStart = currentSlice.NextPosition;

    allEvents.AddRange(currentSlice.Events);
} while (!currentSlice.IsEndOfStream);

## Deletes 

Have a stream of persistent subscriptions  
When a delete operation is made, it just appends a message to the "delete stream" stream.  
A background process will be notified, or will poll, that a delete operation was requested and will perform the following:
- delete the entire stream (range delete)
- iterate over the all stream, deleting that stream's events
- update the persistent subscription

All of this should be in a single transaction?    
Only one delete / clean up thread?  
Do we need to checkpoint where the thread is? Would be helpful in the scenarios in which the thread dies  
If multiple deletes occur should we do them both at the same time to avoid iterating over the all stream more than we have to?
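One way to picture the open questions above is a single worker that batches all pending delete requests, makes one pass over the all stream, and then advances a checkpoint. The sketch below is an in-memory model only, under those assumptions. `DeleteWorkerSketch`, `requestDelete` and `runOnce` are hypothetical names, and nothing here is transactional the way a FoundationDB implementation would need to be.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class DeleteWorkerSketch {
    // Global log: position -> id of the stream that owns the event (payload omitted).
    final TreeMap<Long, String> global = new TreeMap<>();
    // Per-stream events.
    final Map<String, List<String>> streams = new HashMap<>();
    // Append-only "delete stream" of requested stream ids.
    final List<String> deleteStream = new ArrayList<>();
    // Checkpoint: number of delete requests already processed.
    int checkpoint = 0;
    private long nextPosition = 0;

    void append(String streamId, String message) {
        streams.computeIfAbsent(streamId, id -> new ArrayList<>()).add(message);
        global.put(nextPosition++, streamId);
    }

    void requestDelete(String streamId) {
        deleteStream.add(streamId);
    }

    /** Handles every pending request in one pass; returns how many global entries were removed. */
    int runOnce() {
        Set<String> pending =
                new HashSet<>(deleteStream.subList(checkpoint, deleteStream.size()));
        if (pending.isEmpty()) {
            return 0;
        }
        // "Range delete" of each requested stream.
        streams.keySet().removeAll(pending);
        // Single scan of the all stream, regardless of how many deletes are pending.
        int before = global.size();
        global.values().removeIf(pending::contains);
        // Advance the checkpoint only after the work is done, so a restarted
        // worker would redo the batch rather than skip it.
        checkpoint = deleteStream.size();
        return before - global.size();
    }
}
```

Because the batch is idempotent (deleting an already deleted stream is a no-op), replaying from the last checkpoint after a crash is safe in this model.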

## Random Research

Links to docs, research, projects that have inspired design or implementation


//    AllEventSlice takes in ClientMessage.ResolvedEvent which is converted to ClientAPI.ResolvedEvent which takes in either a ClientMessage.ResolvedEvent or ClientMessage.ResolvedIndexedEvent
//    From ClientApi.ResolvedEvent
//    /// <summary>
//    /// The logical position of the <see cref="OriginalEvent"/>.
//    /// </summary>
//    public readonly Position? OriginalPosition;
//    /// <summary>
//    /// The stream name of the <see cref="OriginalEvent" />.
//    /// </summary>
//    public string OriginalStreamId { get { return OriginalEvent.EventStreamId; } }
//    /// <summary>
//    /// The event number in the stream of the <see cref="OriginalEvent"/>.
//    /// </summary>
//    public long OriginalEventNumber { get { return OriginalEvent.EventNumber; } }

Range r1 = streamSubspace.range();
Range r2 = streamSubspace.range(Tuple.from());
Range r3 = streamSubspace.range(Tuple.from(intToBytes(fromVersionInclusive), intToBytes(Integer.MAX_VALUE)));
Range r4 = streamSubspace.range(Tuple.from(intToBytes(fromVersionInclusive)));
Range r5 = streamSubspace.range(Tuple.from(fromVersionInclusive));
Range r6 = getStreamSubspace(streamHash.toString(), fromVersionInclusive).range();

KeySelector begin = new KeySelector(r1.begin,true, rangeCount);
KeySelector end = new KeySelector(r1.end,true, rangeCount);
KeySelector n_begin = new KeySelector(begin.getKey(),true, begin.getOffset());
//AsyncIterable<KeyValue> r = tr.getRange(n_begin, end, rangeCount, reverse, StreamingMode.WANT_ALL);

// List<KeyValue> classes = tr.getRange(Tuple.from("attends", s).range()).asList().join();
//AsyncIterable<KeyValue> r = tr.getRange(r1, rangeCount, reverse, StreamingMode.WANT_ALL);
//AsyncIterable<KeyValue> r = tr.getRange(range, rangeCount, reverse, StreamingMode.WANT_ALL);
//AsyncIterable<KeyValue> r = tr.getRange(r3, rangeCount, reverse, StreamingMode.WANT_ALL);
//AsyncIterable<KeyValue> r = tr.getRange(r4, rangeCount, reverse, StreamingMode.WANT_ALL);
//AsyncIterable<KeyValue> r = tr.getRange(r5, rangeCount, reverse, StreamingMode.WANT_ALL);
//AsyncIterable<KeyValue> r = tr.getRange(r6, rangeCount, reverse, StreamingMode.WANT_ALL);
//AsyncIterable<KeyValue> r = tr.getRange(Tuple.from(streamSubspace, fromVersionInclusive).range(), rangeCount, reverse, StreamingMode.WANT_ALL);

//    public CompletableFuture<Versionstamp> getCurVersionStamp(ReadTransaction tr) {
//        AsyncIterator<KeyValue> iterator = tr.getRange(esSubspace.range(), /* limit = */ 1, /* reverse = */ true).iterator();
//        return iterator.onHasNext().thenApply(hasAny -> {
//            if (hasAny) {
//                // Get the last element from the log subspace and parse out the versionstamp
//                KeyValue kv = iterator.next();
//                return Tuple.fromBytes(kv.getKey()).getVersionstamp(0);
//            } else {
//                // Log subspace is empty
//                return null; // or a versionstamp of all zeroes if you prefer
//            }
//        });
//    }

//    Construct a versionstamp from your transaction's read version
//    This makes use of the fact that the first 8 bytes of a versionstamp are the commit version of the data associated with a record.
//    Therefore, if you know the read version of the transaction, you also know that all data in your log subspace at version v will be prefixed by a version less than or equal to v and all future data added later will be prefixed with a version greater than v.
//    So you can do something like:
//    public CompletableFuture<Versionstamp> getCurVersionStamp(ReadTransaction tr) {
//        return tr.getReadVersion().thenApply(readVersion ->
//            Versionstamp.fromBytes(ByteBuffer.allocate(Versionstamp.LENGTH)
//                .order(ByteOrder.BIG_ENDIAN)
//                .putLong(readVersion)
//                .putInt(0xffffffff)
//                .array())
//        );
//    }
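
The read-version trick in the comment above can be checked without a cluster. This is a minimal sketch that reproduces only the byte arithmetic with the JDK: 8 bytes of version followed by four `0xff` bytes, compared as unsigned bytes the way FoundationDB orders keys. `ReadVersionBoundSketch`, `upperBoundFor` and `stamp` are hypothetical helper names, and the 12-byte layout (8-byte commit version, 2-byte batch order, 2-byte user version) is the assumption being illustrated.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

final class ReadVersionBoundSketch {
    static final int VERSIONSTAMP_LENGTH = 12;

    /** Largest possible 12-byte stamp whose commit version equals readVersion. */
    static byte[] upperBoundFor(long readVersion) {
        return ByteBuffer.allocate(VERSIONSTAMP_LENGTH)
                .order(ByteOrder.BIG_ENDIAN)
                .putLong(readVersion)
                .putInt(0xffffffff)
                .array();
    }

    /** Builds a stamp from its three parts (the last two are unsigned 16-bit values). */
    static byte[] stamp(long commitVersion, int batchOrder, int userVersion) {
        return ByteBuffer.allocate(VERSIONSTAMP_LENGTH)
                .order(ByteOrder.BIG_ENDIAN)
                .putLong(commitVersion)
                .putShort((short) batchOrder)
                .putShort((short) userVersion)
                .array();
    }

    public static void main(String[] args) {
        byte[] bound = upperBoundFor(100L);
        // Data committed at or before version 100 sorts at or below the bound.
        System.out.println(Arrays.compareUnsigned(stamp(100L, 7, 3), bound) <= 0);
        // Data committed later sorts strictly above it.
        System.out.println(Arrays.compareUnsigned(stamp(101L, 0, 0), bound) > 0);
    }
}
```

This relies on commit versions being non-negative, so the signed `long` written by `putLong` sorts the same way as its unsigned byte representation.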