Chapter 3.1: Commands and the Macro System

This chapter explores how EventCore’s command system works, focusing on the #[derive(Command)] macro that eliminates boilerplate while maintaining type safety.

The Command Pattern

Commands in EventCore represent user intentions - things that should happen in your system. They:

  1. Declare required streams - What data they need access to
  2. Validate business rules - Ensure operations are allowed
  3. Generate events - Record what actually happened
  4. Maintain consistency - All changes are atomic

Anatomy of a Command

Let’s dissect a command to understand each part:

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]         // 1. Derive macro generates boilerplate
struct TransferMoney {
    #[stream]                     // 2. Declares this field is a stream
    from_account: StreamId,       
    
    #[stream] 
    to_account: StreamId,
    
    amount: Money,                // 3. Regular fields for command data
    reference: String,
}
}

What the Macro Generates

The #[derive(Command)] macro generates several things:

#![allow(unused)]
fn main() {
// 1. A phantom type for compile-time stream tracking
#[derive(Debug, Clone, Copy, Default)]
pub struct TransferMoneyStreamSet;

// 2. Implementation of CommandStreams trait
impl CommandStreams for TransferMoney {
    type StreamSet = TransferMoneyStreamSet;
    
    fn read_streams(&self) -> Vec<StreamId> {
        vec![
            self.from_account.clone(),
            self.to_account.clone(),
        ]
    }
}

// 3. Blanket implementation gives you Command trait
// (because TransferMoney also implements CommandLogic)
}

The Two-Trait Design

EventCore splits the Command pattern into two traits:

CommandStreams (Generated)

Handles infrastructure concerns:

#![allow(unused)]
fn main() {
pub trait CommandStreams: Send + Sync + Clone {
    /// Phantom type for compile-time stream access control
    type StreamSet: Send + Sync;
    
    /// Returns the streams this command needs to read
    fn read_streams(&self) -> Vec<StreamId>;
}
}

CommandLogic (You Implement)

Contains your domain logic:

#![allow(unused)]
fn main() {
#[async_trait]
pub trait CommandLogic: CommandStreams {
    /// State type that will be reconstructed from events
    type State: Default + Send + Sync;
    
    /// Event type this command produces
    type Event: Send + Sync;
    
    /// Apply an event to update state (event sourcing fold)
    fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>);
    
    /// Business logic that validates and produces events
    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>>;
}
}

Stream Declaration Patterns

Basic Stream Declaration

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]
struct UpdateProfile {
    #[stream]
    user_id: StreamId,  // Single stream
}
}

Multiple Streams

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]
struct ProcessOrder {
    #[stream]
    order_id: StreamId,
    
    #[stream]
    customer_id: StreamId,
    
    #[stream]
    inventory_id: StreamId,
    
    #[stream]
    payment_id: StreamId,
}
}

Stream Arrays (Planned Feature)

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]
struct BulkUpdate {
    #[stream("items")]
    item_ids: Vec<StreamId>,  // Multiple streams of same type
}
}

Conditional Streams

For streams discovered at runtime:

#![allow(unused)]
fn main() {
async fn handle(
    &self,
    read_streams: ReadStreams<Self::StreamSet>,
    state: Self::State,
    stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    // Discover we need another stream based on state
    if state.requires_approval {
        let approver_stream = StreamId::from_static("approver-stream");
        stream_resolver.add_streams(vec![approver_stream]);
        // EventCore will re-execute with the additional stream
    }
    
    // Continue with logic...
}
}

Type-Safe Stream Access

The ReadStreams type ensures you can only write to declared streams:

#![allow(unused)]
fn main() {
// In your handle method:
async fn handle(
    &self,
    read_streams: ReadStreams<Self::StreamSet>,
    state: Self::State,
    _stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    // ✅ This works - from_account was declared with #[stream]
    let withdraw_event = StreamWrite::new(
        &read_streams,
        self.from_account.clone(),
        BankEvent::MoneyWithdrawn { amount: self.amount }
    )?;
    
    // ❌ This won't compile - random_stream wasn't declared
    let invalid = StreamWrite::new(
        &read_streams,
        StreamId::from_static("random-stream"),
        SomeEvent {}
    )?; // Compile error!
    
    Ok(vec![withdraw_event])
}
}

State Reconstruction

The apply method builds state by folding events:

#![allow(unused)]
fn main() {
fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>) {
    match &event.payload {
        BankEvent::AccountOpened { balance, .. } => {
            state.exists = true;
            state.balance = *balance;
        }
        BankEvent::MoneyDeposited { amount, .. } => {
            state.balance += amount;
        }
        BankEvent::MoneyWithdrawn { amount, .. } => {
            state.balance = state.balance.saturating_sub(*amount);
        }
    }
}
}

This is called for each event in sequence to rebuild current state.

Command Validation Patterns

Using the require! Macro

#![allow(unused)]
fn main() {
async fn handle(
    &self,
    read_streams: ReadStreams<Self::StreamSet>,
    state: Self::State,
    _stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    // Business rule validation with good error messages
    require!(
        state.balance >= self.amount,
        "Insufficient funds: balance={}, requested={}",
        state.balance,
        self.amount
    );
    
    require!(
        self.amount > 0,
        "Transfer amount must be positive"
    );
    
    require!(
        self.from_account != self.to_account,
        "Cannot transfer to same account"
    );
    
    // Generate events after validation passes
    Ok(vec![/* events */])
}
}

Custom Validation Functions

#![allow(unused)]
fn main() {
impl TransferMoney {
    fn validate_transfer_limits(&self, state: &AccountState) -> CommandResult<()> {
        const DAILY_LIMIT: u64 = 10_000;
        
        let daily_total = state.transfers_today + self.amount;
        require!(
            daily_total <= DAILY_LIMIT,
            "Daily transfer limit exceeded: {} > {}",
            daily_total,
            DAILY_LIMIT
        );
        
        Ok(())
    }
}
}

Advanced Macro Features

Custom Stream Names

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]
struct ComplexCommand {
    #[stream(name = "primary")]
    main_stream: StreamId,
    
    #[stream(name = "secondary", optional = true)]
    optional_stream: Option<StreamId>,
}
}

Computed Streams

#![allow(unused)]
fn main() {
impl ComplexCommand {
    fn compute_streams(&self) -> Vec<StreamId> {
        let mut streams = vec![self.main_stream.clone()];
        
        if let Some(ref optional) = self.optional_stream {
            streams.push(optional.clone());
        }
        
        streams
    }
}
}

Command Composition

Commands can be composed for complex operations:

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]
struct CompleteOrderWorkflow {
    #[stream]
    order_id: StreamId,
    
    // Sub-commands to execute
    payment: ProcessPayment,
    fulfillment: FulfillOrder,
    notification: SendNotification,
}

impl CommandLogic for CompleteOrderWorkflow {
    // ... implementation delegates to sub-commands
}
}

Performance Optimizations

Pre-computed State

For expensive computations:

#![allow(unused)]
fn main() {
#[derive(Default)]
struct PrecomputedState {
    balance: u64,
    transaction_count: u64,
    daily_totals: HashMap<Date, u64>,  // Pre-aggregated
}

fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>) {
    // Update pre-computed values incrementally
    match &event.payload {
        BankEvent::MoneyTransferred { amount, date, .. } => {
            state.balance -= amount;
            *state.daily_totals.entry(*date).or_insert(0) += amount;
        }
        // ...
    }
}
}

Lazy State Loading

For large states:

#![allow(unused)]
fn main() {
struct LazyState {
    core: AccountCore,           // Always loaded
    history: Option<Box<TransactionHistory>>,  // Load on demand
}

async fn handle(
    &self,
    read_streams: ReadStreams<Self::StreamSet>,
    mut state: Self::State,
    _stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    // Load history only if needed
    if self.requires_history_check() {
        state.load_history().await?;
    }
    
    // Continue...
}
}

Testing Commands

Unit Testing

#![allow(unused)]
fn main() {
#[test]
fn test_command_stream_declaration() {
    let cmd = TransferMoney {
        from_account: StreamId::from_static("account-1"),
        to_account: StreamId::from_static("account-2"),
        amount: 100,
        reference: "test".to_string(),
    };
    
    let streams = cmd.read_streams();
    assert_eq!(streams.len(), 2);
    assert!(streams.contains(&StreamId::from_static("account-1")));
    assert!(streams.contains(&StreamId::from_static("account-2")));
}
}

Testing State Reconstruction

#![allow(unused)]
fn main() {
#[test]
fn test_apply_events() {
    let cmd = TransferMoney { /* ... */ };
    let mut state = AccountState::default();
    
    let event = create_test_event(BankEvent::AccountOpened {
        balance: 1000,
        owner: "alice".to_string(),
    });
    
    cmd.apply(&mut state, &event);
    
    assert_eq!(state.balance, 1000);
    assert!(state.exists);
}
}

Common Patterns

Idempotent Commands

Make commands idempotent by checking for duplicate operations:

#![allow(unused)]
fn main() {
async fn handle(/* ... */) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    // Check if operation was already performed
    if state.transfers.contains(&self.reference) {
        // Already processed - return success with no new events
        return Ok(vec![]);
    }
    
    // Process normally...
}
}

Command Versioning

Handle command evolution:

#![allow(unused)]
fn main() {
#[derive(Command, Clone)]
#[command(version = 2)]
struct TransferMoneyV2 {
    #[stream]
    from_account: StreamId,
    
    #[stream]
    to_account: StreamId,
    
    amount: Money,
    reference: String,
    
    // New in V2
    category: TransferCategory,
}
}

Summary

The EventCore command system provides:

  • Zero boilerplate through #[derive(Command)]
  • Type-safe stream access preventing invalid writes
  • Clear separation between infrastructure and domain logic
  • Flexible validation with the require! macro
  • Extensibility through the two-trait design

Key takeaways:

  1. Use #[derive(Command)] to eliminate boilerplate
  2. Declare streams with #[stream] attributes
  3. Implement business logic in CommandLogic
  4. Leverage type safety for compile-time guarantees
  5. Commands are just data - easy to test and reason about

Next, let’s explore Events and Event Stores