# ADR-0015: Distributed Protocol Architecture - FIPA and SWIM Integration

## Status

Proposed

## Context

With the coordination-first architecture (ADR-0014), Caxton uses SWIM for cluster coordination and FIPA for agent messaging. This ADR clarifies how these protocols interact and addresses distributed systems concerns including network partitioning, consistency, and fault tolerance.

## Decision

### Protocol Layer Separation

Caxton implements a clear separation between the coordination (SWIM) and communication (FIPA) protocols:

#### SWIM Protocol (Infrastructure Layer)

- Responsibility: Cluster membership and failure detection
- Scope: Caxton instance coordination
- Data: Instance liveness, agent registry, routing tables
- Consistency: Eventually consistent via gossip
- Failure Model: Crash-stop failures

#### FIPA Protocol (Application Layer)

- Responsibility: Agent-to-agent semantic messaging
- Scope: Business logic communication
- Data: Application messages, conversation state
- Consistency: Message ordering per conversation
- Failure Model: Handled by application
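
To make the split concrete, here is a minimal sketch of how the two layers could share a single cluster transport while staying isolated. The `ClusterEvent` type, the `dispatch` function, and the `NodeHandlers` struct are assumptions for illustration; only the `Payload::FipaMessage` variant reappears in the routing code below.

```rust
// Sketch: one wire, two layers. SWIM events never reach agent code, and
// FIPA messages never mutate cluster state.
pub enum Payload {
    // Infrastructure layer: membership and routing state (gossiped)
    Swim(ClusterEvent),
    // Application layer: semantic agent-to-agent messages (routed)
    FipaMessage(FipaMessage),
}

pub fn dispatch(node: &NodeHandlers, payload: Payload) {
    match payload {
        // Coordination traffic is consumed by the membership subsystem
        Payload::Swim(event) => node.membership.handle(event),
        // Application traffic is handed to the message router
        Payload::FipaMessage(msg) => node.router.enqueue(msg),
    }
}
```
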

### Cross-Cluster Agent Communication

```rust
pub struct DistributedMessageRouter {
    // Local agent registry
    local_agents: Arc<RwLock<HashMap<AgentId, LocalAgent>>>,
    // Remote agent locations (learned via SWIM gossip)
    remote_routes: Arc<RwLock<HashMap<AgentId, NodeId>>>,
    // Cluster membership (SWIM)
    cluster: SwimCluster,
    // Message delivery tracking
    in_flight: Arc<RwLock<HashMap<MessageId, InFlightMessage>>>,
}

impl DistributedMessageRouter {
    pub async fn route_fipa_message(&self, msg: FipaMessage) -> Result<DeliveryStatus> {
        let target = &msg.receiver;

        // Try local delivery first
        if let Some(agent) = self.local_agents.read().await.get(target) {
            return agent.deliver(msg).await;
        }

        // Check the remote routing table (clone the NodeId so the lock
        // is not held across the forwarding await)
        let route = self.remote_routes.read().await.get(target).cloned();
        if let Some(node_id) = route {
            // Verify the node is still alive (SWIM)
            if self.cluster.is_alive(&node_id) {
                return self.forward_to_remote(&node_id, msg).await;
            }
        }

        // Agent location unknown - initiate discovery
        self.discover_and_route(msg).await
    }

    async fn forward_to_remote(&self, node: &NodeId, msg: FipaMessage) -> Result<DeliveryStatus> {
        // Track the in-flight message
        self.track_message(&msg).await;

        // Copy the id out before the message is moved into the payload
        let message_id = msg.message_id.clone();

        // Forward via the cluster network
        let result = self.cluster.send_to_node(node, Payload::FipaMessage(msg)).await;

        // Update the delivery status
        self.update_delivery_status(&message_id, result).await
    }
}
```
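
`discover_and_route` is referenced but not defined above. A minimal sketch of one possible discovery path, assuming a hypothetical `LocationQuery` broadcast over the cluster and an `Error::AgentNotFound` variant; none of these names are existing APIs.

```rust
impl DistributedMessageRouter {
    // Hypothetical discovery: ask the cluster for the agent's location,
    // cache the learned route, then forward as usual.
    async fn discover_and_route(&self, msg: FipaMessage) -> Result<DeliveryStatus> {
        let query = LocationQuery::new(msg.receiver.clone());
        match self.cluster.broadcast_query(query).await? {
            Some(node_id) => {
                // Cache the route so later messages skip discovery
                self.remote_routes
                    .write()
                    .await
                    .insert(msg.receiver.clone(), node_id.clone());
                self.forward_to_remote(&node_id, msg).await
            }
            // No peer knows this agent: report an explicit routing failure
            None => Err(Error::AgentNotFound(msg.receiver.clone())),
        }
    }
}
```
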

### Network Partition Handling

#### Detection Strategy

```rust
pub struct PartitionManager {
    membership: SwimMembership,
    config: PartitionConfig,
}

impl PartitionManager {
    pub fn evaluate_partition_state(&self) -> PartitionState {
        let alive = self.membership.alive_nodes();
        let total = self.config.expected_cluster_size;
        let quorum = total / 2 + 1;

        match alive.len() {
            n if n >= quorum => PartitionState::Majority {
                missing: total - n,
                can_write: true,
            },
            n if n > 0 => PartitionState::Minority {
                available: n,
                can_write: false,
            },
            _ => PartitionState::Isolated,
        }
    }

    pub async fn handle_partition(&self, state: PartitionState) -> Result<()> {
        match state {
            PartitionState::Majority { .. } => {
                // Continue normal operations; mark missing nodes as failed
                self.mark_unreachable_nodes().await?;
            }
            PartitionState::Minority { .. } => {
                // Degrade to read-only mode and queue writes for later
                self.enter_degraded_mode().await?;
                self.enable_write_queue().await?;
            }
            PartitionState::Isolated => {
                // Local-only mode
                self.disable_remote_routing().await?;
            }
        }
        Ok(())
    }
}
```
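
One way to drive the manager is a periodic evaluation loop that reacts only to state transitions. The one-second interval and the `PartialEq`/`Clone` bounds on `PartitionState` are assumptions of this sketch, not prescribed by the ADR.

```rust
use std::time::Duration;

// Periodically re-evaluate partition state and act on transitions only,
// so the node does not repeatedly re-enter the same degraded mode.
pub async fn monitor_partitions(manager: PartitionManager) -> Result<()> {
    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    let mut last_state: Option<PartitionState> = None;

    loop {
        ticker.tick().await;
        let state = manager.evaluate_partition_state();
        if last_state.as_ref() != Some(&state) {
            manager.handle_partition(state.clone()).await?;
            last_state = Some(state);
        }
    }
}
```
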

#### Healing After Partition

```rust
pub struct PartitionHealer {
    detector: PartitionDetector,
    synchronizer: StateSynchronizer,
}

impl PartitionHealer {
    pub async fn heal_partition(&self) -> Result<()> {
        // Detect partition healing
        if self.detector.is_healed() {
            // Exchange vector clocks to find divergent state
            let divergence = self.synchronizer.detect_divergence().await?;

            // Merge agent registries
            self.merge_registries(divergence.registries).await?;

            // Replay messages queued while partitioned
            self.replay_queued_messages().await?;

            // Resume normal operations
            self.exit_degraded_mode().await?;
        }
        Ok(())
    }
}
```

### Consistency Models

#### Agent Registry (Eventually Consistent)

```rust
pub struct AgentRegistry {
    local: HashMap<AgentId, AgentMetadata>,
    vector_clock: VectorClock,
    tombstones: HashMap<AgentId, Timestamp>,
}

impl AgentRegistry {
    pub async fn merge_with_peer(&mut self, peer_registry: &AgentRegistry) {
        for (agent_id, peer_meta) in &peer_registry.local {
            match self.local.get(agent_id) {
                Some(_existing) => {
                    // Resolve the conflict using vector clocks
                    if peer_registry.vector_clock.happens_after(&self.vector_clock, agent_id) {
                        self.local.insert(agent_id.clone(), peer_meta.clone());
                    }
                }
                None => {
                    // Skip entries we have already deleted (tombstoned)
                    if !self.is_tombstoned(agent_id) {
                        self.local.insert(agent_id.clone(), peer_meta.clone());
                    }
                }
            }
        }

        // Merge vector clocks
        self.vector_clock.merge(&peer_registry.vector_clock);
    }
}
```
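
The registry relies on a `VectorClock` with `happens_after` and `merge` operations. Below is a minimal sketch consistent with those call sites, assuming one counter per agent entry; a production clock would more likely track one counter per node.

```rust
use std::collections::HashMap;

// Minimal per-entry vector clock sketch matching the calls above.
#[derive(Default, Clone)]
pub struct VectorClock {
    counters: HashMap<AgentId, u64>,
}

impl VectorClock {
    // True if this clock has observed a strictly newer update for `id`
    pub fn happens_after(&self, other: &VectorClock, id: &AgentId) -> bool {
        self.counters.get(id).copied().unwrap_or(0)
            > other.counters.get(id).copied().unwrap_or(0)
    }

    // Pointwise maximum: the merged clock dominates both inputs
    pub fn merge(&mut self, other: &VectorClock) {
        for (id, &count) in &other.counters {
            let entry = self.counters.entry(id.clone()).or_insert(0);
            *entry = (*entry).max(count);
        }
    }
}
```
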

#### Message Ordering (Per-Conversation)

```rust
pub struct ConversationManager {
    conversations: HashMap<ConversationId, Conversation>,
}

impl ConversationManager {
    pub async fn handle_message(&mut self, msg: FipaMessage) -> Result<()> {
        let conv_id = msg.conversation_id.as_ref()
            .ok_or(Error::NoConversationId)?
            .clone();

        let conversation = self.conversations.entry(conv_id.clone())
            .or_insert_with(|| Conversation::new(conv_id));

        // Buffer the message to enforce ordering within the conversation
        conversation.add_message(msg).await?;

        // Drain everything that is now in sequence, releasing the borrow
        // on the map before processing (process_message needs &mut self)
        let mut ready = Vec::new();
        while let Some(next_msg) = conversation.next_in_sequence() {
            ready.push(next_msg);
        }
        for next_msg in ready {
            self.process_message(next_msg).await?;
        }
        Ok(())
    }
}
```
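
`Conversation` and `next_in_sequence` are not shown above. A sketch of how per-conversation ordering could work, assuming Caxton stamps each message with an envelope-level `sequence_number` (FIPA itself does not mandate one):

```rust
use std::collections::BTreeMap;

// Per-conversation ordering via sequence numbers: out-of-order arrivals
// are buffered until every predecessor has been released.
pub struct Conversation {
    id: ConversationId,
    next_seq: u64,
    // Messages waiting for their predecessors to arrive
    pending: BTreeMap<u64, FipaMessage>,
}

impl Conversation {
    pub fn new(id: ConversationId) -> Self {
        Self { id, next_seq: 0, pending: BTreeMap::new() }
    }

    pub async fn add_message(&mut self, msg: FipaMessage) -> Result<()> {
        self.pending.insert(msg.sequence_number, msg);
        Ok(())
    }

    // Release the next message only when the sequence gap is closed
    pub fn next_in_sequence(&mut self) -> Option<FipaMessage> {
        if let Some(msg) = self.pending.remove(&self.next_seq) {
            self.next_seq += 1;
            Some(msg)
        } else {
            None
        }
    }
}
```
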

### Fault Tolerance Mechanisms

#### Circuit Breaker for Remote Calls

```rust
pub struct RemoteCallCircuitBreaker {
    state: Arc<RwLock<CircuitState>>,
    failure_threshold: usize,
    timeout: Duration,
}

impl RemoteCallCircuitBreaker {
    pub async fn call<F, T>(&self, f: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        // Copy the state out (CircuitState is Copy) so the read lock is not
        // held across the awaits below; reset/trip/record_failure take the
        // write lock and would otherwise deadlock
        let state = *self.state.read().await;

        match state {
            CircuitState::Open => Err(Error::CircuitOpen),
            CircuitState::HalfOpen => {
                // Allow a single probe request through
                match timeout(self.timeout, f).await {
                    Ok(Ok(result)) => {
                        self.reset().await;
                        Ok(result)
                    }
                    _ => {
                        self.trip().await;
                        Err(Error::CircuitTripped)
                    }
                }
            }
            CircuitState::Closed => {
                match timeout(self.timeout, f).await {
                    Ok(Ok(result)) => Ok(result),
                    _ => {
                        self.record_failure().await;
                        Err(Error::RemoteCallFailed)
                    }
                }
            }
        }
    }
}
```
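
Example use: guarding a remote forward with the breaker, so calls to a consistently failing peer fail fast instead of consuming the router's time. The `forward_guarded` helper is illustrative, not part of the design above.

```rust
impl DistributedMessageRouter {
    // Route through the breaker so calls to a failing node short-circuit
    // with Error::CircuitOpen instead of waiting out the timeout.
    async fn forward_guarded(
        &self,
        breaker: &RemoteCallCircuitBreaker,
        node: &NodeId,
        msg: FipaMessage,
    ) -> Result<DeliveryStatus> {
        breaker.call(self.forward_to_remote(node, msg)).await
    }
}
```
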

#### Supervisor Trees for Agents

```rust
pub struct AgentSupervisor {
    strategy: SupervisionStrategy,
    children: HashMap<AgentId, AgentHandle>,
}

impl AgentSupervisor {
    pub async fn supervise(&mut self) {
        loop {
            tokio::select! {
                Some(failure) = self.receive_failure() => {
                    match self.strategy {
                        SupervisionStrategy::OneForOne => {
                            // Restart only the failed agent
                            self.restart_agent(failure.agent_id).await;
                        }
                        SupervisionStrategy::OneForAll => {
                            // Restart every supervised agent
                            self.restart_all_agents().await;
                        }
                        SupervisionStrategy::RestForOne => {
                            // Restart the failed agent and those that depend on it
                            self.restart_dependent_agents(failure.agent_id).await;
                        }
                    }
                }
                // Failure channel closed: stop supervising
                else => break,
            }
        }
    }
}
```

### Message Delivery Guarantees

#### Configurable Delivery Semantics

```rust
pub enum DeliveryGuarantee {
    AtMostOnce,  // Fire and forget (default)
    AtLeastOnce, // Retry with deduplication
    ExactlyOnce, // Idempotent with sequence numbers
}

pub struct MessageDelivery {
    guarantee: DeliveryGuarantee,
    dedup_cache: LruCache<MessageId, DeliveryStatus>,
}

impl MessageDelivery {
    pub async fn deliver(&mut self, msg: FipaMessage) -> Result<DeliveryStatus> {
        match self.guarantee {
            DeliveryGuarantee::AtMostOnce => {
                self.send_once(msg).await
            }
            DeliveryGuarantee::AtLeastOnce => {
                // Check the dedup cache first
                if let Some(status) = self.dedup_cache.get(&msg.message_id) {
                    return Ok(status.clone());
                }

                // Copy the id out before the message is moved into the send
                let message_id = msg.message_id.clone();
                let status = self.send_with_retry(msg).await?;
                self.dedup_cache.put(message_id, status.clone());
                Ok(status)
            }
            DeliveryGuarantee::ExactlyOnce => {
                self.send_idempotent(msg).await
            }
        }
    }
}
```
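
`send_idempotent` is left abstract above. The exactly-once path combines the per-conversation ordering from earlier with receiver-side sequence tracking; a sketch, where `SequenceTracker` is an assumed helper rather than an existing type:

```rust
use std::collections::HashMap;

pub struct SequenceTracker {
    // Highest sequence number applied per remote sender
    applied: HashMap<AgentId, u64>,
}

impl SequenceTracker {
    // Returns true exactly once per (sender, seq) pair, assuming the
    // per-conversation ordering above delivers sequence numbers in order
    pub fn should_apply(&mut self, sender: &AgentId, seq: u64) -> bool {
        let last = self.applied.entry(sender.clone()).or_insert(0);
        if seq > *last {
            *last = seq;
            true
        } else {
            // Duplicate or stale retry: already applied, safe to ack again
            false
        }
    }
}
```
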

## Consequences

### Positive

- Clear separation of concerns: SWIM handles infrastructure, FIPA handles application
- Graceful degradation: System continues functioning during partitions
- Flexible consistency: Eventually consistent for coordination, stronger guarantees available when needed
- Fault isolation: Agent failures don’t affect cluster coordination
- Scalable design: Can handle thousands of agents across dozens of instances

### Negative

- Complexity: Two protocols to understand and maintain
- Eventual consistency: Agent registry may be temporarily inconsistent
- Network overhead: Gossip protocol generates background traffic
- Partition handling: Requires careful consideration of business requirements

### Neutral

- Standard distributed systems patterns apply
- Similar complexity to other distributed agent systems
- Trade-offs are well-understood in the industry

## Implementation Priorities

- Phase 1: Basic SWIM integration for membership
- Phase 2: Agent registry gossip and routing
- Phase 3: Partition detection and handling
- Phase 4: Advanced features (consensus, exactly-once delivery)

## Implementation Details

### Technology Selection

#### SWIM Implementation

```toml
[dependencies]
# Primary choice: memberlist-rs (Rust port of HashiCorp's memberlist)
memberlist = { git = "https://github.com/vectordotdev/memberlist-rs" }

# Alternative: build on swimmer (pure Rust SWIM)
# swimmer = "0.1"

# Network transport
quinn = "0.10"  # QUIC for better performance
tokio = { version = "1.0", features = ["full"] }
```

#### Message Serialization

```toml
# MessagePack for efficiency and schema evolution
rmp-serde = "1.1"  # MessagePack serialization
serde = { version = "1.0", features = ["derive"] }

# Alternative: Protocol Buffers for stricter schemas
# prost = "0.12"
# prost-build = "0.12"
```

#### Network Transport

```rust
pub enum TransportLayer {
    // TCP for reliability (default)
    Tcp(TcpConfig),
    // QUIC for performance (recommended)
    Quic(QuicConfig),
    // Unix sockets for local testing
    Unix(UnixConfig),
}

pub struct TransportConfig {
    tcp: TcpConfig,
    quic: QuicConfig,
}

impl Default for TransportConfig {
    fn default() -> Self {
        Self {
            // TCP configuration
            tcp: TcpConfig {
                nodelay: true,                            // Disable Nagle's algorithm
                keepalive: Some(Duration::from_secs(30)), // TCP keepalive
                buffer_size: 64 * 1024,                   // Socket buffer size in bytes
            },
            // QUIC configuration (recommended for production)
            quic: QuicConfig {
                max_streams: 100,
                idle_timeout: Duration::from_secs(30),
                congestion_control: CongestionControl::Bbr,
            },
        }
    }
}
```

### SWIM Library Integration

```rust
use memberlist::{Memberlist, Config, Node, NodeState};

pub struct SwimCluster {
    memberlist: Arc<Memberlist>,
    delegate: Arc<CaxtonDelegate>,
}

impl SwimCluster {
    pub async fn new(config: ClusterConfig) -> Result<Self> {
        let mut ml_config = Config::default();

        // Configure SWIM parameters
        ml_config.gossip_interval = config.gossip_interval;
        ml_config.gossip_nodes = config.gossip_fanout;
        ml_config.probe_interval = config.probe_interval;
        ml_config.probe_timeout = config.probe_timeout;

        // Set up delegates for custom behavior
        let delegate = Arc::new(CaxtonDelegate::new());
        ml_config.delegate = Some(delegate.clone());

        // Initialize memberlist
        let memberlist = Memberlist::create(ml_config).await?;

        Ok(Self {
            memberlist: Arc::new(memberlist),
            delegate,
        })
    }

    pub async fn join(&self, seeds: Vec<String>) -> Result<()> {
        self.memberlist.join(seeds).await?;
        Ok(())
    }
}

// Custom delegate for Caxton-specific behavior. Uses a synchronous RwLock
// (e.g. parking_lot) because delegate callbacks are not async.
struct CaxtonDelegate {
    agent_registry: Arc<RwLock<AgentRegistry>>,
    event_handler: Arc<EventHandler>,
}

impl memberlist::Delegate for CaxtonDelegate {
    fn node_meta(&self, _limit: usize) -> Vec<u8> {
        // Include agent count and capabilities in gossiped metadata
        let meta = NodeMetadata {
            agent_count: self.agent_registry.read().len(),
            capabilities: self.capabilities(),
            version: env!("CARGO_PKG_VERSION"),
        };
        rmp_serde::to_vec(&meta).unwrap()
    }

    fn notify_msg(&self, msg: &[u8]) {
        // Handle custom messages (agent updates, routing info)
        if let Ok(update) = rmp_serde::from_slice::<AgentUpdate>(msg) {
            self.handle_agent_update(update);
        }
    }

    fn get_broadcast(&self, _overhead: usize, limit: usize) -> Option<Vec<u8>> {
        // Broadcast pending agent registry changes
        self.agent_registry.read().get_pending_broadcasts(limit)
    }
}
```

### Message Serialization Format

```rust
use serde::{Serialize, Deserialize};

// MessagePack serialization for FIPA messages
#[derive(Serialize, Deserialize)]
pub struct WireFipaMessage {
    // Header fields (compact representation)
    #[serde(rename = "p")]
    performative: u8, // Enum as u8 for compactness

    #[serde(rename = "s")]
    sender: CompactAgentId,

    #[serde(rename = "r")]
    receiver: CompactAgentId,

    // `default` lets deserialization succeed when the field was skipped
    #[serde(rename = "c", default, skip_serializing_if = "Option::is_none")]
    conversation_id: Option<Uuid>,

    // Payload
    #[serde(rename = "b")]
    body: Vec<u8>, // Pre-serialized content
}

// Compact agent ID representation
#[derive(Serialize, Deserialize)]
pub struct CompactAgentId {
    #[serde(rename = "n")]
    node: u32, // Node index in cluster

    #[serde(rename = "a")]
    agent: u32, // Agent index on node
}

impl WireFipaMessage {
    pub fn serialize(&self) -> Result<Vec<u8>> {
        rmp_serde::to_vec_named(self).map_err(Error::Serialization)
    }

    pub fn deserialize(data: &[u8]) -> Result<Self> {
        rmp_serde::from_slice(data).map_err(Error::Deserialization)
    }
}
```
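
A round-trip example for the wire format; all values are illustrative.

```rust
fn wire_roundtrip() -> Result<()> {
    let msg = WireFipaMessage {
        performative: 7, // illustrative numeric code for a performative
        sender: CompactAgentId { node: 0, agent: 42 },
        receiver: CompactAgentId { node: 3, agent: 7 },
        conversation_id: None, // omitted from the wire entirely
        body: br#"{"task":"ping"}"#.to_vec(),
    };

    // Encode to MessagePack and decode back
    let bytes = msg.serialize()?;
    let decoded = WireFipaMessage::deserialize(&bytes)?;
    assert_eq!(decoded.receiver.agent, 7);
    Ok(())
}
```
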

### Network Transport Details

```rust
// QUIC transport for better performance
pub struct QuicTransport {
    endpoint: quinn::Endpoint,
    connections: Arc<RwLock<HashMap<NodeId, quinn::Connection>>>,
}

impl QuicTransport {
    pub async fn new(config: QuicConfig) -> Result<Self> {
        // TLS setup; cert_chain, private_key, and bind_addr are assumed
        // to be carried on QuicConfig
        let mut endpoint_config = quinn::ServerConfig::with_crypto(
            Arc::new(rustls::ServerConfig::builder()
                .with_safe_defaults()
                .with_no_client_auth()
                .with_single_cert(config.cert_chain.clone(), config.private_key.clone())?)
        );

        // Configure transport parameters. Peers deliver messages over
        // unidirectional streams (see send below), so allow incoming ones.
        let mut transport_config = quinn::TransportConfig::default();
        transport_config.max_concurrent_uni_streams(100_u8.into());
        transport_config.max_concurrent_bidi_streams(100_u8.into());
        transport_config.max_idle_timeout(Some(Duration::from_secs(30).try_into()?));
        endpoint_config.transport = Arc::new(transport_config);

        let endpoint = quinn::Endpoint::server(endpoint_config, config.bind_addr)?;

        Ok(Self {
            endpoint,
            connections: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    pub async fn send(&self, node: &NodeId, data: Vec<u8>) -> Result<()> {
        let conn = self.get_or_create_connection(node).await?;
        let mut stream = conn.open_uni().await?;
        stream.write_all(&data).await?;
        stream.finish().await?;
        Ok(())
    }
}
```

### Bootstrap Configuration

```yaml
# Cluster bootstrap configuration
coordination:
  cluster:
    # SWIM protocol settings
    swim:
      # Use memberlist-rs library
      implementation: memberlist-rs

      # Gossip parameters
      gossip_interval: 200ms
      gossip_fanout: 3
      gossip_to_dead: 3

      # Failure detection
      probe_interval: 1s
      probe_timeout: 500ms
      suspicion_multiplier: 4

    # Network transport
    transport:
      type: quic            # tcp | quic | unix
      bind_addr: 0.0.0.0:7946
      advertise_addr: auto  # auto-detect or specify

    # Message serialization
    serialization:
      format: messagepack   # messagepack | protobuf
      compression: lz4      # none | lz4 | zstd

    # Security
    security:
      encryption: true
      auth_key: ${CLUSTER_AUTH_KEY}
```
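
A sketch of loading the SWIM section of this file, assuming `serde_yaml` plus the `humantime-serde` crate for the `200ms`-style duration strings; the struct and field names mirror the YAML above but are otherwise assumptions.

```rust
use serde::Deserialize;
use std::time::Duration;

#[derive(Deserialize)]
pub struct SwimSettings {
    pub implementation: String,
    #[serde(with = "humantime_serde")]
    pub gossip_interval: Duration,
    pub gossip_fanout: usize,
    pub gossip_to_dead: usize,
    #[serde(with = "humantime_serde")]
    pub probe_interval: Duration,
    #[serde(with = "humantime_serde")]
    pub probe_timeout: Duration,
    pub suspicion_multiplier: u32,
}

pub fn load_swim_settings(path: &str) -> anyhow::Result<SwimSettings> {
    let raw = std::fs::read_to_string(path)?;
    // Navigate to coordination.cluster.swim before deserializing
    let doc: serde_yaml::Value = serde_yaml::from_str(&raw)?;
    let swim = doc["coordination"]["cluster"]["swim"].clone();
    Ok(serde_yaml::from_value(swim)?)
}
```
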