Chapter 3.2: Events and Event Stores
Events are the heart of EventCore - immutable records of things that happened in your system. This chapter explores event design, storage, and the guarantees EventCore provides.
What Makes a Good Event?
Events should be:
- Past Tense - They record what happened, not what should happen
- Immutable - Once written, events never change
- Self-Contained - Include all necessary data
- Business-Focused - Represent domain concepts, not technical details
Event Design Principles
#![allow(unused)] fn main() { // ❌ Bad: Technical focus, present tense, missing context #[derive(Serialize, Deserialize)] struct UpdateUser { id: String, data: HashMap<String, Value>, } // ✅ Good: Business focus, past tense, complete information #[derive(Serialize, Deserialize)] struct CustomerEmailChanged { customer_id: CustomerId, old_email: Email, new_email: Email, changed_by: UserId, changed_at: DateTime<Utc>, reason: EmailChangeReason, } }
Event Structure in EventCore
Core Event Types
#![allow(unused)] fn main() { /// Your domain event #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OrderShipped { pub order_id: OrderId, pub tracking_number: TrackingNumber, pub carrier: Carrier, pub shipped_at: DateTime<Utc>, } /// Event ready to be written pub struct EventToWrite<E> { pub stream_id: StreamId, pub payload: E, pub metadata: Option<EventMetadata>, pub expected_version: ExpectedVersion, } /// Event as stored in the event store pub struct StoredEvent<E> { pub id: EventId, // UUIDv7 for global ordering pub stream_id: StreamId, // Which stream this belongs to pub version: EventVersion, // Position in the stream pub payload: E, // Your domain event pub metadata: EventMetadata, // Who, when, why pub occurred_at: DateTime<Utc>, // When it happened } }
Event IDs and Ordering
EventCore uses UUIDv7 for event IDs, providing:
#![allow(unused)] fn main() { // UUIDv7 properties: // - Globally unique // - Time-ordered (sortable) // - Millisecond precision timestamp // - No coordination required let event1 = EventId::new(); let event2 = EventId::new(); // Later events have higher IDs assert!(event2 > event1); // Extract timestamp let timestamp = event1.timestamp(); }
Event Metadata
Every event carries metadata for auditing and debugging:
#![allow(unused)] fn main() { pub struct EventMetadata { /// Who triggered this event pub user_id: Option<UserId>, /// Correlation ID for tracking across services pub correlation_id: CorrelationId, /// What caused this event (previous event ID) pub causation_id: Option<CausationId>, /// Custom metadata pub custom: HashMap<String, Value>, } // Building metadata let metadata = EventMetadata::new() .with_user_id(UserId::from("alice@example.com")) .with_correlation_id(CorrelationId::new()) .caused_by(&previous_event) .with_custom("ip_address", "192.168.1.1") .with_custom("user_agent", "MyApp/1.0"); }
Event Store Abstraction
EventCore defines a trait that storage adapters implement:
#![allow(unused)] fn main() { #[async_trait] pub trait EventStore: Send + Sync { type Event: Send + Sync; type Error: Error + Send + Sync; /// Read events from a specific stream async fn read_stream( &self, stream_id: &StreamId, options: ReadOptions, ) -> Result<StreamEvents<Self::Event>, Self::Error>; /// Read events from multiple streams async fn read_streams( &self, stream_ids: &[StreamId], options: ReadOptions, ) -> Result<Vec<StreamEvents<Self::Event>>, Self::Error>; /// Write events atomically to multiple streams async fn write_events( &self, events: Vec<EventToWrite<Self::Event>>, ) -> Result<WriteResult, Self::Error>; /// Subscribe to real-time events async fn subscribe( &self, options: SubscriptionOptions, ) -> Result<Box<dyn EventSubscription<Self::Event>>, Self::Error>; } }
Stream Versioning
Streams maintain version numbers for optimistic concurrency:
#![allow(unused)] fn main() { pub struct StreamEvents<E> { pub stream_id: StreamId, pub version: EventVersion, // Current version after these events pub events: Vec<StoredEvent<E>>, } // Version control options pub enum ExpectedVersion { /// Stream must not exist NoStream, /// Stream must be at this exact version Exact(EventVersion), /// Stream must exist but any version is OK Any, /// No version check (dangerous!) NoCheck, } }
Using Version Control
#![allow(unused)] fn main() { // First write - stream shouldn't exist let first_event = EventToWrite { stream_id: stream_id.clone(), payload: AccountOpened { /* ... */ }, metadata: None, expected_version: ExpectedVersion::NoStream, }; // Subsequent writes - check version let next_event = EventToWrite { stream_id: stream_id.clone(), payload: MoneyDeposited { /* ... */ }, metadata: None, expected_version: ExpectedVersion::Exact(EventVersion::new(1)), }; }
Storage Adapters
PostgreSQL Adapter
The production-ready adapter with ACID guarantees:
#![allow(unused)] fn main() { use eventcore_postgres::{PostgresEventStore, PostgresConfig}; let config = PostgresConfig::new("postgresql://localhost/eventcore") .with_pool_size(20) .with_schema("eventcore"); let event_store = PostgresEventStore::new(config).await?; // Initialize schema (one time) event_store.initialize().await?; }
PostgreSQL schema:
-- Events table with optimal indexing
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_uuidv7(),
stream_id VARCHAR(255) NOT NULL,
version BIGINT NOT NULL,
event_type VARCHAR(255) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL,
-- Ensure stream version uniqueness
UNIQUE(stream_id, version),
-- Indexes for common queries
INDEX idx_stream_id (stream_id),
INDEX idx_occurred_at (occurred_at),
INDEX idx_event_type (event_type)
);
In-Memory Adapter
Perfect for testing and development:
#![allow(unused)] fn main() { use eventcore_memory::InMemoryEventStore; let event_store = InMemoryEventStore::<MyEvent>::new(); // Optionally add chaos for testing let chaotic_store = event_store .with_chaos(ChaosConfig { failure_probability: 0.1, // 10% chance of failure latency_ms: Some(50..200), // Random latency }); }
Event Design Patterns
Event Granularity
Choose the right level of detail:
#![allow(unused)] fn main() { // ❌ Too coarse - loses important details struct OrderUpdated { order_id: OrderId, new_state: OrderState, // What actually changed? } // ❌ Too fine - creates event spam struct OrderFieldUpdated { order_id: OrderId, field_name: String, old_value: Value, new_value: Value, } // ✅ Just right - meaningful business events enum OrderEvent { OrderPlaced { customer: CustomerId, items: Vec<Item> }, PaymentReceived { amount: Money, method: PaymentMethod }, OrderShipped { tracking: TrackingNumber }, OrderDelivered { signed_by: String }, } }
Event Evolution
Design events to evolve gracefully:
#![allow(unused)] fn main() { // Version 1 #[derive(Serialize, Deserialize)] struct UserRegistered { user_id: UserId, email: Email, } // Version 2 - Added field with default #[derive(Serialize, Deserialize)] struct UserRegistered { user_id: UserId, email: Email, #[serde(default)] referral_code: Option<String>, // New field } // Version 3 - Structural change #[derive(Serialize, Deserialize)] #[serde(tag = "version")] enum UserRegisteredVersioned { #[serde(rename = "1")] V1 { user_id: UserId, email: Email }, #[serde(rename = "2")] V2 { user_id: UserId, email: Email, referral_code: Option<String>, }, #[serde(rename = "3")] V3 { user_id: UserId, email: Email, referral: Option<ReferralInfo>, // Richer type }, } }
Event Enrichment
Add context to events:
#![allow(unused)] fn main() { trait EventEnricher { fn enrich<E>(&self, event: E) -> EnrichedEvent<E>; } struct EnrichedEvent<E> { pub event: E, pub context: EventContext, } struct EventContext { pub session_id: SessionId, pub request_id: RequestId, pub feature_flags: HashMap<String, bool>, pub environment: Environment, } }
Querying Events
Read Options
Control how events are read:
#![allow(unused)] fn main() { let options = ReadOptions::default() .from_version(EventVersion::new(10)) // Start from version 10 .to_version(EventVersion::new(20)) // Up to version 20 .max_events(100) // Limit results .backwards(); // Read in reverse let events = event_store .read_stream(&stream_id, options) .await?; }
Reading Multiple Streams
For multi-stream operations:
#![allow(unused)] fn main() { let stream_ids = vec![ StreamId::from_static("order-123"), StreamId::from_static("inventory-abc"), StreamId::from_static("payment-xyz"), ]; let all_events = event_store .read_streams(&stream_ids, ReadOptions::default()) .await?; // Events from all streams, ordered by EventId (time) }
Global Event Feed
Read all events across all streams:
#![allow(unused)] fn main() { let all_events = event_store .read_all_events( ReadOptions::default() .after(last_known_event_id) // For pagination .max_events(1000) ) .await?; }
Event Store Guarantees
1. Atomicity
All events in a write operation succeed or fail together:
#![allow(unused)] fn main() { let events = vec![ EventToWrite { /* withdraw from account A */ }, EventToWrite { /* deposit to account B */ }, ]; // Both events written atomically event_store.write_events(events).await?; }
2. Consistency
Version checks prevent conflicting writes:
#![allow(unused)] fn main() { // Two concurrent commands read version 5 let command1_events = vec![/* ... */]; let command2_events = vec![/* ... */]; // First write succeeds event_store.write_events(command1_events).await?; // OK // Second write fails - version conflict event_store.write_events(command2_events).await?; // Error: Version conflict }
3. Durability
Events are persisted before returning success:
#![allow(unused)] fn main() { // After this returns, events are durable let result = event_store.write_events(events).await?; // Even if the process crashes, events are safe }
4. Ordering
Events maintain both stream order and global order:
#![allow(unused)] fn main() { // Stream order: version within a stream stream_events.events[0].version < stream_events.events[1].version // Global order: EventId across all streams all_events[0].id < all_events[1].id }
Performance Optimization
Batch Writing
Write multiple events efficiently:
#![allow(unused)] fn main() { // Batch events for better performance let mut batch = Vec::with_capacity(1000); for item in large_dataset { batch.push(EventToWrite { stream_id: compute_stream_id(&item), payload: process_item(item), metadata: None, expected_version: ExpectedVersion::Any, }); // Write in batches if batch.len() >= 100 { event_store.write_events(batch.drain(..).collect()).await?; } } // Write remaining if !batch.is_empty() { event_store.write_events(batch).await?; } }
Stream Partitioning
Distribute load across streams:
#![allow(unused)] fn main() { // Instead of one hot stream let stream_id = StreamId::from_static("orders"); // Partition by hash let stream_id = StreamId::from_static(&format!( "orders-{}", order_id.hash() % 16 // 16 partitions )); }
Caching Strategies
Cache recent events for read performance:
#![allow(unused)] fn main() { struct CachedEventStore<ES: EventStore> { inner: ES, cache: Arc<RwLock<LruCache<StreamId, StreamEvents<ES::Event>>>>, } impl<ES: EventStore> CachedEventStore<ES> { async fn read_stream_cached( &self, stream_id: &StreamId, options: ReadOptions, ) -> Result<StreamEvents<ES::Event>, ES::Error> { // Check cache first if options.is_from_start() { if let Some(cached) = self.cache.read().await.get(stream_id) { return Ok(cached.clone()); } } // Read from store let events = self.inner.read_stream(stream_id, options).await?; // Update cache self.cache.write().await.insert(stream_id.clone(), events.clone()); Ok(events) } } }
Testing with Events
Event Fixtures
Create test events easily:
#![allow(unused)] fn main() { use eventcore::testing::builders::*; fn create_account_opened_event() -> StoredEvent<BankEvent> { StoredEventBuilder::new() .with_stream_id(StreamId::from_static("account-123")) .with_version(EventVersion::new(1)) .with_payload(BankEvent::AccountOpened { owner: "Alice".to_string(), initial_balance: 1000, }) .with_metadata( EventMetadataBuilder::new() .with_user_id(UserId::from("alice@example.com")) .build() ) .build() } }
Event Assertions
Test event properties:
#![allow(unused)] fn main() { use eventcore::testing::assertions::*; #[test] fn test_events_are_ordered() { let events = vec![/* ... */]; assert_events_ordered(&events); assert_unique_event_ids(&events); assert_stream_version_progression(&events, &stream_id); } }
Summary
Events in EventCore are:
- ✅ Immutable records of business facts
- ✅ Time-ordered with UUIDv7 IDs
- ✅ Version-controlled for consistency
- ✅ Atomically written across streams
- ✅ Rich with metadata for auditing
Best practices:
- Design events around business concepts
- Include all necessary data in events
- Plan for event evolution
- Use version control for consistency
- Optimize storage with partitioning
Next, let’s explore State Reconstruction →