This is a heavily modified version of the original node-eventstore by Adriano Raiano.
The project goal is to provide an eventstore implementation for node.js:
- load and store events via EventStream object
- event dispatching to your publisher (optional)
- supported Dbs (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable, dynamodb)
- snapshot support
- query your events
npm install @avanzu/eventstore
var eventstore = require('@avanzu/eventstore')
var es = eventstore()
By default the eventstore will use an inmemory Storage.
For logging and debugging you can use the debug module by TJ Holowaychuk.
Simply run your process with
DEBUG=@avanzu/eventstore/* node app.js
example with mongodb:
var es = require('@avanzu/eventstore')({
type: 'mongodb',
host: 'localhost', // optional
port: 27017, // optional
dbName: 'eventstore', // optional
eventsCollectionName: 'events', // optional
snapshotsCollectionName: 'snapshots', // optional
transactionsCollectionName: 'transactions', // optional
timeout: 10000, // optional
// emitStoreEvents: true // optional, by default no store events are emitted
    // maxSnapshotsCount: 3 // optional, by default all snapshots are kept
    // authSource: 'authenticationDatabase' // optional
// username: 'technicalDbUser' // optional
// password: 'secret' // optional
    // url: 'mongodb://user:pass@host:port/db?opts' // optional
    // positionsCollectionName: 'positions' // optional, by default positions are not tracked
})
es.on('connect', function () {
console.log('storage connected')
})
es.on('disconnect', function () {
console.log('connection to storage is gone')
})
Define which values should be mapped/copied to the event payload:
es.defineEventMappings({
id: 'id',
commitId: 'commitId',
commitSequence: 'commitSequence',
commitStamp: 'commitStamp',
streamRevision: 'streamRevision',
})
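Assuming the mappings behave as in the original node-eventstore, the configured properties of the stored event are copied onto your event payload under the given names. With the identity mapping above, a committed payload of { my: 'event' } would roughly come back as (values are illustrative):
// {
//   my: 'event',
//   id: '...',
//   commitId: '...',
//   commitSequence: 0,
//   commitStamp: <Date>,
//   streamRevision: 0,
// }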
await es.init()
const { events } = await es.getEventStream({ query: 'streamId' })
or
const { events } = await es.getEventStream({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
})
'streamId' and 'aggregateId' are the same. In DDD terms, aggregate and context just make the language more precise. For example, you can have a 'person' aggregate in the context 'human resources' and a 'person' aggregate in the context 'business contracts'. So you can have two completely different aggregate instances of two completely different aggregates (perhaps with the same name) in two completely different contexts, as the sketch below shows.
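A minimal sketch (the context name 'contracts' and the aggregateId are illustrative); the same aggregate name addresses two independent streams when the context differs:
// two independent 'person' streams, distinguished only by context
const hrPerson = await es.getEventStream({
    query: { aggregateId: '4711', aggregate: 'person', context: 'hr' },
})
const contractPerson = await es.getEventStream({
    query: { aggregateId: '4711', aggregate: 'person', context: 'contracts' },
})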
You can also limit the eventstream request with a minimum and a maximum revision number:
const { events } = await es.getEventStream({
query:
'streamId' ||
{
/* query */
},
revMin: 5,
revMax: 8,
})
Store new events and commit them to the store:
const stream = await es.getEventStream({ query: 'streamId' })
stream.addEvent({ my: 'event' })
stream.addEvents([{ my: 'event2' }])
await stream.commit()
console.log(stream.eventsToDispatch)
If you defined an event publisher function, the committed events will be dispatched to the provided publisher.
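A publisher can be registered with useEventPublisher, assuming this fork keeps the API of the original node-eventstore; a sketch:
es.useEventPublisher(function (evt) {
    // 'bus' is a placeholder for your messaging infrastructure
    bus.emit('event', evt)
})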
If you just want to load the last event as a stream, you can call `getLastEventAsStream` instead of `getEventStream`.
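Assuming it follows the same params-object convention as `getEventStream`, a sketch:
const stream = await es.getLastEventAsStream({ query: 'streamId' })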
Get a snapshot and the event history from the snapshot point:
const [snapshot, stream] = await es.getFromSnapshot({ query: 'streamId' })
or
const [snapshot, stream] = await es.getFromSnapshot({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
})
You can also limit the snapshot/eventstream request with a maximum revision number:
const [snapshot, stream] = await es.getFromSnapshot({
query:
'streamId' ||
{
/* query */
},
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
create a snapshot point
const [snapshot, stream] = await es.getFromSnapshot({ query: 'streamId' })
const snap = snapshot.data
const history = stream.events
// create a new snapshot depending on your rules
if (history.length > myLimit) {
await es.createSnapshot({
streamId: 'streamId',
data: myAggregate.getSnap(),
revision: stream.lastRevision,
version: 1 // optional
});
// or
await es.createSnapshot({
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
        context: 'hr', // optional
data: myAggregate.getSnap(),
revision: stream.lastRevision,
version: 1 // optional
});
}
// go on: store new event and commit it
// stream.addEvents...
You can automatically clean up older snapshots by configuring the number of snapshots to keep with the `maxSnapshotsCount` option in the eventstore options.
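For example, using the option from the configuration example above:
var es = require('@avanzu/eventstore')({
    type: 'mongodb',
    maxSnapshotsCount: 3, // keep only the 3 most recent snapshots
})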
Query all undispatched events:
const evts = await es.getUndispatchedEvents()
currently supported by:
- mongodb
You can delete an aggregate including the event history, snapshots and transactions by calling `deleteStream`.
const deletedStream = await es.deleteStream('myStreamId')
The return value is the `EventStream` that has just been deleted. This stream will contain an undispatched `TombstoneEvent` ready to be processed. The `payload` attribute of that event contains the complete event history.
const [tombstoneEvent] = deletedStream.eventsToDispatch
For replaying your events, rebuilding a viewmodel, or just for fun...
skip, limit always optional
var skip = 0,
limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end
const events = await es.getEvents({ skip, limit })
// or
const events = await es.getEvents({ query: 'streamId', skip, limit })
// or
const events = await es.getEvents({
query: {
// free choice (all, only context, only aggregate, only aggregateId...)
context: 'hr',
aggregate: 'person',
aggregateId: 'uuid',
},
skip,
limit,
})
by revision
revMin, revMax always optional
const events = await es.getEventsByRevision({
query: 'streamId',
revMin: 5,
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
// or
const events = await es.getEventsByRevision({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person', // optional
context: 'hr', // optional
},
revMin: 5,
revMax: 8, // if you omit revMax or you define it as -1 it will retrieve until the end
})
by commitStamp
skip, limit always optional
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
skip: 10,
limit: 100, // if you omit limit or you define it as -1 it will retrieve until the end
})
// or
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
limit: 50,
})
// or
const events = await es.getEventsSince({
commitStamp: new Date(2015, 5, 23),
})
Some databases support streaming your events; the API is similar to the query one.
skip, limit always optional
var skip = 0,
limit = 100 // if you omit limit or you define it as -1 it will retrieve until the end
var stream = es.streamEvents({ skip, limit })
// or
var stream = es.streamEvents({ query: 'streamId', skip, limit })
// or by commitstamp
var stream = es.streamEventsSince({ commitStamp: new Date(2015, 5, 23), skip, limit })
// or by revision
var stream = es.streamEventsByRevision({
query: {
aggregateId: 'myAggregateId',
aggregate: 'person',
context: 'hr',
},
})
stream.on('data', function (e) {
doSomethingWithEvent(e)
})
stream.on('end', function () {
    console.log('no more events')
})
// or even better
stream.pipe(myWritableStream)
currently supported by:
- mongodb (driver version <= 4.0.0)
For example, to obtain the last revision number:
const event = await es.getLastEvent('streamId')
// or
const event = await es.getLastEvent({
// free choice (all, only context, only aggregate, only aggregateId...)
context: 'hr',
aggregate: 'person',
aggregateId: 'uuid',
})
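The revision can then be read off the returned event; the guard for an empty stream is an assumption:
const lastRevision = event ? event.streamRevision : -1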
Obtain a new id:
const id = await es.getNewId()
Some db implementations support writing the position of the event in the whole store in addition to the streamRevision.
Currently these implementations support it:
- inmemory (by setting the `trackPosition` option)
- mongodb (by setting the `positionsCollectionName` option)
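For example, using the options listed above:
// inmemory: track the global position of each event
var esInMemory = require('@avanzu/eventstore')({
    type: 'inmemory',
    trackPosition: true,
})

// mongodb: keep positions in a dedicated collection
var esMongo = require('@avanzu/eventstore')({
    type: 'mongodb',
    positionsCollectionName: 'positions',
})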
Inserting multiple events (documents) in mongodb is not atomic. Therefore the eventstore tries to repair itself when calling `getEventsByRevision`. But if you want, you can trigger this from outside:
const [firstTransaction] = await es.store.getPendingTransactions()
const lastEvent = await es.store.getLastEvent({
aggregateId: firstTransaction.aggregateId,
aggregate: firstTransaction.aggregate, // optional
context: firstTransaction.context, // optional
})
await es.store.repairFailedTransaction(lastEvent)
Starting from version 2.0.0 the eventstore no longer supports multiple positional arguments. Instead, you have to pass in a params object. The general idea remains that you only have to specify the arguments that deviate from the defaults.
Please refer to the following table to see how the signatures have changed:
1.x.x | 2.x.x |
---|---|
streamEvents(query, skip, limit) | streamEvents({query, skip, limit}) |
streamEventsSince(commitStamp, skip, limit) | streamEventsSince({commitStamp, skip, limit}) |
streamEventsByRevision(query, revMin, revMax) | streamEventsByRevision({query, revMin, revMax}) |
getEvents(query, skip, limit) | getEvents({query, skip, limit}) |
getEventsSince(commitStamp, skip, limit) | getEventsSince({commitStamp, skip, limit}) |
getEventsByRevision(query, revMin, revMax) | getEventsByRevision({query, revMin, revMax}) |
getEventStream(query, revMin, revMax) | getEventStream({query, revMin, revMax}) |
getFromSnapshot(query, revMax) | getFromSnapshot({query, revMax}) |
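For example, a 1.x call and its 2.x equivalent (a sketch; in 2.x you only pass the arguments that deviate from the defaults):
// 1.x.x (positional arguments)
// es.getEvents('streamId', 0, 100)

// 2.x.x (params object)
const events = await es.getEvents({ query: 'streamId', skip: 0, limit: 100 })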
Inspired by Jonathan Oliver's EventStore for .net.
Currently these databases are supported:
- inmemory
- mongodb (node-mongodb-native)
- redis (redis)
- tingodb (tingodb)
- azuretable (azure-storage)
- dynamodb (aws-sdk)
You can use your own db implementation by extending this...
const { Store } = require('@avanzu/eventstore')

class MyDB extends Store {
    constructor(options) {
        super(options)
        // set up your db connection/state here and implement
        // the Store interface methods on this class
    }
}

module.exports = MyDB
and you can use it like this:
var es = require('@avanzu/eventstore')({
type: MyDB,
})
// es.init...