O NEventStore é uma biblioteca de persistência usada para abstrair diferentes implementações de armazenamento utilizando Event Sourcing. É voltado para arquiteturas baseadas em DDD (Domain Driven Design) e CQRS (Command Query Responsibility Segregation). Não se destina a ser um armazenamento de stream.
Para utilizarmos, basta simplesmente criar uma store, semelhante a:
var store = Wireup.Init() .UsingSqlPersistence("ConnectionString to the DB that store the events") .InitializeStorageEngine() .UsingJsonSerialization() .Build();
Com isto temos a nossa store e passamos a dizer que queremos guardar os eventos baseados na Serialização em JSON, até aqui tudo bem.
Eventualmente algures no teu projeto hás de ter algo para escrever os eventos na BD, algo semelhante ao seguinte excerto de código, denominados de Commits pelo NEventStore.
using (store) { using (var stream = store.CreateStream(myMessage.CustomerId)) { stream.Add(new EventMessage { Body = myMessage }); stream.CommitChanges(myMessage.MessageId); } }
Basicamente dizemos à nossa store para colocar como stream o nosso evento que vai Serialized em JSON, e é aqui que começa a residir o problema que vamos ver já a seguir.
Para começares a ler os eventos da BD o NEventStore tem uma opção denominada de Polling Client e, para tal, basta usar algo semelhante a isto:
using (store) { long checkpointToken = LoadCheckpoint(); var client = new PollingClient2(store.Advanced, commit => { Handle(commit); // DO WHAT YOU HAVE TO DO WITH THE COMMIT checkpointToken = commit.CheckpointToken; return PollingClient2.HandlingResult.MoveToNext; }, waitInterval: 3000); client.StartFrom(checkpointToken); client.Stop(); SaveCheckpoint(checkpointToken); }
Neste ponto basicamente somos capazes de ter uma store que escreve eventos na nossa BD bem como os vai ler automaticamente, e passamos esse evento (commit) para o nosso método Handle(commit).
O commit é do tipo ICommit que contem uma coleção de EventMessage que tem uma propriedade Body do tipo object que é onde efetivamente o nosso evento é armazenado.
O problema é que, por detrás, ele guarda o Payload da seguinte forma:
[ { "Headers":{ ... }, "Body":{ "$type":"Events.User.UserAdded, EventSourcing", ... } } ]
Ou seja, suponhamos que tens um evento chamado UserAdded que está no namespace Events.Users e que está no projeto EventSourcing. Se por alguma razão o nome do projeto mudar, ou o namespace, por exemplo por questão de organização e/ou erros de escrita, ou mesmo remover a redundância no nome da classe User, temos um problema.
Isto porque, se por alguma razão precisares reprocessar esse evento, ao correr o Deserialize ele vai procurar o $type e vai procurar a classe UserAdded, no namespace Events.User, que supostamente estaria contido na EventSourcing.dll, através de reflection, mas o problema é que essa classe já não existe naquele namespace ou, provavelmente a DLL mudou de nome, daí temos a exceção: Exception thrown: ‘System.IO.FileNotFoundException’ in System.Private.CoreLib.dll
Para resolver isso, precisas de mudar a tua store para utilizar um custom Serialization:
var store = Wireup.Init() .UsingSqlPersistence("ConnectionString to the DB that store the events") .InitializeStorageEngine() .UsingCustomSerialization(new MyCustomSerialization()) .Build();
Com isso basicamente o que muda é deixas de usar o UsingJsonSerialization e passas a usar o UsingCustomSerialization e fazes a implementação da classe MyCustomSerialization.
public class MyCustomSerialization : ISerialize { private readonly JsonSerializer _typedSerializer; public CustomSerialization() { _typedSerializer = new() { TypeNameHandling = TypeNameHandling.All, DefaultValueHandling = DefaultValueHandling.Ignore, NullValueHandling = NullValueHandling.Ignore }; } public void Serialize<T>(Stream output, T graph) { // DO YOUR LOGIC HERE } public T Deserialize<T>(Stream input) { using var streamReader = new StreamReader(input); var json = streamReader.ReadToEnd(); // now we have a json string we can parse to JObject or handle as a string // and replace for the new content we want return reader is null ? default : (T)_typedSerializer.Deserialize(reader, typeof(T)); } }
Agora podes, por exemplo, criar um strategy que verifique se for um evento antigo faz esse processo de mudar o $type se for um evento novo já com o $type correto podes fazer o Deserialize direto.
E é isso, espero ter ajudado, qualquer dúvida deixa nos comentários.