Chapter 4.4: Authentication and Authorization
Security is critical for event-sourced systems. This chapter covers authentication (who you are) and authorization (what you can do) patterns for EventCore APIs.
Authentication Strategies
JWT Authentication
JSON Web Tokens are stateless and work well with EventCore:
#![allow(unused)] fn main() { use jsonwebtoken::{encode, decode, Header, Algorithm, Validation, EncodingKey, DecodingKey}; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] struct Claims { sub: String, // Subject (user ID) exp: usize, // Expiration time iat: usize, // Issued at roles: Vec<String>, // User roles permissions: Vec<String>, // Specific permissions } #[derive(Clone)] struct JwtConfig { secret: String, issuer: String, audience: String, access_token_duration: Duration, refresh_token_duration: Duration, } impl JwtConfig { fn create_access_token(&self, user: &User) -> Result<String, ApiError> { let now = Utc::now(); let exp = now + self.access_token_duration; let claims = Claims { sub: user.id.to_string(), exp: exp.timestamp() as usize, iat: now.timestamp() as usize, roles: user.roles.clone(), permissions: user.permissions.clone(), }; encode( &Header::default(), &claims, &EncodingKey::from_secret(self.secret.as_ref()), ) .map_err(|_| ApiError::internal("Failed to create token")) } fn validate_token(&self, token: &str) -> Result<Claims, ApiError> { let mut validation = Validation::new(Algorithm::HS256); validation.set_issuer(&[&self.issuer]); validation.set_audience(&[&self.audience]); decode::<Claims>( token, &DecodingKey::from_secret(self.secret.as_ref()), &validation, ) .map(|data| data.claims) .map_err(|e| match e.kind() { jsonwebtoken::errors::ErrorKind::ExpiredSignature => { ApiError::unauthorized("Token expired") } _ => ApiError::unauthorized("Invalid token"), }) } } }
Login Endpoint
#![allow(unused)] fn main() { #[derive(Debug, Deserialize)] struct LoginRequest { email: String, password: String, } #[derive(Debug, Serialize)] struct LoginResponse { access_token: String, refresh_token: String, token_type: String, expires_in: u64, } async fn login( State(state): State<AppState>, Json(request): Json<LoginRequest>, ) -> Result<Json<LoginResponse>, ApiError> { // Validate credentials let email = Email::try_new(request.email) .map_err(|_| ApiError::bad_request("Invalid email"))?; // Execute authentication command let command = AuthenticateUser { email: email.clone(), password: Password::from(request.password), }; let result = state.executor .execute(&command) .await .map_err(|_| ApiError::unauthorized("Invalid credentials"))?; // Get user from projection let user = state.projections .read() .await .get::<UserProjection>() .unwrap() .get_user_by_email(&email) .await? .ok_or_else(|| ApiError::unauthorized("Invalid credentials"))?; // Create tokens let access_token = state.jwt_config.create_access_token(&user)?; let refresh_token = state.jwt_config.create_refresh_token(&user)?; // Store refresh token (for revocation) let store_command = StoreRefreshToken { user_id: user.id.clone(), token_hash: hash_token(&refresh_token), expires_at: Utc::now() + state.jwt_config.refresh_token_duration, }; state.executor.execute(&store_command).await?; Ok(Json(LoginResponse { access_token, refresh_token, token_type: "Bearer".to_string(), expires_in: state.jwt_config.access_token_duration.as_secs(), })) } }
Authentication Middleware
#![allow(unused)] fn main() { use axum::{ extract::{FromRequestParts, Request}, middleware::{self, Next}, response::Response, }; #[derive(Debug, Clone)] pub struct AuthenticatedUser { pub id: UserId, pub roles: Vec<String>, pub permissions: Vec<String>, } #[async_trait] impl<S> FromRequestParts<S> for AuthenticatedUser where S: Send + Sync, { type Rejection = ApiError; async fn from_request_parts( parts: &mut http::request::Parts, state: &S, ) -> Result<Self, Self::Rejection> { // Get JWT config from extensions (set by middleware) let jwt_config = parts .extensions .get::<JwtConfig>() .ok_or_else(|| ApiError::internal("JWT config not found"))?; // Extract token from Authorization header let token = extract_bearer_token(&parts.headers)?; // Validate token let claims = jwt_config.validate_token(token)?; Ok(AuthenticatedUser { id: UserId::try_new(claims.sub)?, roles: claims.roles, permissions: claims.permissions, }) } } fn extract_bearer_token(headers: &HeaderMap) -> Result<&str, ApiError> { headers .get(AUTHORIZATION) .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")) .ok_or_else(|| ApiError::unauthorized("Missing or invalid Authorization header")) } // Optional authentication extractor pub struct OptionalAuth(pub Option<AuthenticatedUser>); #[async_trait] impl<S> FromRequestParts<S> for OptionalAuth where S: Send + Sync, { type Rejection = Infallible; async fn from_request_parts( parts: &mut http::request::Parts, state: &S, ) -> Result<Self, Self::Rejection> { Ok(OptionalAuth( AuthenticatedUser::from_request_parts(parts, state) .await .ok() )) } } }
Authorization Patterns
Role-Based Access Control (RBAC)
#![allow(unused)] fn main() { #[derive(Debug, Clone, PartialEq, Eq, Hash)] enum Role { Admin, Manager, Employee, Guest, } impl AuthenticatedUser { pub fn has_role(&self, role: &str) -> bool { self.roles.contains(&role.to_string()) } pub fn has_any_role(&self, roles: &[&str]) -> bool { roles.iter().any(|role| self.has_role(role)) } pub fn has_all_roles(&self, roles: &[&str]) -> bool { roles.iter().all(|role| self.has_role(role)) } } // Authorization guard async fn require_role( user: &AuthenticatedUser, role: &str, ) -> Result<(), ApiError> { if !user.has_role(role) { return Err(ApiError::forbidden( format!("Requires {} role", role) )); } Ok(()) } // In handlers async fn admin_endpoint( user: AuthenticatedUser, // other params... ) -> Result<Json<AdminData>, ApiError> { require_role(&user, "admin").await?; // Admin-only logic... } }
Permission-Based Access Control
#![allow(unused)] fn main() { #[derive(Debug, Clone, PartialEq, Eq, Hash)] enum Permission { // Task permissions CreateTask, ReadTask, UpdateTask, DeleteTask, AssignTask, // User permissions CreateUser, ReadUser, UpdateUser, DeleteUser, // Admin permissions ViewAnalytics, ManageSystem, } impl AuthenticatedUser { pub fn has_permission(&self, permission: &str) -> bool { self.permissions.contains(&permission.to_string()) } pub fn can(&self, action: Permission) -> bool { self.has_permission(&action.to_string()) } } // Permission checking in handlers async fn create_task_handler( user: AuthenticatedUser, Json(request): Json<CreateTaskRequest>, ) -> Result<Json<CreateTaskResponse>, ApiError> { if !user.can(Permission::CreateTask) { return Err(ApiError::forbidden("Cannot create tasks")); } // Create task... } }
Resource-Based Access Control
#![allow(unused)] fn main() { #[async_trait] trait ResourceAuthorizer { async fn can_read(&self, user: &AuthenticatedUser, resource_id: &str) -> bool; async fn can_write(&self, user: &AuthenticatedUser, resource_id: &str) -> bool; async fn can_delete(&self, user: &AuthenticatedUser, resource_id: &str) -> bool; } struct TaskAuthorizer { projection: Arc<TaskProjection>, } #[async_trait] impl ResourceAuthorizer for TaskAuthorizer { async fn can_read(&self, user: &AuthenticatedUser, task_id: &str) -> bool { // Admins can read all if user.has_role("admin") { return true; } // Check if user owns or is assigned to task if let Ok(Some(task)) = self.projection.get_task(task_id).await { return task.created_by == user.id || task.assigned_to == Some(user.id.clone()); } false } async fn can_write(&self, user: &AuthenticatedUser, task_id: &str) -> bool { // Similar logic for write permissions if user.has_role("admin") || user.has_role("manager") { return true; } // Check ownership or assignment if let Ok(Some(task)) = self.projection.get_task(task_id).await { return task.assigned_to == Some(user.id.clone()); } false } async fn can_delete(&self, user: &AuthenticatedUser, task_id: &str) -> bool { // Only admins and creators can delete if user.has_role("admin") { return true; } if let Ok(Some(task)) = self.projection.get_task(task_id).await { return task.created_by == user.id; } false } } }
Command Authorization
Embed authorization in commands:
#![allow(unused)] fn main() { #[derive(Command, Clone)] struct UpdateTask { #[stream] task_id: StreamId, title: Option<TaskTitle>, description: Option<TaskDescription>, // Who is making the change updated_by: UserId, } impl CommandLogic for UpdateTask { // ... other implementations async fn handle( &self, read_streams: ReadStreams<Self::StreamSet>, state: Self::State, _stream_resolver: &mut StreamResolver, ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> { // Check authorization within command require!( state.can_user_update_task(&self.updated_by), "User {} cannot update task {}", self.updated_by, self.task_id ); // Proceed with update... } } // State includes authorization data impl TaskState { fn can_user_update_task(&self, user_id: &UserId) -> bool { // Task creator can always update if self.created_by == *user_id { return true; } // Assigned user can update if self.assigned_to == Some(user_id.clone()) { return true; } // Check roles (would need to be passed in state) false } } }
API Key Authentication
For service-to-service communication:
#![allow(unused)] fn main() { #[derive(Debug, Clone)] struct ApiKey { key: String, service_name: String, permissions: Vec<String>, rate_limit: Option<u32>, } #[async_trait] impl<S> FromRequestParts<S> for ApiKey where S: Send + Sync, { type Rejection = ApiError; async fn from_request_parts( parts: &mut http::request::Parts, state: &S, ) -> Result<Self, Self::Rejection> { let key = parts .headers .get("X-API-Key") .and_then(|v| v.to_str().ok()) .ok_or_else(|| ApiError::unauthorized("Missing API key"))?; // Look up API key (from cache/database) let api_key = validate_api_key(key).await?; Ok(api_key) } } async fn validate_api_key(key: &str) -> Result<ApiKey, ApiError> { // Hash the key for lookup let key_hash = hash_api_key(key); // Look up in projection/cache let api_key = get_api_key_by_hash(&key_hash) .await? .ok_or_else(|| ApiError::unauthorized("Invalid API key"))?; // Check if expired if api_key.expires_at < Utc::now() { return Err(ApiError::unauthorized("API key expired")); } Ok(api_key) } }
OAuth2 Integration
For third-party authentication:
#![allow(unused)] fn main() { use oauth2::{ AuthorizationCode, AuthUrl, ClientId, ClientSecret, CsrfToken, PkceCodeChallenge, RedirectUrl, TokenResponse, TokenUrl, }; #[derive(Clone)] struct OAuth2Config { client_id: ClientId, client_secret: ClientSecret, auth_url: AuthUrl, token_url: TokenUrl, redirect_url: RedirectUrl, } async fn oauth_login( State(oauth): State<OAuth2Config>, Query(params): Query<HashMap<String, String>>, ) -> Result<Redirect, ApiError> { let client = BasicClient::new( oauth.client_id, Some(oauth.client_secret), oauth.auth_url, Some(oauth.token_url), ) .set_redirect_uri(oauth.redirect_url); // Generate PKCE challenge let (pkce_challenge, pkce_verifier) = PkceCodeChallenge::new_random_sha256(); // Generate authorization URL let (auth_url, csrf_token) = client .authorize_url(CsrfToken::new_random) .add_scope(Scope::new("read:user".to_string())) .set_pkce_challenge(pkce_challenge) .url(); // Store CSRF token and PKCE verifier (in session/cache) store_oauth_state(&csrf_token, &pkce_verifier).await?; Ok(Redirect::to(auth_url.as_str())) } async fn oauth_callback( State(state): State<AppState>, Query(params): Query<OAuthCallbackParams>, ) -> Result<Json<LoginResponse>, ApiError> { // Verify CSRF token let (stored_csrf, pkce_verifier) = get_oauth_state(¶ms.state).await?; if stored_csrf != params.state { return Err(ApiError::bad_request("Invalid state parameter")); } // Exchange code for token let token_result = exchange_code_for_token( &state.oauth_config, ¶ms.code, &pkce_verifier, ).await?; // Get user info from provider let user_info = fetch_user_info(&token_result.access_token()).await?; // Create or update user in EventCore let command = CreateOrUpdateOAuthUser { provider: "github".to_string(), provider_user_id: user_info.id, email: user_info.email, name: user_info.name, }; state.executor.execute(&command).await?; // Create JWT tokens let user = get_user_by_email(&user_info.email).await?; let access_token = state.jwt_config.create_access_token(&user)?; Ok(Json(LoginResponse { access_token, // ... other fields })) } }
Session Management
Track active sessions:
#![allow(unused)] fn main() { #[derive(Command, Clone)] struct CreateSession { #[stream] user_id: StreamId, #[stream] session_id: StreamId, ip_address: IpAddr, user_agent: String, expires_at: DateTime<Utc>, } #[derive(Command, Clone)] struct RevokeSession { #[stream] session_id: StreamId, #[stream] user_id: StreamId, reason: RevocationReason, } // Session validation middleware async fn validate_session( State(state): State<AppState>, user: AuthenticatedUser, request: Request, next: Next, ) -> Result<Response, ApiError> { let session_id = extract_session_id(&request)?; // Check if session is valid let session = state.projections .read() .await .get::<SessionProjection>() .unwrap() .get_session(&session_id) .await? .ok_or_else(|| ApiError::unauthorized("Invalid session"))?; // Verify session belongs to user if session.user_id != user.id { return Err(ApiError::unauthorized("Session mismatch")); } // Check expiration if session.expires_at < Utc::now() { return Err(ApiError::unauthorized("Session expired")); } // Check if revoked if session.revoked { return Err(ApiError::unauthorized("Session revoked")); } Ok(next.run(request).await) } }
Security Headers
Add security headers to all responses:
#![allow(unused)] fn main() { async fn security_headers_middleware( request: Request, next: Next, ) -> Response { let mut response = next.run(request).await; let headers = response.headers_mut(); // Prevent clickjacking headers.insert( "X-Frame-Options", HeaderValue::from_static("DENY"), ); // XSS protection headers.insert( "X-Content-Type-Options", HeaderValue::from_static("nosniff"), ); // CSP headers.insert( "Content-Security-Policy", HeaderValue::from_static("default-src 'self'"), ); // HSTS headers.insert( "Strict-Transport-Security", HeaderValue::from_static("max-age=31536000; includeSubDomains"), ); response } }
Testing Authentication
#![allow(unused)] fn main() { #[cfg(test)] mod tests { use super::*; fn create_test_token(user_id: &str, roles: Vec<&str>) -> String { let claims = Claims { sub: user_id.to_string(), exp: (Utc::now() + Duration::hours(1)).timestamp() as usize, iat: Utc::now().timestamp() as usize, roles: roles.into_iter().map(|s| s.to_string()).collect(), permissions: vec![], }; encode( &Header::default(), &claims, &EncodingKey::from_secret(TEST_SECRET.as_ref()), ).unwrap() } #[tokio::test] async fn test_authentication_required() { let app = create_test_app(); // No token let response = app .oneshot( Request::builder() .uri("/api/v1/protected") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::UNAUTHORIZED); } #[tokio::test] async fn test_role_authorization() { let app = create_test_app(); // User token without admin role let token = create_test_token("user123", vec!["user"]); let response = app .oneshot( Request::builder() .uri("/api/v1/admin/users") .header("Authorization", format!("Bearer {}", token)) .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!(response.status(), StatusCode::FORBIDDEN); } } }
Best Practices
- Use HTTPS always - Never send tokens over unencrypted connections
- Short token lifetimes - Access tokens should expire quickly
- Refresh tokens - Use refresh tokens for long-lived sessions
- Store hashes - Never store plaintext tokens or passwords
- Audit everything - Log all authentication/authorization events
- Principle of least privilege - Grant minimal necessary permissions
- Defense in depth - Layer multiple security mechanisms
- Regular reviews - Audit permissions and access regularly
Summary
Authentication and authorization in EventCore:
- ✅ Flexible strategies - JWT, API keys, OAuth2
- ✅ Strong typing - Type-safe user and permission models
- ✅ Event sourced - Authentication events provide audit trail
- ✅ Performance - Caching for fast authorization checks
- ✅ Testable - Easy to test security rules
Key patterns:
- Authenticate early in the request pipeline
- Embed authorization in commands
- Use projections for fast permission lookups
- Audit all security events
- Test security thoroughly
Next, let’s explore API Versioning →