Caxton System Architecture
Version: 1.0 Date: 2025-08-04 Status: Design Phase
Executive Summary
Caxton is a multi-agent orchestration server designed for production use: it provides WebAssembly-based agent isolation, FIPA-compliant messaging, and comprehensive observability. This document defines the complete system architecture, domain model, and implementation patterns, following type-driven development principles.
Table of Contents
- System Overview
- Domain Model
- Component Architecture
- Agent Lifecycle Management
- FIPA Message Flow
- Security Architecture
- Observability Integration
- Performance & Scalability
- Type Safety & Error Handling
- Deployment Architecture
System Overview
High-Level Architecture
┌─────────────────────────────────────────────────────────────┐
│ Management Layer │
├─────────────────┬───────────────────┬───────────────────────┤
│ CLI Tool │ Web Dashboard │ gRPC/REST API │
│ (caxton) │ (Future) │ (Management) │
└─────────────────┴───────────────────┴───────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Caxton Server Process │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Management API Layer │ │
│ │ • Authentication • Authorization │ │
│ │ • Agent Deployment • Configuration │ │
│ └─────────────────┬─────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────▼─────────────────────────────────────┐ │
│ │ Agent Runtime Environment │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │Agent A │ │Agent B │ │Agent C │ │Agent N │ │ │
│ │ │(WASM) │ │(WASM) │ │(WASM) │ │(WASM) │ │ │
│ │ │Sandbox │ │Sandbox │ │Sandbox │ │Sandbox │ │ │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │
│ │ └────────────┼────────────┼────────────┘ │ │
│ │ │ │ │ │
│ │ ┌─────────────────▼────────────▼─────────────────┐ │ │
│ │ │ FIPA Message Bus │ │ │
│ │ │ • Message Routing • Protocol Handling │ │ │
│ │ │ • Conversation Mgmt • Error Recovery │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Observability Layer │ │
│ │ • Structured Logging (tracing crate) │ │
│ │ • Metrics (Prometheus) │ │
│ │ • Distributed Tracing (OpenTelemetry) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Coordination Layer │ │
│ │ • Cluster Membership (SWIM Protocol) │ │
│ │ • Agent Registry (Gossip) │ │
│ │ • Local State (SQLite) │ │
│ │ • Partition Detection & Handling │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Core Principles
- Type-Driven Design: All illegal states are unrepresentable through the type system
- Observability First: Every operation is traced, logged, and measured
- WebAssembly Isolation: Complete sandboxing between agents
- FIPA Compliance: Standard agent communication protocols
- Minimal Core: Small, focused core with rich extension ecosystem
- Coordination Over State: Use lightweight protocols instead of shared databases (see ADR-0014)
Protocol Layering
- SWIM Protocol (Infrastructure): Handles cluster membership, failure detection, and routing information dissemination
- FIPA Protocol (Application): Manages semantic agent-to-agent messaging with conversation tracking
- Clear Separation: These protocols operate at different layers and complement each other (see ADR-0015 and the sketch below)
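This separation can be made concrete in the type system. The following is an illustrative sketch, not part of the defined domain model: NodeId, ClusterEvent, and AgentDirectory are hypothetical names, and AgentId is the domain type introduced in the next section. SWIM-level events carry only membership and placement facts; FIPA messages carry only semantic content and never encode cluster topology.

// Hypothetical sketch of the protocol layering; NodeId and these items are illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NodeId(u64);

/// Infrastructure-level events produced by the SWIM membership layer.
/// Note the absence of performatives, conversations, or message content.
#[derive(Debug, Clone)]
pub enum ClusterEvent {
    NodeJoined { node: NodeId },
    NodeSuspected { node: NodeId },
    NodeLeft { node: NodeId },
    AgentRegistered { agent: AgentId, node: NodeId },
    AgentDeregistered { agent: AgentId },
}

/// The application layer consumes only routing facts derived from gossip.
/// FIPA messages are routed *using* this directory, but their contents are
/// opaque to the coordination layer.
pub trait AgentDirectory {
    fn locate(&self, agent: AgentId) -> Option<NodeId>;
}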
Domain Model
Core Domain Types
// Agent Identity and Lifecycle
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AgentId(NonZeroU64);
#[derive(Debug, Clone)]
pub struct Agent<State> {
id: AgentId,
name: AgentName,
wasm_module: WasmModule,
resources: ResourceLimits,
_state: PhantomData<State>,
}
// Agent States - Make illegal state transitions impossible
pub struct Unloaded;
pub struct Loaded;
pub struct Running;
pub struct Draining;
pub struct Failed;
impl Agent<Unloaded> {
pub fn load(self, module: WasmModule) -> Result<Agent<Loaded>, LoadError> {
// Only unloaded agents can be loaded
}
}
impl Agent<Loaded> {
pub fn start(self) -> Result<Agent<Running>, StartError> {
// Only loaded agents can start
}
}
impl Agent<Running> {
pub fn drain(self) -> Result<Agent<Draining>, DrainError> {
// Only running agents can be drained
}
    pub async fn handle_message(&self, msg: FipaMessage) -> Result<(), ProcessingError> {
// Only running agents can process messages
}
}
// FIPA Message Domain Model
#[derive(Debug, Clone)]
pub struct FipaMessage {
pub id: MessageId,
pub performative: Performative,
pub sender: AgentId,
pub receiver: AgentId,
pub conversation_id: ConversationId,
pub reply_with: Option<ReplyWith>,
pub in_reply_to: Option<InReplyTo>,
pub content: MessageContent,
pub ontology: Option<Ontology>,
pub language: Option<Language>,
// Observability context
pub trace_id: TraceId,
pub span_id: SpanId,
pub timestamp: SystemTime,
}
#[derive(Debug, Clone, PartialEq)]
pub enum Performative {
// Information exchange
Inform,
Query,
Request,
// Negotiation
Propose,
AcceptProposal,
RejectProposal,
// Contract Net Protocol
Cfp, // Call for Proposals
// Error handling
NotUnderstood,
Failure,
// Conversation management
Cancel,
}
// Resource Management
#[derive(Debug, Clone)]
pub struct ResourceLimits {
pub max_memory_bytes: ByteSize,
pub max_cpu_millis: CpuMillis,
pub max_execution_time: Duration,
pub max_message_size: ByteSize,
}
// Deployment and Versioning
#[derive(Debug, Clone)]
pub struct Deployment {
pub id: DeploymentId,
pub agent_id: AgentId,
pub version: Version,
pub strategy: DeploymentStrategy,
pub state: DeploymentState,
pub created_at: SystemTime,
}
#[derive(Debug, Clone)]
pub enum DeploymentStrategy {
Direct,
BlueGreen { warm_up_duration: Duration },
Canary {
stages: Vec<CanaryStage>,
rollback_conditions: RollbackConditions,
},
Shadow {
duration: Duration,
comparison_metrics: Vec<MetricComparison>,
},
}
#[derive(Debug, Clone)]
pub struct CanaryStage {
pub percentage: Percentage,
pub duration: Duration,
}
#[derive(Debug, Clone, PartialEq)]
pub enum DeploymentState {
Validating,
Deploying,
Running,
Draining,
Failed { error: DeploymentError },
RolledBack { reason: RollbackReason },
}
Message Flow State Machine
// Conversation State Management
#[derive(Debug, Clone)]
pub struct Conversation {
pub id: ConversationId,
pub initiator: AgentId,
pub participants: NonEmpty<AgentId>,
pub protocol: InteractionProtocol,
pub state: ConversationState,
pub messages: Vec<FipaMessage>,
pub created_at: SystemTime,
pub expires_at: Option<SystemTime>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum InteractionProtocol {
RequestReply,
ContractNet,
Auction,
Negotiation,
Subscribe,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ConversationState {
Initiated,
InProgress,
Completed { outcome: ConversationOutcome },
Failed { error: ConversationError },
Expired,
}
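The enums above capture the data; the transition rules themselves can be centralized in one place so that illegal moves (for example, appending messages to a Completed conversation) are rejected uniformly. The can_transition_to helper below is a sketch, not a defined API:

impl ConversationState {
    /// Illustrative transition check. Completed, Failed, and Expired are terminal;
    /// a conversation must be initiated before it can make progress.
    pub fn can_transition_to(&self, next: &ConversationState) -> bool {
        use ConversationState::*;
        matches!(
            (self, next),
            (Initiated, InProgress)
                | (Initiated, Failed { .. })
                | (Initiated, Expired)
                | (InProgress, InProgress)
                | (InProgress, Completed { .. })
                | (InProgress, Failed { .. })
                | (InProgress, Expired)
        )
    }
}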
Component Architecture
Agent Runtime Environment
pub struct AgentRuntime {
agents: HashMap<AgentId, Arc<Agent<Running>>>,
wasm_engine: WasmEngine,
message_router: Arc<MessageRouter>,
resource_manager: Arc<ResourceManager>,
observability: Arc<ObservabilityLayer>,
}
impl AgentRuntime {
pub async fn spawn_agent(
&mut self,
config: AgentConfig
) -> Result<AgentId, SpawnError> {
let span = tracing::info_span!("agent_spawn", %config.name);
let _enter = span.enter();
// 1. Validate WASM module
let module = self.validate_wasm_module(&config.wasm_bytes).await?;
// 2. Create agent instance
        let agent = Agent::new(config.name)
            .load(module)?
            .start()?;
// 3. Register with router
self.message_router.register_agent(agent.id(), &agent).await?;
// 4. Track resources
self.resource_manager.track_agent(agent.id(), config.resources)?;
// 5. Store running agent
let agent_id = agent.id();
self.agents.insert(agent_id, Arc::new(agent));
// 6. Emit telemetry
self.observability.record_agent_spawned(agent_id);
Ok(agent_id)
}
#[instrument(skip(self, message))]
pub async fn route_message(
&self,
message: FipaMessage
) -> Result<(), RoutingError> {
// Message routing with full observability
let agent = self.agents.get(&message.receiver)
            .ok_or(RoutingError::NoRoute { agent_id: message.receiver })?;
// Create child span for message processing
let span = tracing::info_span!(
"message_process",
agent_id = %message.receiver,
message_id = %message.id,
performative = ?message.performative,
);
agent.handle_message(message).instrument(span).await
}
}
Message Router
#[async_trait]
pub trait MessageRouter: Send + Sync {
async fn route(&self, message: FipaMessage) -> Result<(), RoutingError>;
async fn register_agent(&self, id: AgentId, agent: &Agent<Running>) -> Result<(), RegistrationError>;
async fn unregister_agent(&self, id: AgentId) -> Result<(), RegistrationError>;
}
pub struct FipaMessageRouter {
routing_table: Arc<RwLock<HashMap<AgentId, Arc<Agent<Running>>>>>,
conversation_manager: Arc<ConversationManager>,
message_store: Arc<dyn MessageStore>,
observability: Arc<ObservabilityLayer>,
}
impl FipaMessageRouter {
#[instrument(skip(self, message))]
pub async fn route(&self, message: FipaMessage) -> Result<(), RoutingError> {
// 1. Validate message
self.validate_message(&message)?;
// 2. Store for persistence
self.message_store.store_message(&message).await?;
// 3. Update conversation state
self.conversation_manager
.update_conversation(&message)
.await?;
// 4. Find target agent
let agents = self.routing_table.read().await;
let target_agent = agents.get(&message.receiver)
            .ok_or(RoutingError::NoRoute { agent_id: message.receiver })?;
        // 5. Record routing metrics before ownership of the message is handed off
        self.observability.record_message_routed(&message);
        // 6. Deliver the message to the target agent
        target_agent.handle_message(message).await?;
        Ok(())
}
}
Agent Lifecycle Management
Lifecycle State Machine
pub struct AgentLifecycleManager {
state_store: Arc<dyn StateStore>,
deployment_manager: Arc<DeploymentManager>,
resource_manager: Arc<ResourceManager>,
observability: Arc<ObservabilityLayer>,
}
impl AgentLifecycleManager {
pub async fn deploy_agent(
&self,
config: DeploymentConfig
) -> Result<DeploymentId, DeploymentError> {
let deployment_id = DeploymentId::new();
let span = tracing::info_span!(
"agent_deploy",
%deployment_id,
agent_name = %config.agent_name,
strategy = ?config.strategy
);
let _enter = span.enter();
// Create deployment record
let deployment = Deployment {
id: deployment_id,
agent_id: config.agent_id,
version: config.version,
strategy: config.strategy.clone(),
state: DeploymentState::Validating,
created_at: SystemTime::now(),
};
// Store deployment
self.state_store.store_deployment(&deployment).await?;
// Execute deployment strategy
        match config.strategy.clone() {
DeploymentStrategy::Direct => {
self.execute_direct_deployment(deployment_id, config).await
},
DeploymentStrategy::BlueGreen { .. } => {
self.execute_blue_green_deployment(deployment_id, config).await
},
DeploymentStrategy::Canary { .. } => {
self.execute_canary_deployment(deployment_id, config).await
},
DeploymentStrategy::Shadow { .. } => {
self.execute_shadow_deployment(deployment_id, config).await
},
}
}
#[instrument(skip(self, config))]
async fn execute_canary_deployment(
&self,
deployment_id: DeploymentId,
config: DeploymentConfig
) -> Result<DeploymentId, DeploymentError> {
// 1. Validation phase
self.update_deployment_state(deployment_id, DeploymentState::Validating).await?;
let validation_result = self.validate_agent(&config).await?;
if !validation_result.passed {
return Err(DeploymentError::ValidationFailed(validation_result.errors));
}
// 2. Begin canary deployment
self.update_deployment_state(deployment_id, DeploymentState::Deploying).await?;
if let DeploymentStrategy::Canary { stages, rollback_conditions } = &config.strategy {
for stage in stages {
// Deploy to percentage of traffic
self.deploy_canary_stage(deployment_id, stage).await?;
// Monitor for rollback conditions
let monitoring_result = self.monitor_canary_stage(
deployment_id,
stage,
rollback_conditions
).await?;
if monitoring_result.should_rollback {
                    self.rollback_deployment(deployment_id, monitoring_result.reason.clone()).await?;
                    return Err(DeploymentError::RolledBack(monitoring_result.reason));
}
}
}
// 3. Complete deployment
self.update_deployment_state(deployment_id, DeploymentState::Running).await?;
self.observability.record_deployment_completed(deployment_id);
Ok(deployment_id)
}
}
FIPA Message Flow
Protocol Implementation
pub struct ContractNetProtocol {
conversation_manager: Arc<ConversationManager>,
message_router: Arc<dyn MessageRouter>,
timeout_manager: Arc<TimeoutManager>,
}
impl ContractNetProtocol {
#[instrument(skip(self, task))]
pub async fn initiate_contract_net(
&self,
initiator: AgentId,
task: Task,
participants: NonEmpty<AgentId>
) -> Result<ContractResult, ProtocolError> {
let conversation_id = ConversationId::new();
let span = tracing::info_span!(
"contract_net_protocol",
%conversation_id,
%initiator,
participant_count = participants.len()
);
let _enter = span.enter();
// 1. Create conversation
let conversation = Conversation {
id: conversation_id,
initiator,
participants: participants.clone(),
protocol: InteractionProtocol::ContractNet,
state: ConversationState::Initiated,
messages: Vec::new(),
created_at: SystemTime::now(),
expires_at: Some(SystemTime::now() + Duration::from_secs(300)),
};
self.conversation_manager.create_conversation(conversation).await?;
// 2. Send Call for Proposals
let cfp_message = FipaMessage {
id: MessageId::new(),
performative: Performative::Cfp,
sender: initiator,
            receiver: AgentId::broadcast(), // Placeholder; overwritten per participant below
conversation_id,
reply_with: Some(ReplyWith::new()),
in_reply_to: None,
            content: MessageContent::Task(task.clone()),
ontology: Some(Ontology::ContractNet),
language: Some(Language::Json),
trace_id: Span::current().id().unwrap_or_default(),
span_id: SpanId::new(),
timestamp: SystemTime::now(),
};
// Broadcast to all participants
for participant in &participants {
let mut participant_message = cfp_message.clone();
participant_message.receiver = *participant;
self.message_router.route(participant_message).await?;
}
// 3. Collect proposals with timeout
let proposals = self.collect_proposals(
conversation_id,
participants.len(),
Duration::from_secs(30)
).await?;
// 4. Evaluate and select winner
        let winning_proposal = self.evaluate_proposals(&proposals)?;
// 5. Send accept-proposal to winner
let accept_message = FipaMessage {
id: MessageId::new(),
performative: Performative::AcceptProposal,
sender: initiator,
receiver: winning_proposal.sender,
conversation_id,
reply_with: Some(ReplyWith::new()),
in_reply_to: Some(InReplyTo::from(winning_proposal.id)),
content: MessageContent::Acceptance(winning_proposal.content.clone()),
ontology: Some(Ontology::ContractNet),
language: Some(Language::Json),
trace_id: Span::current().id().unwrap_or_default(),
span_id: SpanId::new(),
timestamp: SystemTime::now(),
};
self.message_router.route(accept_message).await?;
// 6. Send reject-proposal to others
for proposal in &proposals {
if proposal.sender != winning_proposal.sender {
let reject_message = FipaMessage {
id: MessageId::new(),
performative: Performative::RejectProposal,
sender: initiator,
receiver: proposal.sender,
conversation_id,
reply_with: None,
in_reply_to: Some(InReplyTo::from(proposal.id)),
content: MessageContent::Rejection("Proposal not selected".to_string()),
ontology: Some(Ontology::ContractNet),
language: Some(Language::Json),
trace_id: Span::current().id().unwrap_or_default(),
span_id: SpanId::new(),
timestamp: SystemTime::now(),
};
self.message_router.route(reject_message).await?;
}
}
// 7. Complete conversation
self.conversation_manager.complete_conversation(
conversation_id,
ConversationOutcome::ContractAwarded {
winner: winning_proposal.sender,
task: task.clone(),
}
).await?;
Ok(ContractResult {
conversation_id,
winner: winning_proposal.sender,
proposal: winning_proposal,
})
}
}
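The code above covers the initiator role. A participant answering a cfp would construct its propose along the same lines; the helper below is a sketch against the FipaMessage type defined earlier, and the me and content parameters stand in for the participant's own identity and proposal payload:

/// Sketch of the participant side of Contract Net: reply to a received cfp
/// with a propose, correlated through the shared conversation_id and in_reply_to.
fn build_propose_reply(cfp: &FipaMessage, me: AgentId, content: MessageContent) -> FipaMessage {
    FipaMessage {
        id: MessageId::new(),
        performative: Performative::Propose,
        sender: me,
        receiver: cfp.sender,
        conversation_id: cfp.conversation_id.clone(),
        reply_with: None,
        // Reference the cfp's message id so the initiator can match this proposal to its request.
        in_reply_to: Some(InReplyTo::from(cfp.id.clone())),
        content,
        ontology: Some(Ontology::ContractNet),
        language: Some(Language::Json),
        trace_id: cfp.trace_id.clone(),
        span_id: SpanId::new(),
        timestamp: SystemTime::now(),
    }
}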
Security Architecture
WebAssembly Sandboxing
pub struct WasmSandbox {
engine: wasmtime::Engine,
store: wasmtime::Store<WasmContext>,
instance: wasmtime::Instance,
resource_limiter: ResourceLimiter,
}
#[derive(Debug)]
pub struct WasmContext {
agent_id: AgentId,
resource_limits: ResourceLimits,
mcp_tools: Arc<McpToolRegistry>,
observability: Arc<ObservabilityLayer>,
}
impl WasmSandbox {
pub fn new(
agent_id: AgentId,
wasm_bytes: &[u8],
resource_limits: ResourceLimits
) -> Result<Self, SandboxError> {
// Create engine with security configurations
let mut config = wasmtime::Config::new();
config.wasm_simd(false); // Disable SIMD for security
config.wasm_reference_types(false); // Disable ref types
config.wasm_bulk_memory(false); // Disable bulk memory
config.consume_fuel(true); // Enable fuel for CPU limiting
let engine = wasmtime::Engine::new(&config)?;
// Create store with resource limits
let context = WasmContext {
agent_id,
resource_limits: resource_limits.clone(),
mcp_tools: Arc::new(McpToolRegistry::new()),
observability: Arc::new(ObservabilityLayer::new()),
};
let mut store = wasmtime::Store::new(&engine, context);
        // ResourceLimits is assumed to implement wasmtime::ResourceLimiter so it can cap memory and table growth
        store.limiter(|ctx| &mut ctx.resource_limits);
// Set fuel limit for CPU control
store.set_fuel(resource_limits.max_cpu_millis.into())?;
// Load and instantiate module
let module = wasmtime::Module::new(&engine, wasm_bytes)?;
let instance = wasmtime::Instance::new(&mut store, &module, &[])?;
Ok(Self {
engine,
store,
instance,
resource_limiter: ResourceLimiter::new(resource_limits),
})
}
#[instrument(skip(self, message))]
pub async fn handle_message(
&mut self,
message: FipaMessage
) -> Result<Option<FipaMessage>, SandboxError> {
// Serialize message for WASM
let message_bytes = serde_json::to_vec(&message)?;
// Get WASM function
let handle_message = self.instance
.get_typed_func::<(i32, i32), i32>(&mut self.store, "handle_message")?;
// Allocate memory in WASM instance
let memory = self.instance.get_memory(&mut self.store, "memory")
.ok_or(SandboxError::NoMemoryExport)?;
let message_ptr = self.allocate_in_wasm(memory, &message_bytes)?;
// Set fuel before execution
self.store.set_fuel(self.resource_limiter.remaining_cpu())?;
        // Call WASM function with timeout (call_async requires async support to be enabled on the wasmtime Config)
let result = tokio::time::timeout(
self.resource_limiter.max_execution_time(),
async {
handle_message.call_async(
&mut self.store,
(message_ptr, message_bytes.len() as i32)
).await
}
).await??;
// Check remaining fuel
        let consumed_fuel = self.resource_limiter.remaining_cpu() - self.store.get_fuel()?;
self.resource_limiter.consume_cpu(consumed_fuel)?;
// Handle result
match result {
0 => Ok(None), // No response
1 => {
// Agent produced a response
let response_bytes = self.get_response_from_wasm(memory)?;
let response_message: FipaMessage = serde_json::from_slice(&response_bytes)?;
Ok(Some(response_message))
},
error_code => Err(SandboxError::AgentError(error_code)),
}
}
}
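A hypothetical host-side call path, assuming the runtime already holds the agent's wasm bytes, its resource limits, and a handle to the message router from the earlier sections:

// Sketch only: wasm_bytes, limits, incoming, and router are assumed to be in scope.
let mut sandbox = WasmSandbox::new(agent_id, &wasm_bytes, limits.clone())?;
if let Some(reply) = sandbox.handle_message(incoming).await? {
    // Any response produced inside the sandbox re-enters the FIPA message bus.
    router.route(reply).await?;
}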
Resource Management
pub struct ResourceLimiter {
max_memory_bytes: ByteSize,
max_cpu_millis: CpuMillis,
max_execution_time: Duration,
consumed_memory: AtomicU64,
consumed_cpu: AtomicU64,
start_time: SystemTime,
}
impl ResourceLimiter {
pub fn consume_memory(&self, bytes: u64) -> Result<(), ResourceError> {
let current = self.consumed_memory.fetch_add(bytes, Ordering::SeqCst);
if current + bytes > self.max_memory_bytes.as_u64() {
Err(ResourceError::MemoryLimitExceeded {
limit: self.max_memory_bytes,
attempted: ByteSize::from(current + bytes),
})
} else {
Ok(())
}
}
pub fn consume_cpu(&self, millis: u64) -> Result<(), ResourceError> {
let current = self.consumed_cpu.fetch_add(millis, Ordering::SeqCst);
if current + millis > self.max_cpu_millis.as_u64() {
Err(ResourceError::CpuLimitExceeded {
limit: self.max_cpu_millis,
attempted: CpuMillis::from(current + millis),
})
} else {
Ok(())
}
}
}
Observability Integration
Structured Logging and Tracing
pub struct ObservabilityLayer {
    tracer: opentelemetry::global::BoxedTracer,
metrics_registry: Arc<MetricsRegistry>,
event_store: Arc<dyn EventStore>,
}
impl ObservabilityLayer {
pub fn record_agent_event<T>(&self, event: AgentEvent<T>)
where T: Serialize + Send + Sync + 'static {
// Structured logging
tracing::info!(
agent_id = %event.agent_id,
event_type = std::any::type_name::<T>(),
timestamp = ?event.timestamp,
trace_id = %event.trace_id,
"Agent event recorded"
);
// Store event for analysis
self.event_store.store_event(event);
// Update metrics
self.metrics_registry.increment_counter(
"caxton_agent_events_total",
&[("agent_id", &event.agent_id.to_string())]
);
}
pub fn record_message_metrics(&self, message: &FipaMessage, duration: Duration) {
// Histogram for message processing time
self.metrics_registry.record_histogram(
"caxton_message_processing_duration_seconds",
duration.as_secs_f64(),
&[
("performative", &format!("{:?}", message.performative)),
("sender", &message.sender.to_string()),
("receiver", &message.receiver.to_string()),
]
);
// Counter for message throughput
self.metrics_registry.increment_counter(
"caxton_messages_processed_total",
&[
("performative", &format!("{:?}", message.performative)),
("status", "success"),
]
);
}
}
// Agent Event Types for Observability
#[derive(Debug, Serialize)]
pub struct AgentEvent<T> {
pub agent_id: AgentId,
pub timestamp: SystemTime,
pub trace_id: TraceId,
pub span_id: SpanId,
pub event_data: T,
}
#[derive(Debug, Serialize)]
pub struct LifecycleEvent {
pub old_state: Option<String>,
pub new_state: String,
pub reason: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct CommunicationEvent {
pub message_id: MessageId,
pub conversation_id: ConversationId,
pub performative: Performative,
pub direction: MessageDirection,
}
#[derive(Debug, Serialize)]
pub struct PerformanceEvent {
pub cpu_usage: f64,
pub memory_usage: ByteSize,
pub message_queue_size: usize,
pub active_conversations: usize,
}
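A hypothetical call site ties the metric paths together: time a single routing operation, then record both the latency histogram and the throughput counter in one call.

// Sketch only: router and observability are assumed to be in scope.
let started = std::time::Instant::now();
router.route(message.clone()).await?;
observability.record_message_metrics(&message, started.elapsed());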
Performance & Scalability
Scalability Targets
- Single Instance: 1,000+ concurrent agents
- Message Throughput: 100,000+ messages/second
- Latency: < 1ms p99 for local message routing (see the measurement sketch after this list)
- Memory Efficiency: < 1MB per idle agent
- Startup Time: < 100ms for new agent deployment
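These targets are only checkable with a stated measurement method. The sketch below shows one way the p99 routing-latency target could be verified against the AgentRuntime::route_message path from earlier; the sample count and helper name are assumptions, not part of the design:

/// Measure local routing latency over a fixed number of samples and return the p99.
async fn measure_p99_routing_latency(runtime: &AgentRuntime, sample: FipaMessage) -> Duration {
    let mut samples = Vec::with_capacity(10_000);
    for _ in 0..10_000 {
        let start = std::time::Instant::now();
        runtime.route_message(sample.clone()).await.expect("routing failed");
        samples.push(start.elapsed());
    }
    samples.sort();
    // p99: the latency below which 99% of the samples fall; the target above is < 1ms.
    samples[(samples.len() * 99) / 100]
}

An integration test would assert that the returned value stays under Duration::from_millis(1).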
Performance Optimizations
pub struct PerformanceOptimizedRuntime {
// Agent pool for reusing WASM instances
agent_pool: Arc<AgentPool>,
// Message batching for throughput
message_batcher: Arc<MessageBatcher>,
// Lock-free message queue
message_queue: Arc<crossbeam::queue::SegQueue<FipaMessage>>,
// CPU-intensive work executor
cpu_executor: Arc<tokio::task::JoinSet<()>>,
// I/O bound work executor
io_executor: Arc<tokio::task::JoinSet<()>>,
}
impl PerformanceOptimizedRuntime {
pub async fn process_message_batch(&self) -> Result<(), ProcessingError> {
let batch = self.message_batcher.get_batch().await?;
// Process messages in parallel
let mut tasks = Vec::new();
for message in batch {
let agent_pool = Arc::clone(&self.agent_pool);
let task = tokio::spawn(async move {
let agent = agent_pool.get_agent(message.receiver).await?;
agent.handle_message(message).await
});
tasks.push(task);
}
// Wait for all messages to complete
futures::future::try_join_all(tasks).await?;
Ok(())
}
}
// Agent Pool for Instance Reuse
pub struct AgentPool {
available_agents: Arc<Mutex<HashMap<AgentId, VecDeque<Agent<Running>>>>>,
max_pool_size: usize,
}
impl AgentPool {
pub async fn get_agent(&self, agent_id: AgentId) -> Result<Agent<Running>, PoolError> {
let mut pool = self.available_agents.lock().await;
if let Some(agents) = pool.get_mut(&agent_id) {
if let Some(agent) = agents.pop_front() {
return Ok(agent);
}
}
// No pooled instance available, create new one
self.create_fresh_agent(agent_id).await
}
pub async fn return_agent(&self, agent: Agent<Running>) -> Result<(), PoolError> {
let mut pool = self.available_agents.lock().await;
let agent_id = agent.id();
let agents = pool.entry(agent_id).or_insert_with(VecDeque::new);
if agents.len() < self.max_pool_size {
agents.push_back(agent);
}
// Otherwise, drop the agent to reclaim memory
Ok(())
}
}
Type Safety & Error Handling
Comprehensive Error Model
// Top-level Error Type
#[derive(Debug, Error)]
pub enum CaxtonError {
#[error("Agent error: {0}")]
Agent(#[from] AgentError),
#[error("Message routing error: {0}")]
Routing(#[from] RoutingError),
#[error("Deployment error: {0}")]
Deployment(#[from] DeploymentError),
#[error("Resource error: {0}")]
Resource(#[from] ResourceError),
#[error("Security error: {0}")]
Security(#[from] SecurityError),
#[error("Configuration error: {0}")]
Configuration(#[from] ConfigurationError),
#[error("Observability error: {0}")]
Observability(#[from] ObservabilityError),
}
// Domain-specific Error Types
#[derive(Debug, Error)]
pub enum AgentError {
#[error("Agent {agent_id} not found")]
NotFound { agent_id: AgentId },
#[error("Agent {agent_id} is in state {current_state}, cannot perform {operation}")]
InvalidState {
agent_id: AgentId,
current_state: String,
operation: String,
},
#[error("WASM execution failed: {reason}")]
WasmExecutionFailed { reason: String },
#[error("Agent {agent_id} exceeded resource limit: {limit_type}")]
ResourceLimitExceeded {
agent_id: AgentId,
limit_type: String,
},
}
#[derive(Debug, Error)]
pub enum RoutingError {
#[error("No route found for agent {agent_id}")]
NoRoute { agent_id: AgentId },
#[error("Message {message_id} delivery failed: {reason}")]
DeliveryFailed {
message_id: MessageId,
reason: String,
},
#[error("Invalid message format: {details}")]
InvalidMessageFormat { details: String },
#[error("Conversation {conversation_id} not found")]
ConversationNotFound { conversation_id: ConversationId },
}
// Railway-Oriented Programming Pattern
pub type CaxtonResult<T> = Result<T, CaxtonError>;
pub trait CaxtonResultExt<T> {
fn with_context<F>(self, f: F) -> CaxtonResult<T>
where
F: FnOnce() -> String;
}
impl<T, E> CaxtonResultExt<T> for Result<T, E>
where
E: Into<CaxtonError>
{
fn with_context<F>(self, f: F) -> CaxtonResult<T>
where
F: FnOnce() -> String
{
        self.map_err(|e| {
            let error: CaxtonError = e.into();
            let context = f();
            tracing::error!(error = %error, context = %context, "Operation failed");
            error
        })
}
}
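A hypothetical call site for the extension trait, relying on the #[from] conversions above to lift a DeploymentError into CaxtonError while logging the failure context:

// Sketch only: lifecycle_manager, config, and agent_name are assumed to be in scope
// inside a function returning CaxtonResult<DeploymentId>.
let deployment_id = lifecycle_manager
    .deploy_agent(config)
    .await
    .with_context(|| format!("deploying agent {agent_name}"))?;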
Smart Constructors for Type Safety
// Smart constructor pattern ensures validation
impl AgentId {
    pub fn new() -> Self {
        // rand::random::<u64>() can yield zero, so retry until a non-zero value is drawn
        loop {
            if let Some(id) = NonZeroU64::new(rand::random()) {
                return Self(id);
            }
        }
    }
// Parse from string with validation
pub fn parse(s: &str) -> Result<Self, ParseError> {
let id = s.parse::<u64>()
.map_err(|_| ParseError::InvalidFormat)?;
let non_zero_id = NonZeroU64::new(id)
.ok_or(ParseError::ZeroId)?;
Ok(Self(non_zero_id))
}
}
impl MessageId {
pub fn new() -> Self {
Self(Uuid::new_v4())
}
pub fn parse(s: &str) -> Result<Self, ParseError> {
let uuid = Uuid::parse_str(s)
.map_err(|_| ParseError::InvalidUuid)?;
Ok(Self(uuid))
}
}
// Percentage type that ensures valid range
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Percentage(f64);
impl Percentage {
pub fn new(value: f64) -> Result<Self, ValidationError> {
if !(0.0..=100.0).contains(&value) {
return Err(ValidationError::InvalidPercentage { value });
}
Ok(Self(value))
}
pub fn from_ratio(ratio: f64) -> Result<Self, ValidationError> {
if !(0.0..=1.0).contains(&ratio) {
return Err(ValidationError::InvalidRatio { ratio });
}
Ok(Self(ratio * 100.0))
}
pub fn as_ratio(&self) -> f64 {
self.0 / 100.0
}
pub fn as_percentage(&self) -> f64 {
self.0
}
}
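A short usage sketch shows why the constructors return Result: invalid values are rejected at the boundary and never reach the deployment pipeline.

// A 25% canary stage held for ten minutes.
let stage = CanaryStage {
    percentage: Percentage::new(25.0).expect("25.0 is within 0.0..=100.0"),
    duration: Duration::from_secs(600),
};
assert_eq!(stage.percentage.as_ratio(), 0.25);

// Out-of-range inputs never become Percentage values at all.
assert!(Percentage::new(150.0).is_err());
assert!(Percentage::from_ratio(1.5).is_err());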
Deployment Architecture
Production Deployment Pattern
# docker-compose.yml for production deployment
version: '3.8'
services:
caxton-server:
image: caxton/caxton:latest
ports:
- "8080:8080" # Management API
- "9090:9090" # Metrics endpoint
environment:
- CAXTON_CONFIG_PATH=/etc/caxton/config.yaml
- RUST_LOG=info
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
volumes:
- ./config/caxton.yaml:/etc/caxton/config.yaml:ro
- caxton-data:/var/lib/caxton
healthcheck:
test: ["CMD", "caxton", "health"]
interval: 30s
timeout: 10s
retries: 3
restart: unless-stopped
depends_on:
- postgres
- jaeger
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_DB=caxton
- POSTGRES_USER=caxton
- POSTGRES_PASSWORD=secure_password
volumes:
- postgres-data:/var/lib/postgresql/data
ports:
- "5432:5432"
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # UI
- "14268:14268" # HTTP collector
environment:
- COLLECTOR_OTLP_ENABLED=true
volumes:
caxton-data:
postgres-data:
Kubernetes Deployment
# k8s/deployment.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: caxton-server
labels:
app: caxton
spec:
serviceName: caxton
replicas: 3
selector:
matchLabels:
app: caxton
template:
metadata:
labels:
app: caxton
spec:
containers:
- name: caxton
image: caxton/caxton:latest
ports:
- containerPort: 8080
name: api
- containerPort: 9090
name: metrics
env:
- name: CAXTON_CONFIG_PATH
value: /etc/caxton/config.yaml
- name: RUST_LOG
value: info
volumeMounts:
- name: config
mountPath: /etc/caxton
- name: data
mountPath: /var/lib/caxton
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
volumes:
- name: config
configMap:
name: caxton-config
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 10Gi
Summary
This architecture provides:
- Type Safety: Comprehensive domain modeling with phantom types and smart constructors
- Observability: Built-in tracing, metrics, and structured logging
- Security: WebAssembly sandboxing with resource limits
- Scalability: Performance-optimized runtime with agent pooling
- Reliability: Comprehensive error handling and fault tolerance
- Production Ready: Full deployment and operational patterns
The architecture follows type-driven development principles, making illegal states unrepresentable while providing comprehensive observability and security for production multi-agent systems.