Chapter 5.4: Distributed Systems
EventCore excels in distributed systems where multiple services need to coordinate while maintaining consistency. This chapter covers patterns for building resilient, scalable distributed event-sourced architectures.
Distributed EventCore Architecture
Service Boundaries
Each service owns its event streams and commands:
```rust
// User Service
#[derive(Command, Clone)]
struct CreateUser {
    #[stream]
    user_id: StreamId,
    email: Email,
    profile: UserProfile,
}

// Order Service
#[derive(Command, Clone)]
struct CreateOrder {
    #[stream]
    order_id: StreamId,
    #[stream]
    customer_id: StreamId, // References user from User Service
    items: Vec<OrderItem>,
}

// Payment Service
#[derive(Command, Clone)]
struct ProcessPayment {
    #[stream]
    payment_id: StreamId,
    #[stream]
    order_id: StreamId, // References order from Order Service
    amount: Money,
    method: PaymentMethod,
}
```
Event Publishing
Services publish events for other services to consume:
```rust
// Interface for publishing events to other services
// (mirrors the shape of eventcore::distributed::EventPublisher)
#[async_trait]
trait EventPublisher {
    async fn publish(&self, event: &StoredEvent) -> Result<(), PublishError>;
}

struct MessageBusPublisher {
    bus: MessageBus,
    topic_mapping: HashMap<String, String>,
}

impl MessageBusPublisher {
    async fn publish_event<E>(&self, event: &StoredEvent<E>) -> Result<(), PublishError>
    where
        E: Serialize,
    {
        let topic = self
            .topic_mapping
            .get(&E::event_type())
            .ok_or(PublishError::UnknownEventType)?;

        let message = DistributedEvent {
            event_id: event.id,
            event_type: E::event_type(),
            stream_id: event.stream_id.clone(),
            version: event.version,
            payload: serde_json::to_value(&event.payload)?,
            metadata: event.metadata.clone(),
            occurred_at: event.occurred_at,
            published_at: Utc::now(),
            service_id: self.service_id(),
        };

        self.bus.publish(topic, &message).await?;
        Ok(())
    }

    fn service_id(&self) -> String {
        std::env::var("SERVICE_ID").unwrap_or_else(|_| "unknown".to_string())
    }
}

#[derive(Debug, Serialize, Deserialize)]
struct DistributedEvent {
    event_id: EventId,
    event_type: String,
    stream_id: StreamId,
    version: EventVersion,
    payload: serde_json::Value,
    metadata: EventMetadata,
    occurred_at: DateTime<Utc>,
    published_at: DateTime<Utc>,
    service_id: String,
}
```
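Publishing typically hooks into command execution: once the executor commits new events, each one is forwarded to the bus. A minimal sketch of that wiring, assuming the executor's result exposes the stored events as `events_written` (check your executor's actual result type before copying this):

```rust
// Hypothetical post-execution hook: publish every newly written event to the bus.
// The `events_written` field and error conversions are assumptions, not EventCore API.
async fn execute_and_publish<C: Command>(
    executor: &CommandExecutor,
    publisher: &MessageBusPublisher,
    command: &C,
) -> Result<(), Box<dyn std::error::Error>> {
    let result = executor.execute(command).await?;

    // Forward each committed event to the message bus for other services.
    for event in &result.events_written {
        publisher.publish_event(event).await?;
    }

    Ok(())
}
```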
Event Subscription
Services subscribe to events from other services:
```rust
#[async_trait]
trait EventSubscriber {
    async fn subscribe<F>(&self, topic: &str, handler: F) -> Result<(), SubscribeError>
    where
        F: Fn(DistributedEvent) -> BoxFuture<'static, Result<(), HandleError>>
            + Send
            + Sync
            + 'static;
}

#[derive(Clone)]
struct OrderEventHandler {
    executor: CommandExecutor,
}

impl OrderEventHandler {
    async fn handle_user_events(&self, event: DistributedEvent) -> Result<(), HandleError> {
        match event.event_type.as_str() {
            "UserRegistered" => {
                let user_registered: UserRegisteredEvent = serde_json::from_value(event.payload)?;

                // Create a customer profile in the order service
                let command = CreateCustomerProfile {
                    customer_id: StreamId::from(format!("customer-{}", user_registered.user_id)),
                    user_id: user_registered.user_id,
                    email: user_registered.email,
                    preferences: CustomerPreferences::default(),
                };

                self.executor.execute(&command).await?;
            }
            "UserUpdated" => {
                // Handle user updates
                let user_updated: UserUpdatedEvent = serde_json::from_value(event.payload)?;

                let command = UpdateCustomerProfile {
                    customer_id: StreamId::from(format!("customer-{}", user_updated.user_id)),
                    email: user_updated.email,
                    profile_updates: user_updated.profile_changes,
                };

                self.executor.execute(&command).await?;
            }
            _ => {
                // Unknown event type - log and ignore
                tracing::debug!("Ignoring unknown event type: {}", event.event_type);
            }
        }
        Ok(())
    }

    // handle_payment_events is defined analogously for the payment topic.
}

// Setup subscriptions
async fn setup_event_subscriptions(
    subscriber: &impl EventSubscriber,
    handler: OrderEventHandler,
) -> Result<(), SubscribeError> {
    // Subscribe to user events
    let user_handler = handler.clone();
    subscriber
        .subscribe("user-events", move |event| {
            let handler = user_handler.clone();
            Box::pin(async move { handler.handle_user_events(event).await })
        })
        .await?;

    // Subscribe to payment events
    let payment_handler = handler.clone();
    subscriber
        .subscribe("payment-events", move |event| {
            let handler = payment_handler.clone();
            Box::pin(async move { handler.handle_payment_events(event).await })
        })
        .await?;

    Ok(())
}
```
Distributed Transactions
Handle distributed transactions with the saga pattern:
```rust
#[derive(Command, Clone)]
struct DistributedOrderSaga {
    #[stream]
    saga_id: StreamId,
    order_details: OrderDetails,
    customer_id: UserId,
}

#[derive(Default)]
struct DistributedSagaState {
    order_created: bool,
    payment_reserved: bool,
    inventory_reserved: bool,
    shipping_scheduled: bool,
    completed: bool,
    compensation_needed: bool,
    failed_step: Option<String>,
}

impl CommandLogic for DistributedOrderSaga {
    type State = DistributedSagaState;
    type Event = SagaEvent;

    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        _stream_resolver: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        if state.compensation_needed {
            self.handle_compensation(&read_streams, &state).await
        } else {
            self.handle_forward_flow(&read_streams, &state).await
        }
    }
}

impl DistributedOrderSaga {
    async fn handle_forward_flow(
        &self,
        read_streams: &ReadStreams<DistributedOrderSagaStreamSet>,
        state: &DistributedSagaState,
    ) -> CommandResult<Vec<StreamWrite<DistributedOrderSagaStreamSet, SagaEvent>>> {
        match (
            state.order_created,
            state.payment_reserved,
            state.inventory_reserved,
            state.shipping_scheduled,
        ) {
            (false, _, _, _) => {
                // Step 1: Create order
                Ok(vec![StreamWrite::new(
                    read_streams,
                    self.saga_id.clone(),
                    SagaEvent::OrderCreationRequested {
                        order_details: self.order_details.clone(),
                    },
                )?])
            }
            (true, false, _, _) => {
                // Step 2: Reserve payment
                Ok(vec![StreamWrite::new(
                    read_streams,
                    self.saga_id.clone(),
                    SagaEvent::PaymentReservationRequested {
                        customer_id: self.customer_id,
                        amount: self.order_details.total_amount(),
                    },
                )?])
            }
            (true, true, false, _) => {
                // Step 3: Reserve inventory
                Ok(vec![StreamWrite::new(
                    read_streams,
                    self.saga_id.clone(),
                    SagaEvent::InventoryReservationRequested {
                        items: self.order_details.items.clone(),
                    },
                )?])
            }
            (true, true, true, false) => {
                // Step 4: Schedule shipping
                Ok(vec![StreamWrite::new(
                    read_streams,
                    self.saga_id.clone(),
                    SagaEvent::ShippingScheduleRequested {
                        order_id: self.order_details.order_id,
                        shipping_address: self.order_details.shipping_address.clone(),
                    },
                )?])
            }
            (true, true, true, true) => {
                // All steps completed
                Ok(vec![StreamWrite::new(
                    read_streams,
                    self.saga_id.clone(),
                    SagaEvent::SagaCompleted,
                )?])
            }
        }
    }

    async fn handle_compensation(
        &self,
        read_streams: &ReadStreams<DistributedOrderSagaStreamSet>,
        state: &DistributedSagaState,
    ) -> CommandResult<Vec<StreamWrite<DistributedOrderSagaStreamSet, SagaEvent>>> {
        // Compensate in reverse order
        if state.shipping_scheduled {
            Ok(vec![StreamWrite::new(
                read_streams,
                self.saga_id.clone(),
                SagaEvent::ShippingCancellationRequested,
            )?])
        } else if state.inventory_reserved {
            Ok(vec![StreamWrite::new(
                read_streams,
                self.saga_id.clone(),
                SagaEvent::InventoryReleaseRequested,
            )?])
        } else if state.payment_reserved {
            Ok(vec![StreamWrite::new(
                read_streams,
                self.saga_id.clone(),
                SagaEvent::PaymentReleaseRequested,
            )?])
        } else if state.order_created {
            Ok(vec![StreamWrite::new(
                read_streams,
                self.saga_id.clone(),
                SagaEvent::OrderCancellationRequested,
            )?])
        } else {
            Ok(vec![StreamWrite::new(
                read_streams,
                self.saga_id.clone(),
                SagaEvent::CompensationCompleted,
            )?])
        }
    }
}

// External service integration
struct ExternalServiceClient {
    http_client: reqwest::Client,
    service_url: String,
    timeout: Duration,
}

impl ExternalServiceClient {
    async fn create_order(&self, order: &OrderDetails) -> Result<OrderId, ServiceError> {
        let response = self
            .http_client
            .post(&format!("{}/orders", self.service_url))
            .json(order)
            .timeout(self.timeout)
            .send()
            .await?;

        if response.status().is_success() {
            let result: CreateOrderResponse = response.json().await?;
            Ok(result.order_id)
        } else {
            Err(ServiceError::RequestFailed {
                status: response.status(),
                body: response.text().await.unwrap_or_default(),
            })
        }
    }

    async fn cancel_order(&self, order_id: OrderId) -> Result<(), ServiceError> {
        let response = self
            .http_client
            .delete(&format!("{}/orders/{}", self.service_url, order_id))
            .timeout(self.timeout)
            .send()
            .await?;

        if !response.status().is_success() {
            return Err(ServiceError::RequestFailed {
                status: response.status(),
                body: response.text().await.unwrap_or_default(),
            });
        }

        Ok(())
    }
}
```
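The saga's boolean flags are rebuilt by folding its own events before `handle` runs. A minimal sketch of that fold is shown below; the exact `apply` hook in EventCore's `CommandLogic` and the completion/failure event variants (`OrderCreated`, `PaymentReserved`, `StepFailed`, and so on) are assumptions here, so adapt the names to your event model:

```rust
impl DistributedOrderSaga {
    // Hypothetical state fold; the real apply signature and the SagaEvent
    // variants that mark step completion may differ in your codebase.
    fn apply(state: &mut DistributedSagaState, event: &SagaEvent) {
        match event {
            SagaEvent::OrderCreated { .. } => state.order_created = true,
            SagaEvent::PaymentReserved { .. } => state.payment_reserved = true,
            SagaEvent::InventoryReserved { .. } => state.inventory_reserved = true,
            SagaEvent::ShippingScheduled { .. } => state.shipping_scheduled = true,
            SagaEvent::SagaCompleted => state.completed = true,
            SagaEvent::StepFailed { step } => {
                // A failed step flips the saga into compensation mode.
                state.compensation_needed = true;
                state.failed_step = Some(step.clone());
            }
            _ => {}
        }
    }
}
```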
Event Sourcing Across Services
Cross-Service Projections
Build projections that consume events from multiple services:
```rust
struct CrossServiceOrderProjection {
    orders: HashMap<OrderId, OrderView>,
    event_store: Arc<dyn EventStore>,
    user_service_client: UserServiceClient,
    payment_service_client: PaymentServiceClient,
}

#[derive(Debug, Clone)]
struct OrderView {
    order_id: OrderId,
    customer_info: CustomerInfo,
    items: Vec<OrderItem>,
    payment_status: PaymentStatus,
    shipping_status: ShippingStatus,
    total_amount: Money,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
}

#[async_trait]
impl Projection for CrossServiceOrderProjection {
    type Event = DistributedEvent;
    type Error = ProjectionError;

    async fn apply(&mut self, event: &StoredEvent<Self::Event>) -> Result<(), Self::Error> {
        match event.payload.event_type.as_str() {
            "OrderCreated" => {
                let order_created: OrderCreatedEvent =
                    serde_json::from_value(event.payload.payload.clone())?;

                // Get customer info from the user service
                let customer_info = self
                    .user_service_client
                    .get_customer_info(order_created.customer_id)
                    .await?;

                let order_view = OrderView {
                    order_id: order_created.order_id,
                    customer_info,
                    items: order_created.items,
                    payment_status: PaymentStatus::Pending,
                    shipping_status: ShippingStatus::NotStarted,
                    total_amount: order_created.total_amount,
                    created_at: event.occurred_at,
                    updated_at: event.occurred_at,
                };

                self.orders.insert(order_created.order_id, order_view);
            }
            "PaymentProcessed" => {
                let payment_processed: PaymentProcessedEvent =
                    serde_json::from_value(event.payload.payload.clone())?;

                if let Some(order) = self.orders.get_mut(&payment_processed.order_id) {
                    order.payment_status = PaymentStatus::Completed;
                    order.updated_at = event.occurred_at;
                }
            }
            "ShipmentDispatched" => {
                let shipment_dispatched: ShipmentDispatchedEvent =
                    serde_json::from_value(event.payload.payload.clone())?;

                if let Some(order) = self.orders.get_mut(&shipment_dispatched.order_id) {
                    order.shipping_status = ShippingStatus::Dispatched;
                    order.updated_at = event.occurred_at;
                }
            }
            _ => {} // Ignore other events
        }
        Ok(())
    }
}
```
Event Federation
Federate events across service boundaries:
```rust
struct EventFederationHub {
    publishers: HashMap<String, Box<dyn EventPublisher>>,
    subscribers: HashMap<String, Vec<Box<dyn EventSubscriber>>>,
    routing_rules: RoutingRules,
}

#[derive(Debug, Clone)]
struct RoutingRules {
    routes: Vec<RoutingRule>,
}

#[derive(Debug, Clone)]
struct RoutingRule {
    source_service: String,
    event_pattern: String,
    target_services: Vec<String>,
    transformation: Option<String>,
}

impl EventFederationHub {
    async fn route_event(&self, event: &DistributedEvent) -> Result<(), FederationError> {
        let applicable_rules = self.routing_rules.routes.iter().filter(|rule| {
            rule.source_service == event.service_id
                && self.matches_pattern(&event.event_type, &rule.event_pattern)
        });

        for rule in applicable_rules {
            let transformed_event = if let Some(ref transformation) = rule.transformation {
                self.transform_event(event, transformation)?
            } else {
                event.clone()
            };

            for target_service in &rule.target_services {
                if let Some(publisher) = self.publishers.get(target_service) {
                    publisher.publish_federated_event(&transformed_event).await?;
                }
            }
        }

        Ok(())
    }

    fn matches_pattern(&self, event_type: &str, pattern: &str) -> bool {
        // Simple pattern matching - could be more sophisticated
        pattern == "*"
            || pattern == event_type
            || (pattern.ends_with('*') && event_type.starts_with(&pattern[..pattern.len() - 1]))
    }

    fn transform_event(
        &self,
        event: &DistributedEvent,
        transformation: &str,
    ) -> Result<DistributedEvent, FederationError> {
        // Apply transformation rules
        match transformation {
            "user_to_customer" => {
                let mut transformed = event.clone();
                transformed.event_type = transformed.event_type.replace("User", "Customer");
                Ok(transformed)
            }
            "anonymize_pii" => {
                let mut transformed = event.clone();
                // Remove PII from the payload
                if let Some(email) = transformed.payload.get_mut("email") {
                    *email = serde_json::Value::String("***@***.***".to_string());
                }
                Ok(transformed)
            }
            _ => Err(FederationError::UnknownTransformation(transformation.to_string())),
        }
    }
}
```
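Wiring up the hub is then a matter of declaring routes. A hypothetical configuration that forwards user events to the order service (renamed to customer events) and ships an anonymized copy to an analytics service (service names are illustrative):

```rust
// Hypothetical routing configuration using the types defined above.
let routing_rules = RoutingRules {
    routes: vec![
        // User events become customer events in the order service.
        RoutingRule {
            source_service: "user-service".to_string(),
            event_pattern: "User*".to_string(),
            target_services: vec!["order-service".to_string()],
            transformation: Some("user_to_customer".to_string()),
        },
        // Everything from the user service also goes to analytics, with PII stripped.
        RoutingRule {
            source_service: "user-service".to_string(),
            event_pattern: "*".to_string(),
            target_services: vec!["analytics-service".to_string()],
            transformation: Some("anonymize_pii".to_string()),
        },
    ],
};
```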
Service Discovery and Health
Service Registry
```rust
#[async_trait]
trait ServiceRegistry {
    async fn register_service(&self, service: ServiceInfo) -> Result<(), RegistryError>;
    async fn discover_services(&self, service_type: &str) -> Result<Vec<ServiceInfo>, RegistryError>;
    async fn health_check(&self, service_id: &str) -> Result<HealthStatus, RegistryError>;
}

#[derive(Debug, Clone)]
struct ServiceInfo {
    id: String,
    name: String,
    service_type: String,
    version: String,
    endpoints: HashMap<String, String>,
    health_check_url: String,
    capabilities: Vec<String>,
    metadata: HashMap<String, String>,
    registered_at: DateTime<Utc>,
}

struct ConsulServiceRegistry {
    consul_client: ConsulClient,
}

impl ConsulServiceRegistry {
    async fn register_eventcore_service(&self) -> Result<(), RegistryError> {
        let service = ServiceInfo {
            id: format!("eventcore-{}", uuid::Uuid::new_v4()),
            name: "order-service".to_string(),
            service_type: "eventcore".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
            endpoints: hashmap! {
                "http".to_string() => "http://localhost:8080".to_string(),
                "grpc".to_string() => "grpc://localhost:8081".to_string(),
                "events".to_string() => "kafka://localhost:9092/order-events".to_string(),
            },
            health_check_url: "http://localhost:8080/health".to_string(),
            capabilities: vec![
                "event-sourcing".to_string(),
                "order-management".to_string(),
                "payment-processing".to_string(),
            ],
            metadata: hashmap! {
                "environment".to_string() => "production".to_string(),
                "region".to_string() => "us-east-1".to_string(),
            },
            registered_at: Utc::now(),
        };

        self.register_service(service).await
    }
}
```
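Consumers use the same registry to locate peers before calling them. A sketch of a discovery lookup is below; the `ServiceNotFound` and `EndpointMissing` variants on `RegistryError` are assumptions for illustration:

```rust
// Hypothetical discovery flow: find a peer advertising the payment capability
// and return its HTTP endpoint. Error variants are illustrative.
async fn payment_service_url(registry: &impl ServiceRegistry) -> Result<String, RegistryError> {
    let candidates = registry.discover_services("eventcore").await?;

    let payment_service = candidates
        .into_iter()
        .find(|s| s.capabilities.contains(&"payment-processing".to_string()))
        .ok_or(RegistryError::ServiceNotFound)?;

    payment_service
        .endpoints
        .get("http")
        .cloned()
        .ok_or(RegistryError::EndpointMissing)
}
```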
Circuit Breaker for Service Calls
```rust
struct ServiceCircuitBreaker {
    state: Arc<RwLock<CircuitBreakerState>>,
    config: CircuitBreakerConfig,
}

#[derive(Debug)]
struct CircuitBreakerConfig {
    failure_threshold: u32,
    timeout: Duration,
    retry_timeout: Duration,
}

#[derive(Debug)]
enum CircuitBreakerState {
    Closed { failure_count: u32 },
    Open { failed_at: DateTime<Utc> },
    HalfOpen,
}

impl ServiceCircuitBreaker {
    async fn call<F, T, E>(&self, operation: F) -> Result<T, CircuitBreakerError<E>>
    where
        F: Future<Output = Result<T, E>>,
    {
        // Check circuit state
        {
            let state = self.state.read().await;
            if let CircuitBreakerState::Open { failed_at } = *state {
                let elapsed = (Utc::now() - failed_at).to_std().unwrap_or_default();
                if elapsed < self.config.retry_timeout {
                    return Err(CircuitBreakerError::CircuitOpen);
                }
                // Retry window elapsed - transition to half-open below
            }
        }

        // Update to half-open if we were open
        {
            let mut state = self.state.write().await;
            if matches!(*state, CircuitBreakerState::Open { .. }) {
                *state = CircuitBreakerState::HalfOpen;
            }
        }

        // Execute the operation with a timeout
        match tokio::time::timeout(self.config.timeout, operation).await {
            Ok(Ok(result)) => {
                // Success - reset the circuit
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::Closed { failure_count: 0 };
                Ok(result)
            }
            Ok(Err(e)) => {
                // Operation failed
                self.record_failure().await;
                Err(CircuitBreakerError::OperationFailed(e))
            }
            Err(_) => {
                // Timeout
                self.record_failure().await;
                Err(CircuitBreakerError::Timeout)
            }
        }
    }

    async fn record_failure(&self) {
        let mut state = self.state.write().await;
        match *state {
            CircuitBreakerState::Closed { failure_count } => {
                let new_count = failure_count + 1;
                if new_count >= self.config.failure_threshold {
                    *state = CircuitBreakerState::Open { failed_at: Utc::now() };
                } else {
                    *state = CircuitBreakerState::Closed { failure_count: new_count };
                }
            }
            CircuitBreakerState::HalfOpen => {
                *state = CircuitBreakerState::Open { failed_at: Utc::now() };
            }
            _ => {}
        }
    }
}

#[derive(Debug, thiserror::Error)]
enum CircuitBreakerError<E> {
    #[error("Circuit breaker is open")]
    CircuitOpen,
    #[error("Operation timed out")]
    Timeout,
    #[error("Operation failed: {0}")]
    OperationFailed(E),
}
```
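In practice the breaker wraps each outbound call. A hypothetical usage around the `ExternalServiceClient` from the saga section; the `order_client` and `order_details` handles and the threshold values are illustrative:

```rust
// Hypothetical usage: protect the order-service HTTP call with the breaker.
let breaker = ServiceCircuitBreaker {
    state: Arc::new(RwLock::new(CircuitBreakerState::Closed { failure_count: 0 })),
    config: CircuitBreakerConfig {
        failure_threshold: 5,
        timeout: Duration::from_secs(2),
        retry_timeout: Duration::from_secs(30),
    },
};

match breaker.call(order_client.create_order(&order_details)).await {
    Ok(order_id) => tracing::info!("order {:?} created", order_id),
    Err(CircuitBreakerError::CircuitOpen) => {
        // Fail fast: fall back or queue the request for later retry.
        tracing::warn!("order service circuit is open, skipping call");
    }
    Err(other) => tracing::error!("order creation failed: {:?}", other),
}
```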
Distributed Monitoring
Distributed Tracing
```rust
use opentelemetry::global;
use opentelemetry::trace::{SpanContext, SpanKind, TraceContextExt, Tracer};
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Clone)]
struct DistributedCommandExecutor {
    inner: CommandExecutor,
    tracer: Box<dyn Tracer + Send + Sync>,
}

impl DistributedCommandExecutor {
    async fn execute_with_tracing<C: Command>(
        &self,
        command: &C,
        parent_context: Option<SpanContext>,
    ) -> CommandResult<ExecutionResult> {
        let span = self
            .tracer
            .span_builder(format!("execute_command_{}", std::any::type_name::<C>()))
            .with_kind(SpanKind::Internal)
            .start(&self.tracer);

        if let Some(parent) = parent_context {
            span.set_parent(parent);
        }

        let _guard = span.enter();

        span.set_attribute("command.type", std::any::type_name::<C>());
        span.set_attribute("service.name", self.service_name());

        match self.inner.execute(command).await {
            Ok(result) => {
                span.set_attribute("command.success", true);
                span.set_attribute("events.written", result.events_written.len() as i64);
                Ok(result)
            }
            Err(e) => {
                span.set_attribute("command.success", false);
                span.set_attribute("error.message", e.to_string());
                Err(e)
            }
        }
    }
}

// Distributed event carrying trace context
#[derive(Debug, Serialize, Deserialize)]
struct TracedDistributedEvent {
    #[serde(flatten)]
    event: DistributedEvent,
    trace_id: String,
    span_id: String,
}

impl From<(&StoredEvent, &SpanContext)> for TracedDistributedEvent {
    fn from((event, context): (&StoredEvent, &SpanContext)) -> Self {
        Self {
            event: event.into(),
            trace_id: context.trace_id().to_string(),
            span_id: context.span_id().to_string(),
        }
    }
}
```
Metrics Collection
```rust
use prometheus::{Gauge, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, Opts, Registry};

#[derive(Clone)]
struct DistributedMetrics {
    registry: Registry,

    // Command metrics
    commands_total: IntCounterVec,
    command_duration: HistogramVec,
    command_errors: IntCounterVec,

    // Event metrics
    events_published: IntCounterVec,
    events_consumed: IntCounterVec,
    event_lag: GaugeVec,

    // Service metrics
    service_health: Gauge,
    active_connections: Gauge,
}

impl DistributedMetrics {
    fn new(service_name: &str) -> Self {
        let registry = Registry::new();

        // Labeled metric families so we can slice by command/event type.
        let commands_total = IntCounterVec::new(
            Opts::new("eventcore_commands_total", "Total commands executed"),
            &["command_type"],
        )
        .unwrap();

        let command_duration = HistogramVec::new(
            HistogramOpts::new(
                "eventcore_command_duration_seconds",
                "Command execution duration",
            ),
            &["command_type"],
        )
        .unwrap();

        let command_errors = IntCounterVec::new(
            Opts::new("eventcore_command_errors_total", "Total command errors"),
            &["command_type"],
        )
        .unwrap();

        let events_published = IntCounterVec::new(
            Opts::new("eventcore_events_published_total", "Total events published"),
            &["event_type"],
        )
        .unwrap();

        let events_consumed = IntCounterVec::new(
            Opts::new("eventcore_events_consumed_total", "Total events consumed"),
            &["event_type"],
        )
        .unwrap();

        let event_lag = GaugeVec::new(
            Opts::new("eventcore_event_lag_seconds", "Event processing lag"),
            &["event_type"],
        )
        .unwrap();

        let service_health = Gauge::new(
            "eventcore_service_health",
            "Service health status (0=down, 1=up)",
        )
        .unwrap();

        let active_connections = Gauge::new(
            "eventcore_active_connections",
            "Number of active connections",
        )
        .unwrap();

        // Register all metrics
        registry.register(Box::new(commands_total.clone())).unwrap();
        registry.register(Box::new(command_duration.clone())).unwrap();
        registry.register(Box::new(command_errors.clone())).unwrap();
        registry.register(Box::new(events_published.clone())).unwrap();
        registry.register(Box::new(events_consumed.clone())).unwrap();
        registry.register(Box::new(event_lag.clone())).unwrap();
        registry.register(Box::new(service_health.clone())).unwrap();
        registry.register(Box::new(active_connections.clone())).unwrap();

        Self {
            registry,
            commands_total,
            command_duration,
            command_errors,
            events_published,
            events_consumed,
            event_lag,
            service_health,
            active_connections,
        }
    }

    fn record_command_executed(&self, command_type: &str, duration: Duration, success: bool) {
        self.commands_total.with_label_values(&[command_type]).inc();
        self.command_duration
            .with_label_values(&[command_type])
            .observe(duration.as_secs_f64());

        if !success {
            self.command_errors.with_label_values(&[command_type]).inc();
        }
    }

    fn record_event_published(&self, event_type: &str) {
        self.events_published.with_label_values(&[event_type]).inc();
    }

    fn record_event_consumed(&self, event_type: &str, lag: Duration) {
        self.events_consumed.with_label_values(&[event_type]).inc();
        self.event_lag
            .with_label_values(&[event_type])
            .set(lag.as_secs_f64());
    }

    async fn export_metrics(&self) -> String {
        use prometheus::Encoder;
        let encoder = prometheus::TextEncoder::new();
        let metric_families = self.registry.gather();
        encoder.encode_to_string(&metric_families).unwrap()
    }
}
```
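Recording is then a thin wrapper around command execution. A sketch of how a service might time a command and record the outcome; the `"CreateOrder"` label and the `executor`/`command` handles are assumed from the earlier examples:

```rust
// Hypothetical wiring: time a command execution and record the outcome.
let metrics = DistributedMetrics::new("order-service");

let started = std::time::Instant::now();
let result = executor.execute(&command).await;

metrics.record_command_executed("CreateOrder", started.elapsed(), result.is_ok());
```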
Testing Distributed Systems
```rust
#[cfg(test)]
mod distributed_tests {
    use super::*;
    use testcontainers::*;

    #[tokio::test]
    async fn test_distributed_saga() {
        // Setup test environment with multiple services
        let docker = clients::Cli::default();
        let kafka_container = docker.run(images::kafka::Kafka::default());
        let postgres_container = docker.run(images::postgres::Postgres::default());

        // Start services
        let user_service = start_user_service(&postgres_container).await;
        let order_service = start_order_service(&postgres_container).await;
        let payment_service = start_payment_service(&postgres_container).await;

        // Setup event routing
        let event_hub = EventFederationHub::new(&kafka_container);

        // Execute distributed saga
        let saga = DistributedOrderSaga {
            saga_id: StreamId::new(),
            order_details: create_test_order(),
            customer_id: create_test_customer(&user_service).await,
        };

        let result = order_service.execute_saga(&saga).await;

        // Verify all services were coordinated correctly
        assert!(result.is_ok());

        // Verify final state across services
        let order = order_service
            .get_order(saga.order_details.order_id)
            .await
            .unwrap();
        assert_eq!(order.status, OrderStatus::Completed);

        let payment = payment_service
            .get_payment(saga.order_details.order_id)
            .await
            .unwrap();
        assert_eq!(payment.status, PaymentStatus::Completed);
    }

    #[tokio::test]
    async fn test_service_failure_compensation() {
        // Similar setup, but simulate a payment service failure:
        // - verify compensation is triggered
        // - verify the order is cancelled
        // - verify inventory is released
    }
}
```
Best Practices
- Design for independence - Services should be loosely coupled
- Use event-driven communication - Prefer async events over sync calls
- Implement circuit breakers - Protect against cascading failures
- Monitor everything - Comprehensive observability is critical
- Plan for failure - Design compensation strategies upfront
- Version everything - Events, services, and APIs (see the versioned event sketch after this list)
- Test across services - Include distributed testing
- Document service contracts - Clear event schemas and APIs
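"Version everything" usually starts with stamping a schema version on every published envelope so consumers can upcast older shapes. A minimal sketch, assuming a `schema_version` field and a JSON-level upcast (neither is part of EventCore, and the v1/v2 field names are invented for illustration):

```rust
// Illustrative versioned envelope around the DistributedEvent defined earlier.
#[derive(Debug, Serialize, Deserialize)]
struct VersionedDistributedEvent {
    #[serde(flatten)]
    event: DistributedEvent,
    schema_version: u32,
}

// Consumer-side upcast: v1 payloads had a single `name` field; v2 splits it.
// Assumes the payload is a JSON object, as produced by the publisher above.
fn upcast_user_registered(event: &mut VersionedDistributedEvent) {
    if event.schema_version == 1 {
        if let Some(name) = event
            .event
            .payload
            .get("name")
            .and_then(|v| v.as_str())
            .map(str::to_owned)
        {
            let mut parts = name.splitn(2, ' ');
            event.event.payload["first_name"] = parts.next().unwrap_or("").into();
            event.event.payload["last_name"] = parts.next().unwrap_or("").into();
        }
        event.schema_version = 2;
    }
}
```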
Summary
Distributed EventCore systems:
- ✅ Service boundaries - Clear ownership of streams and commands
- ✅ Event-driven - Async communication between services
- ✅ Fault tolerant - Circuit breakers and compensation
- ✅ Observable - Distributed tracing and metrics
- ✅ Scalable - Independent scaling of services
Key patterns:
- Own your streams - each service owns its event streams
- Publish events - share state changes via events
- Use sagas - coordinate distributed transactions
- Monitor health - track service health and performance
- Plan for failure - implement circuit breakers and compensation
Next, let’s explore Performance Optimization →