Chapter 3.1: Commands and the Macro System
This chapter explores how EventCore’s command system works, focusing on the #[derive(Command)]
macro that eliminates boilerplate while maintaining type safety.
The Command Pattern
Commands in EventCore represent user intentions - things that should happen in your system. They:
- Declare required streams - What data they need access to
- Validate business rules - Ensure operations are allowed
- Generate events - Record what actually happened
- Maintain consistency - All changes are atomic
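The four responsibilities above can be sketched without any library at all. The following is a minimal illustration that uses only the standard library. The names `Transfer`, `Event`, `streams`, and `handle` are hypothetical stand-ins and are not EventCore's API.

```rust
// Hypothetical stand-ins for illustration; none of these types come from EventCore.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Withdrawn { account: String, amount: u64 },
    Deposited { account: String, amount: u64 },
}

struct Transfer {
    from: String,
    to: String,
    amount: u64,
}

impl Transfer {
    // 1. Declare required streams.
    fn streams(&self) -> Vec<String> {
        vec![self.from.clone(), self.to.clone()]
    }

    // 2. Validate business rules, then 3. generate events.
    fn handle(&self, from_balance: u64) -> Result<Vec<Event>, String> {
        if self.amount == 0 {
            return Err("amount must be positive".to_string());
        }
        if from_balance < self.amount {
            return Err(format!(
                "insufficient funds: balance={}, requested={}",
                from_balance, self.amount
            ));
        }
        // 4. Both events are returned together, so a store can commit
        //    them as one atomic unit.
        Ok(vec![
            Event::Withdrawn { account: self.from.clone(), amount: self.amount },
            Event::Deposited { account: self.to.clone(), amount: self.amount },
        ])
    }
}
```

Returning all events from a single call is what lets the executor treat them as one all-or-nothing write.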
Anatomy of a Command
Let’s dissect a command to understand each part:
    #[derive(Command, Clone)]   // 1. Derive macro generates boilerplate
    struct TransferMoney {
        #[stream]               // 2. Declares this field is a stream
        from_account: StreamId,

        #[stream]
        to_account: StreamId,

        amount: Money,          // 3. Regular fields for command data
        reference: String,
    }
What the Macro Generates
The #[derive(Command)] macro generates several things:

    // 1. A phantom type for compile-time stream tracking
    #[derive(Debug, Clone, Copy, Default)]
    pub struct TransferMoneyStreamSet;

    // 2. Implementation of CommandStreams trait
    impl CommandStreams for TransferMoney {
        type StreamSet = TransferMoneyStreamSet;

        fn read_streams(&self) -> Vec<StreamId> {
            vec![
                self.from_account.clone(),
                self.to_account.clone(),
            ]
        }
    }

    // 3. Blanket implementation gives you Command trait
    // (because TransferMoney also implements CommandLogic)
The Two-Trait Design
EventCore splits the Command pattern into two traits:
CommandStreams (Generated)
Handles infrastructure concerns:
    pub trait CommandStreams: Send + Sync + Clone {
        /// Phantom type for compile-time stream access control
        type StreamSet: Send + Sync;

        /// Returns the streams this command needs to read
        fn read_streams(&self) -> Vec<StreamId>;
    }
CommandLogic (You Implement)
Contains your domain logic:
    #[async_trait]
    pub trait CommandLogic: CommandStreams {
        /// State type that will be reconstructed from events
        type State: Default + Send + Sync;

        /// Event type this command produces
        type Event: Send + Sync;

        /// Apply an event to update state (event sourcing fold)
        fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>);

        /// Business logic that validates and produces events
        async fn handle(
            &self,
            read_streams: ReadStreams<Self::StreamSet>,
            state: Self::State,
            stream_resolver: &mut StreamResolver,
        ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>>;
    }
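To see how two traits can combine into one through a blanket implementation, here is a simplified sketch. The traits below are cut-down stand-ins: they are synchronous, use `String` for stream IDs and events, and the `Withdraw` type and `execute` function are hypothetical. The real traits carry associated types and an async `handle`.

```rust
// Simplified stand-ins for the two traits described above.
trait CommandStreams {
    fn read_streams(&self) -> Vec<String>;
}

trait CommandLogic: CommandStreams {
    fn handle(&self, balance: u64) -> Result<Vec<String>, String>;
}

// The combined trait has no methods of its own.
trait Command: CommandStreams + CommandLogic {}

// Blanket implementation: any type that has both halves is a Command.
impl<T: CommandStreams + CommandLogic> Command for T {}

struct Withdraw {
    account: String,
    amount: u64,
}

// In EventCore this half would come from #[derive(Command)].
impl CommandStreams for Withdraw {
    fn read_streams(&self) -> Vec<String> {
        vec![self.account.clone()]
    }
}

// This half is written by hand.
impl CommandLogic for Withdraw {
    fn handle(&self, balance: u64) -> Result<Vec<String>, String> {
        if balance < self.amount {
            return Err("insufficient funds".to_string());
        }
        Ok(vec![format!("withdrawn:{}", self.amount)])
    }
}

// Accepts only types that implement both traits.
fn execute<C: Command>(cmd: &C, balance: u64) -> Result<Vec<String>, String> {
    let _streams = cmd.read_streams(); // an executor would load these first
    cmd.handle(balance)
}
```

Because `Command` is implemented automatically, user code never names it in an `impl`. It only appears as a bound on functions such as `execute`.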
Stream Declaration Patterns
Basic Stream Declaration
    #[derive(Command, Clone)]
    struct UpdateProfile {
        #[stream]
        user_id: StreamId, // Single stream
    }
Multiple Streams
    #[derive(Command, Clone)]
    struct ProcessOrder {
        #[stream]
        order_id: StreamId,

        #[stream]
        customer_id: StreamId,

        #[stream]
        inventory_id: StreamId,

        #[stream]
        payment_id: StreamId,
    }
Stream Arrays (Planned Feature)
    #[derive(Command, Clone)]
    struct BulkUpdate {
        #[stream("items")]
        item_ids: Vec<StreamId>, // Multiple streams of same type
    }
Conditional Streams
For streams discovered at runtime:
    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        // Discover we need another stream based on state
        if state.requires_approval {
            let approver_stream = StreamId::from_static("approver-stream");
            stream_resolver.add_streams(vec![approver_stream]);
            // EventCore will re-execute with the additional stream
        }

        // Continue with logic...
    }
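The re-execution behaviour can be pictured as a loop that runs the handler again whenever it asks for streams that were not yet loaded. The sketch below shows one way such a loop could work. It is not EventCore's executor: `Resolver`, `handle`, `execute`, and the `max_iterations` cap are all assumptions made for illustration.

```rust
use std::collections::BTreeSet;

// Hypothetical resolver: collects stream names requested during handle.
#[derive(Default)]
struct Resolver {
    requested: Vec<String>,
}

impl Resolver {
    fn add_streams(&mut self, streams: Vec<String>) {
        self.requested.extend(streams);
    }
}

// Toy handler: asks for the approver stream until it has been loaded.
fn handle(loaded: &BTreeSet<String>, resolver: &mut Resolver) -> Vec<String> {
    if !loaded.contains("approver-stream") {
        resolver.add_streams(vec!["approver-stream".to_string()]);
        return vec![];
    }
    vec!["approved".to_string()]
}

// Re-run the handler until it requests no new streams, up to a cap.
// Returns the events and the number of iterations that were needed.
fn execute(initial: &[&str], max_iterations: usize) -> Result<(Vec<String>, usize), String> {
    let mut loaded: BTreeSet<String> = initial.iter().map(|s| s.to_string()).collect();
    for iteration in 1..=max_iterations {
        let mut resolver = Resolver::default();
        let events = handle(&loaded, &mut resolver);
        let before = loaded.len();
        loaded.extend(resolver.requested);
        if loaded.len() == before {
            // Nothing new was requested, so the result is final.
            return Ok((events, iteration));
        }
        // New streams were discovered: discard the events and run again.
    }
    Err("stream discovery did not converge".to_string())
}
```

The iteration cap guards against a handler that keeps discovering streams forever. Whether a real executor uses such a cap is an assumption of this sketch.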
Type-Safe Stream Access
The ReadStreams type ensures you can only write to declared streams:

    // In your handle method:
    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        _stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        // ✅ This works - from_account was declared with #[stream]
        let withdraw_event = StreamWrite::new(
            &read_streams,
            self.from_account.clone(),
            BankEvent::MoneyWithdrawn { amount: self.amount }
        )?;

        // ❌ This is rejected - random-stream wasn't declared
        let invalid = StreamWrite::new(
            &read_streams,
            StreamId::from_static("random-stream"),
            SomeEvent {}
        )?; // StreamWrite::new returns a Result; `?` propagates the error

        Ok(vec![withdraw_event])
    }
State Reconstruction
The apply method builds state by folding events:

    fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>) {
        match &event.payload {
            BankEvent::AccountOpened { balance, .. } => {
                state.exists = true;
                state.balance = *balance;
            }
            BankEvent::MoneyDeposited { amount, .. } => {
                state.balance += amount;
            }
            BankEvent::MoneyWithdrawn { amount, .. } => {
                state.balance = state.balance.saturating_sub(*amount);
            }
        }
    }
This is called for each event in sequence to rebuild current state.
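As a self-contained illustration of that fold, the sketch below replays a short event history into a fresh state. The `AccountState` and `BankEvent` definitions are simplified assumptions (the fields are guessed from the snippet above), and `rebuild` is a hypothetical helper standing in for what the executor does.

```rust
#[derive(Debug, Default, PartialEq)]
struct AccountState {
    exists: bool,
    balance: u64,
}

enum BankEvent {
    AccountOpened { balance: u64 },
    MoneyDeposited { amount: u64 },
    MoneyWithdrawn { amount: u64 },
}

// One step of the fold: mutate the state according to a single event.
fn apply(state: &mut AccountState, event: &BankEvent) {
    match event {
        BankEvent::AccountOpened { balance } => {
            state.exists = true;
            state.balance = *balance;
        }
        BankEvent::MoneyDeposited { amount } => state.balance += amount,
        BankEvent::MoneyWithdrawn { amount } => {
            state.balance = state.balance.saturating_sub(*amount)
        }
    }
}

// Start from the default state and apply every event in order.
fn rebuild(events: &[BankEvent]) -> AccountState {
    let mut state = AccountState::default();
    for event in events {
        apply(&mut state, event);
    }
    state
}
```

Replaying an account opened with 1000, a deposit of 250, and a withdrawal of 400 yields a balance of 850. Order matters: the same events applied in a different sequence can produce a different state.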
Command Validation Patterns
Using the require! Macro

    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        _stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        // Business rule validation with good error messages
        require!(
            state.balance >= self.amount,
            "Insufficient funds: balance={}, requested={}",
            state.balance,
            self.amount
        );

        require!(
            self.amount > 0,
            "Transfer amount must be positive"
        );

        require!(
            self.from_account != self.to_account,
            "Cannot transfer to same account"
        );

        // Generate events after validation passes
        Ok(vec![/* events */])
    }
Custom Validation Functions
    impl TransferMoney {
        fn validate_transfer_limits(&self, state: &AccountState) -> CommandResult<()> {
            const DAILY_LIMIT: u64 = 10_000;

            let daily_total = state.transfers_today + self.amount;
            require!(
                daily_total <= DAILY_LIMIT,
                "Daily transfer limit exceeded: {} > {}",
                daily_total,
                DAILY_LIMIT
            );

            Ok(())
        }
    }
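To make the early-return behaviour of a `require!`-style macro concrete, here is a minimal stand-in written with `macro_rules!`. It is not EventCore's definition: the `CommandError::BusinessRuleViolation` variant, the `CommandResult` alias, and `validate_transfer` are assumptions chosen for this sketch.

```rust
// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum CommandError {
    BusinessRuleViolation(String),
}

type CommandResult<T> = Result<T, CommandError>;

// Return early from the enclosing function when the condition is false.
// The remaining arguments are passed to format! to build the message.
macro_rules! require {
    ($cond:expr, $($arg:tt)+) => {
        if !$cond {
            return Err(CommandError::BusinessRuleViolation(format!($($arg)+)));
        }
    };
}

fn validate_transfer(balance: u64, amount: u64) -> CommandResult<()> {
    require!(amount > 0, "Transfer amount must be positive");
    require!(
        balance >= amount,
        "Insufficient funds: balance={}, requested={}",
        balance,
        amount
    );
    Ok(())
}
```

Because the macro expands to a `return`, it can only be used inside a function whose return type is a compatible `Result`. The first failing rule wins, so cheap input checks are best placed before checks that depend on state.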
Advanced Macro Features
Custom Stream Names
    #[derive(Command, Clone)]
    struct ComplexCommand {
        #[stream(name = "primary")]
        main_stream: StreamId,

        #[stream(name = "secondary", optional = true)]
        optional_stream: Option<StreamId>,
    }
Computed Streams
    impl ComplexCommand {
        fn compute_streams(&self) -> Vec<StreamId> {
            let mut streams = vec![self.main_stream.clone()];

            if let Some(ref optional) = self.optional_stream {
                streams.push(optional.clone());
            }

            streams
        }
    }
Command Composition
Commands can be composed for complex operations:
    #[derive(Command, Clone)]
    struct CompleteOrderWorkflow {
        #[stream]
        order_id: StreamId,

        // Sub-commands to execute
        payment: ProcessPayment,
        fulfillment: FulfillOrder,
        notification: SendNotification,
    }

    impl CommandLogic for CompleteOrderWorkflow {
        // ... implementation delegates to sub-commands
    }
Performance Optimizations
Pre-computed State
For expensive computations:
    use std::collections::HashMap;

    #[derive(Default)]
    struct PrecomputedState {
        balance: u64,
        transaction_count: u64,
        daily_totals: HashMap<Date, u64>, // Pre-aggregated
    }

    fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>) {
        // Update pre-computed values incrementally
        match &event.payload {
            BankEvent::MoneyTransferred { amount, date, .. } => {
                state.balance -= amount;
                *state.daily_totals.entry(*date).or_insert(0) += amount;
            }
            // ...
        }
    }
Lazy State Loading
For large states:
    struct LazyState {
        core: AccountCore,                          // Always loaded
        history: Option<Box<TransactionHistory>>,   // Load on demand
    }

    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        mut state: Self::State,
        _stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        // Load history only if needed
        if self.requires_history_check() {
            state.load_history().await?;
        }

        // Continue...
    }
Testing Commands
Unit Testing
    #[test]
    fn test_command_stream_declaration() {
        let cmd = TransferMoney {
            from_account: StreamId::from_static("account-1"),
            to_account: StreamId::from_static("account-2"),
            amount: 100,
            reference: "test".to_string(),
        };

        let streams = cmd.read_streams();
        assert_eq!(streams.len(), 2);
        assert!(streams.contains(&StreamId::from_static("account-1")));
        assert!(streams.contains(&StreamId::from_static("account-2")));
    }
Testing State Reconstruction
    #[test]
    fn test_apply_events() {
        let cmd = TransferMoney { /* ... */ };
        let mut state = AccountState::default();

        let event = create_test_event(BankEvent::AccountOpened {
            balance: 1000,
            owner: "alice".to_string(),
        });

        cmd.apply(&mut state, &event);

        assert_eq!(state.balance, 1000);
        assert!(state.exists);
    }
Common Patterns
Idempotent Commands
Make commands idempotent by checking for duplicate operations:
    async fn handle(/* ... */) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        // Check if operation was already performed
        if state.transfers.contains(&self.reference) {
            // Already processed - return success with no new events
            return Ok(vec![]);
        }

        // Process normally...
    }
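For the duplicate check to work, the state has to remember which references it has already seen, which means `apply` must record them. The sketch below shows that round trip using only the standard library. The `AccountState` fields, the `MoneyWithdrawn` event shape, and the free functions are assumptions made for illustration.

```rust
use std::collections::HashSet;

#[derive(Default)]
struct AccountState {
    balance: u64,
    // References of transfers that have already been applied.
    transfers: HashSet<String>,
}

#[derive(Debug, Clone, PartialEq)]
struct MoneyWithdrawn {
    amount: u64,
    reference: String,
}

// Folding an event records its reference so later commands can detect it.
fn apply(state: &mut AccountState, event: &MoneyWithdrawn) {
    state.balance = state.balance.saturating_sub(event.amount);
    state.transfers.insert(event.reference.clone());
}

// Produces no events when the reference has been processed before.
fn handle(state: &AccountState, amount: u64, reference: &str) -> Vec<MoneyWithdrawn> {
    if state.transfers.contains(reference) {
        return vec![];
    }
    vec![MoneyWithdrawn { amount, reference: reference.to_string() }]
}
```

Submitting the same reference twice produces one event the first time and none the second, so a client can safely retry after a timeout without withdrawing twice.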
Command Versioning
Handle command evolution:
    #[derive(Command, Clone)]
    #[command(version = 2)]
    struct TransferMoneyV2 {
        #[stream]
        from_account: StreamId,

        #[stream]
        to_account: StreamId,

        amount: Money,
        reference: String,

        // New in V2
        category: TransferCategory,
    }
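When an older command shape still arrives from existing clients, one common approach is to upgrade it into the newer shape with a default for the added field. The sketch below uses the standard `From` trait for this. The `TransferMoneyV1` struct, the `TransferCategory` variants, and the choice of `Uncategorized` as the default are assumptions, and plain `String` and `u64` fields replace `StreamId` and `Money` to keep the example self-contained.

```rust
#[derive(Debug, Clone, PartialEq)]
enum TransferCategory {
    Uncategorized,
    Payroll,
}

// Hypothetical older shape without the category field.
struct TransferMoneyV1 {
    from_account: String,
    to_account: String,
    amount: u64,
    reference: String,
}

#[derive(Debug, PartialEq)]
struct TransferMoneyV2 {
    from_account: String,
    to_account: String,
    amount: u64,
    reference: String,
    category: TransferCategory,
}

// Upgrade path: every V1 command maps to a V2 command with a default category.
impl From<TransferMoneyV1> for TransferMoneyV2 {
    fn from(v1: TransferMoneyV1) -> Self {
        TransferMoneyV2 {
            from_account: v1.from_account,
            to_account: v1.to_account,
            amount: v1.amount,
            reference: v1.reference,
            category: TransferCategory::Uncategorized,
        }
    }
}
```

With this conversion in place, only the V2 command needs business logic. V1 callers are converted with `.into()` at the boundary.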
Summary
The EventCore command system provides:
- ✅ Zero boilerplate through #[derive(Command)]
- ✅ Type-safe stream access preventing invalid writes
- ✅ Clear separation between infrastructure and domain logic
- ✅ Flexible validation with the require! macro
- ✅ Extensibility through the two-trait design
Key takeaways:
- Use #[derive(Command)] to eliminate boilerplate
- Declare streams with #[stream] attributes
- Implement business logic in CommandLogic
- Leverage type safety for compile-time guarantees
- Commands are just data - easy to test and reason about
Next, let’s explore Events and Event Stores →