# Chapter 4.3: Query Endpoints
Query endpoints serve read requests from your projections. Unlike commands, which modify state, queries are side-effect free and safe to cache, which makes them well suited to high-performance read paths.
## Query Architecture

```text
HTTP Request → Authenticate → Authorize → Query Projection → Format Response
                                                  ↑
                                          Read Model Store
```
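Concretely, each stage of this pipeline maps to an axum extractor, a middleware layer, or the handler body itself. The following is a minimal wiring sketch, assuming the handlers defined in the rest of this chapter and an `AppState` that holds the projection registry; the route paths and the `query_routes` helper are illustrative, not part of EventCore:

```rust
use axum::{routing::get, Router};

// Read-side routes only. Authentication and authorization are handled by the
// extractors and middleware discussed later in this chapter.
fn query_routes(state: AppState) -> Router {
    Router::new()
        .route("/tasks", get(list_tasks))
        .route("/tasks/search", get(search_tasks))
        .route("/tasks/stats", get(get_task_statistics))
        .route("/tasks/updates", get(task_updates_stream))
        .with_state(state)
}
```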
## Basic Query Pattern

### Simple Query Endpoint
```rust
use axum::{
    extract::{Query as QueryParams, State},
    Json,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize)]
struct ListTasksQuery {
    #[serde(default)]
    status: Option<TaskStatus>,
    #[serde(default)]
    assigned_to: Option<String>,
    #[serde(default = "default_page")]
    page: u32,
    #[serde(default = "default_page_size")]
    page_size: u32,
}

fn default_page() -> u32 {
    1
}

fn default_page_size() -> u32 {
    20
}

#[derive(Debug, Serialize)]
struct ListTasksResponse {
    tasks: Vec<TaskSummary>,
    pagination: PaginationInfo,
}

#[derive(Debug, Serialize)]
struct TaskSummary {
    id: String,
    title: String,
    status: TaskStatus,
    assigned_to: Option<String>,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
}

#[derive(Debug, Serialize)]
struct PaginationInfo {
    page: u32,
    page_size: u32,
    total_items: u64,
    total_pages: u32,
}

async fn list_tasks(
    State(state): State<AppState>,
    QueryParams(query): QueryParams<ListTasksQuery>,
) -> Result<Json<ListTasksResponse>, ApiError> {
    // Get the projection that serves this read model
    let projection = state
        .projections
        .read()
        .await
        .get::<TaskListProjection>()
        .ok_or_else(|| ApiError::internal("Task projection not available"))?;

    // Apply filters
    let mut tasks = projection.get_all_tasks();

    if let Some(status) = query.status {
        tasks.retain(|t| t.status == status);
    }

    if let Some(assigned_to) = query.assigned_to {
        tasks.retain(|t| t.assigned_to.as_ref() == Some(&assigned_to));
    }

    // Calculate pagination
    let total_items = tasks.len() as u64;
    let total_pages = ((total_items as f64) / (query.page_size as f64)).ceil() as u32;

    // Apply pagination, clamping the window so out-of-range pages return an
    // empty list instead of panicking on the slice
    let start = ((query.page.saturating_sub(1)) * query.page_size) as usize;
    let start = start.min(tasks.len());
    let end = (start + query.page_size as usize).min(tasks.len());
    let page_tasks = tasks[start..end].to_vec();

    Ok(Json(ListTasksResponse {
        tasks: page_tasks.into_iter().map(Into::into).collect(),
        pagination: PaginationInfo {
            page: query.page,
            page_size: query.page_size,
            total_items,
            total_pages,
        },
    }))
}
```
## Advanced Query Patterns

### Filtering and Sorting
```rust
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum SortField {
    CreatedAt,
    UpdatedAt,
    Title,
    Priority,
    DueDate,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum SortOrder {
    Asc,
    Desc,
}

#[derive(Debug, Deserialize)]
struct AdvancedTaskQuery {
    // Filters
    #[serde(default)]
    status: Option<Vec<TaskStatus>>,
    #[serde(default)]
    assigned_to: Option<Vec<String>>,
    #[serde(default)]
    created_after: Option<DateTime<Utc>>,
    #[serde(default)]
    created_before: Option<DateTime<Utc>>,
    #[serde(default)]
    search: Option<String>,

    // Sorting
    #[serde(default = "default_sort_field")]
    sort_by: SortField,
    #[serde(default = "default_sort_order")]
    sort_order: SortOrder,

    // Pagination
    #[serde(default)]
    cursor: Option<String>,
    #[serde(default = "default_limit")]
    limit: u32,
}

fn default_sort_field() -> SortField {
    SortField::CreatedAt
}

fn default_sort_order() -> SortOrder {
    SortOrder::Desc
}

fn default_limit() -> u32 {
    50
}

async fn search_tasks(
    State(state): State<AppState>,
    QueryParams(query): QueryParams<AdvancedTaskQuery>,
) -> Result<Json<CursorPaginatedResponse<TaskSummary>>, ApiError> {
    let projection = state
        .projections
        .read()
        .await
        .get::<TaskSearchProjection>()
        .ok_or_else(|| ApiError::internal("Search projection not available"))?;

    // Build query
    let mut search_query = SearchQuery::new();

    if let Some(statuses) = query.status {
        search_query = search_query.with_status_in(statuses);
    }

    if let Some(assignees) = query.assigned_to {
        search_query = search_query.with_assignee_in(assignees);
    }

    if let Some(after) = query.created_after {
        search_query = search_query.created_after(after);
    }

    if let Some(before) = query.created_before {
        search_query = search_query.created_before(before);
    }

    if let Some(search_text) = query.search {
        search_query = search_query.with_text_search(search_text);
    }

    // Apply sorting
    search_query = match query.sort_by {
        SortField::CreatedAt => search_query.sort_by_created_at(query.sort_order),
        SortField::UpdatedAt => search_query.sort_by_updated_at(query.sort_order),
        SortField::Title => search_query.sort_by_title(query.sort_order),
        SortField::Priority => search_query.sort_by_priority(query.sort_order),
        SortField::DueDate => search_query.sort_by_due_date(query.sort_order),
    };

    // Apply cursor pagination
    if let Some(cursor) = query.cursor {
        search_query = search_query.after_cursor(Cursor::decode(&cursor)?);
    }

    search_query = search_query.limit(query.limit);

    // Execute query
    let results = projection.search(search_query).await?;

    Ok(Json(results))
}
```
### Aggregation Queries
```rust
use std::collections::HashMap;

#[derive(Debug, Serialize)]
struct TaskStatistics {
    total_tasks: u64,
    tasks_by_status: HashMap<TaskStatus, u64>,
    tasks_by_assignee: Vec<AssigneeStats>,
    completion_rate: f64,
    average_completion_time: Option<Duration>,
    overdue_tasks: u64,
}

#[derive(Debug, Serialize)]
struct AssigneeStats {
    assignee_id: String,
    assignee_name: String,
    total_tasks: u64,
    completed_tasks: u64,
    in_progress_tasks: u64,
}

async fn get_task_statistics(
    State(state): State<AppState>,
    QueryParams(query): QueryParams<DateRangeQuery>,
) -> Result<Json<TaskStatistics>, ApiError> {
    let projection = state
        .projections
        .read()
        .await
        .get::<TaskAnalyticsProjection>()
        .ok_or_else(|| ApiError::internal("Analytics projection not available"))?;

    let stats = projection
        .calculate_statistics(query.start_date, query.end_date)
        .await?;

    Ok(Json(stats))
}

// Time-series data
#[derive(Debug, Serialize)]
struct TimeSeriesData {
    period: String,
    data_points: Vec<DataPoint>,
}

#[derive(Debug, Serialize)]
struct DataPoint {
    timestamp: DateTime<Utc>,
    value: f64,
    metadata: Option<serde_json::Value>,
}

async fn get_task_completion_trend(
    State(state): State<AppState>,
    QueryParams(query): QueryParams<TimeSeriesQuery>,
) -> Result<Json<TimeSeriesData>, ApiError> {
    let projection = state
        .projections
        .read()
        .await
        .get::<TaskMetricsProjection>()
        .ok_or_else(|| ApiError::internal("Metrics projection not available"))?;

    let data = projection
        .get_completion_trend(query.start_date, query.end_date, query.granularity)
        .await?;

    Ok(Json(data))
}
```
## GraphQL Integration
For complex, client-driven queries, GraphQL can be more efficient than adding a bespoke REST endpoint for every view:
```rust
use std::sync::Arc;

use async_graphql::{
    Context, EmptyMutation, EmptySubscription, Object, Result as GraphQLResult, Schema, ID,
};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use chrono::{DateTime, Utc};

struct QueryRoot;

#[Object]
impl QueryRoot {
    async fn task(&self, ctx: &Context<'_>, id: ID) -> GraphQLResult<Option<Task>> {
        let projection = ctx.data::<Arc<TaskProjection>>()?;
        Ok(projection.get_task(&id.to_string()).await?)
    }

    async fn tasks(
        &self,
        ctx: &Context<'_>,
        filter: Option<TaskFilter>,
        sort: Option<TaskSort>,
        pagination: Option<PaginationInput>,
    ) -> GraphQLResult<TaskConnection> {
        let projection = ctx.data::<Arc<TaskProjection>>()?;
        let query = build_query(filter, sort, pagination);
        let results = projection.query(query).await?;
        Ok(TaskConnection::from(results))
    }

    async fn user(&self, ctx: &Context<'_>, id: ID) -> GraphQLResult<Option<User>> {
        let projection = ctx.data::<Arc<UserProjection>>()?;
        Ok(projection.get_user(&id.to_string()).await?)
    }
}

// GraphQL types
#[derive(async_graphql::SimpleObject)]
struct Task {
    id: ID,
    title: String,
    description: String,
    status: TaskStatus,
    assigned_to: Option<User>,
    created_at: DateTime<Utc>,
    updated_at: DateTime<Utc>,
}

#[derive(async_graphql::InputObject)]
struct TaskFilter {
    status: Option<Vec<TaskStatus>>,
    assigned_to: Option<Vec<ID>>,
    created_after: Option<DateTime<Utc>>,
    search: Option<String>,
}

// Axum handler
async fn graphql_handler(
    State(state): State<AppState>,
    user: Option<AuthenticatedUser>,
    req: GraphQLRequest,
) -> GraphQLResponse {
    // Note: building the schema per request keeps the example short; in
    // practice you would build it once and store it in AppState.
    let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription)
        .data(state.projections.clone())
        .data(user)
        .finish();

    schema.execute(req.into_inner()).await.into()
}
```
## Caching Strategies

### Response Caching
```rust
use std::future::Future;
use std::time::Duration;

use axum::{
    body::Body,
    http::{
        header::{CACHE_CONTROL, CONTENT_TYPE, ETAG, IF_NONE_MATCH},
        HeaderMap, StatusCode,
    },
    response::{IntoResponse, Response},
};
use sha2::{Digest, Sha256};

#[derive(Clone)]
struct CacheConfig {
    public_max_age: Duration,
    private_max_age: Duration,
    stale_while_revalidate: Duration,
}

async fn cached_query_handler<F, Fut, T>(
    headers: HeaderMap,
    cache_config: CacheConfig,
    query_fn: F,
) -> Response
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<T, ApiError>>,
    T: Serialize,
{
    // Execute query
    let result = match query_fn().await {
        Ok(data) => data,
        Err(e) => return e.into_response(),
    };

    // Serialize response
    let body = match serde_json::to_vec(&result) {
        Ok(bytes) => bytes,
        Err(_) => return ApiError::internal("Serialization failed").into_response(),
    };

    // Calculate ETag from the response body
    let mut hasher = Sha256::new();
    hasher.update(&body);
    let etag = format!("\"{}\"", hex::encode(hasher.finalize()));

    // Check If-None-Match
    if let Some(if_none_match) = headers.get(IF_NONE_MATCH) {
        if if_none_match.to_str().ok() == Some(etag.as_str()) {
            return StatusCode::NOT_MODIFIED.into_response();
        }
    }

    // Build response with caching headers
    Response::builder()
        .status(StatusCode::OK)
        .header(CONTENT_TYPE, "application/json")
        .header(ETAG, &etag)
        .header(
            CACHE_CONTROL,
            format!(
                "public, max-age={}, stale-while-revalidate={}",
                cache_config.public_max_age.as_secs(),
                cache_config.stale_while_revalidate.as_secs()
            ),
        )
        .body(Body::from(body))
        .unwrap()
}

// Usage
async fn get_public_statistics(
    State(state): State<AppState>,
    headers: HeaderMap,
) -> Response {
    cached_query_handler(
        headers,
        CacheConfig {
            public_max_age: Duration::from_secs(300), // 5 minutes
            private_max_age: Duration::from_secs(0),
            stale_while_revalidate: Duration::from_secs(60),
        },
        || async {
            let projection = state
                .projections
                .read()
                .await
                .get::<PublicStatsProjection>()
                .ok_or_else(|| ApiError::internal("Stats not available"))?;

            projection.get_current_stats().await
        },
    )
    .await
}
```
### Query Result Caching
```rust
use std::future::Future;
use std::time::Duration;

use chrono::{DateTime, Utc};
use moka::future::Cache;
use serde::{de::DeserializeOwned, Serialize};

#[derive(Clone)]
struct QueryCache {
    cache: Cache<String, CachedResult>,
}

#[derive(Clone)]
struct CachedResult {
    data: Vec<u8>,
    cached_at: DateTime<Utc>,
    ttl: Duration,
}

impl QueryCache {
    fn new() -> Self {
        Self {
            cache: Cache::builder()
                .max_capacity(10_000)
                .time_to_live(Duration::from_secs(300))
                .build(),
        }
    }

    async fn get_or_compute<F, Fut, T>(
        &self,
        key: &str,
        ttl: Duration,
        compute_fn: F,
    ) -> Result<T, ApiError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<T, ApiError>>,
        T: Serialize + DeserializeOwned,
    {
        // Check cache; the moka TTL above is a global upper bound, while
        // `ttl` is a per-entry bound enforced here
        if let Some(cached) = self.cache.get(key).await {
            let age = (Utc::now() - cached.cached_at)
                .to_std()
                .unwrap_or(Duration::MAX);
            if age < cached.ttl {
                return serde_json::from_slice(&cached.data)
                    .map_err(|_| ApiError::internal("Cache deserialization failed"));
            }
        }

        // Compute result
        let result = compute_fn().await?;

        // Cache result
        let data = serde_json::to_vec(&result)
            .map_err(|_| ApiError::internal("Cache serialization failed"))?;

        self.cache
            .insert(
                key.to_string(),
                CachedResult {
                    data,
                    cached_at: Utc::now(),
                    ttl,
                },
            )
            .await;

        Ok(result)
    }
}
```
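A handler can then wrap its projection access in `get_or_compute`. The sketch below assumes `AppState` exposes the cache as a `query_cache` field and that `PublicStatsProjection::get_current_stats` returns a serializable `PublicStats` type; the cache key and 300-second TTL are arbitrary illustrative choices:

```rust
async fn get_cached_public_statistics(
    State(state): State<AppState>,
) -> Result<Json<PublicStats>, ApiError> {
    let stats: PublicStats = state
        .query_cache
        .get_or_compute("public-stats", Duration::from_secs(300), || async {
            let projection = state
                .projections
                .read()
                .await
                .get::<PublicStatsProjection>()
                .ok_or_else(|| ApiError::internal("Stats not available"))?;

            projection.get_current_stats().await
        })
        .await?;

    Ok(Json(stats))
}
```

When the query takes parameters (filters, date ranges), include them in the cache key so distinct queries do not collide.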
## Real-time Queries with SSE
Server-Sent Events (SSE) push projection updates to clients over a long-lived HTTP connection:
```rust
use std::time::Duration;

use axum::response::sse::{Event, Sse};
use futures::stream::Stream;
use tokio_stream::StreamExt;

async fn task_updates_stream(
    State(state): State<AppState>,
    user: AuthenticatedUser,
) -> Sse<impl Stream<Item = Result<Event, ApiError>>> {
    let stream = async_stream::stream! {
        let mut subscription = state
            .projections
            .read()
            .await
            .get::<TaskProjection>()
            .unwrap()
            .subscribe_to_updates(user.id)
            .await;

        while let Some(update) = subscription.next().await {
            let event = match update {
                TaskUpdate::Created(task) => Event::default()
                    .event("task-created")
                    .json_data(task)
                    .unwrap(),
                TaskUpdate::Updated(task) => Event::default()
                    .event("task-updated")
                    .json_data(task)
                    .unwrap(),
                TaskUpdate::Deleted(task_id) => Event::default()
                    .event("task-deleted")
                    .data(task_id),
            };

            yield Ok(event);
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(30))
            .text("keep-alive"),
    )
}
```
## Query Performance Optimization

### N+1 Query Prevention
```rust
use std::collections::{HashMap, HashSet};

// Bad: N+1 queries
async fn get_tasks_with_assignees_bad(
    projection: &TaskProjection,
) -> Result<Vec<TaskWithAssignee>, ApiError> {
    let tasks = projection.get_all_tasks().await?;
    let mut results = Vec::new();

    for task in tasks {
        // This makes a separate query for each task!
        let assignee = if let Some(assignee_id) = &task.assigned_to {
            projection.get_user(assignee_id).await?
        } else {
            None
        };

        results.push(TaskWithAssignee { task, assignee });
    }

    Ok(results)
}

// Good: Batch loading
async fn get_tasks_with_assignees_good(
    projection: &TaskProjection,
) -> Result<Vec<TaskWithAssignee>, ApiError> {
    let tasks = projection.get_all_tasks().await?;

    // Collect all assignee IDs
    let assignee_ids: HashSet<_> = tasks
        .iter()
        .filter_map(|t| t.assigned_to.as_ref())
        .cloned()
        .collect();

    // Load all assignees in one query
    let assignees = projection
        .get_users_by_ids(assignee_ids.into_iter().collect())
        .await?;

    // Build a lookup map and join in memory
    let assignee_map: HashMap<_, _> = assignees
        .into_iter()
        .map(|u| (u.id.clone(), u))
        .collect();

    Ok(tasks
        .into_iter()
        .map(|task| {
            let assignee = task
                .assigned_to
                .as_ref()
                .and_then(|id| assignee_map.get(id))
                .cloned();

            TaskWithAssignee { task, assignee }
        })
        .collect())
}
```
### Query Complexity Limits
```rust
struct QueryComplexity;

impl QueryComplexity {
    fn calculate_complexity(query: &GraphQLQuery) -> u32 {
        // Simple heuristic: count fields and multiply by depth
        // (count_fields and calculate_max_depth are illustrative helpers,
        // not library functions)
        let field_count = count_fields(query);
        let max_depth = calculate_max_depth(query);
        field_count * max_depth
    }
}

// For GraphQL, cap query cost at schema-build time using the schema builder's
// built-in limits
let schema = Schema::build(QueryRoot, EmptyMutation, EmptySubscription)
    .limit_complexity(1000) // Max complexity
    .limit_depth(10) // Max nesting depth
    .finish();

// For REST endpoints, validate query parameters explicitly
#[derive(Debug)]
struct QueryComplexityGuard {
    max_items: u32,
    max_depth: u32,
}

impl QueryComplexityGuard {
    fn validate(&self, query: &AdvancedTaskQuery) -> Result<(), ApiError> {
        // Check pagination limits
        if query.limit > self.max_items {
            return Err(ApiError::bad_request(format!(
                "Limit cannot exceed {}",
                self.max_items
            )));
        }

        // Check filter complexity
        let filter_count = query.status.as_ref().map(|s| s.len()).unwrap_or(0)
            + query.assigned_to.as_ref().map(|a| a.len()).unwrap_or(0);

        if filter_count > 100 {
            return Err(ApiError::bad_request("Too many filter values"));
        }

        Ok(())
    }
}
```
## Security Considerations

### Query Authorization
```rust
use async_trait::async_trait;

#[async_trait]
trait QueryAuthorizer {
    async fn can_view_task(&self, user: &AuthenticatedUser, task_id: &str) -> bool;
    async fn can_view_user_tasks(&self, user: &AuthenticatedUser, target_user_id: &str) -> bool;
    async fn can_view_statistics(&self, user: &AuthenticatedUser) -> bool;
}

struct RoleBasedAuthorizer;

#[async_trait]
impl QueryAuthorizer for RoleBasedAuthorizer {
    async fn can_view_task(&self, user: &AuthenticatedUser, task_id: &str) -> bool {
        // Admins can see everything
        if user.has_role("admin") {
            return true;
        }

        // Others can only see tasks they created or are assigned to;
        // a real implementation would load the task and check ownership here
        true
    }

    async fn can_view_user_tasks(&self, user: &AuthenticatedUser, target_user_id: &str) -> bool {
        // Users can see their own tasks
        if user.id.to_string() == target_user_id {
            return true;
        }

        // Managers and admins can see their team's tasks
        user.has_role("manager") || user.has_role("admin")
    }

    async fn can_view_statistics(&self, user: &AuthenticatedUser) -> bool {
        user.has_role("manager") || user.has_role("admin")
    }
}

// Use in handlers
async fn get_user_tasks(
    State(state): State<AppState>,
    Path(user_id): Path<String>,
    user: AuthenticatedUser,
) -> Result<Json<Vec<TaskSummary>>, ApiError> {
    // Check authorization before touching the projection
    if !state.authorizer.can_view_user_tasks(&user, &user_id).await {
        return Err(ApiError::forbidden("Cannot view tasks for this user"));
    }

    // Continue with query...
}
```
### Rate Limiting
```rust
use std::sync::Arc;

use axum::{extract::Request, middleware::Next, response::Response};
use governor::{DefaultKeyedRateLimiter, Quota};

#[derive(Clone)]
struct RateLimitConfig {
    anonymous_quota: Quota,
    authenticated_quota: Quota,
    admin_quota: Quota,
}

async fn rate_limit_middleware(
    State(limiter): State<Arc<DefaultKeyedRateLimiter<String>>>,
    user: Option<AuthenticatedUser>,
    request: Request,
    next: Next,
) -> Result<Response, ApiError> {
    // Rate-limit authenticated users by id, anonymous clients by forwarded IP
    let key = match &user {
        Some(u) => u.id.to_string(),
        None => request
            .headers()
            .get("x-forwarded-for")
            .and_then(|h| h.to_str().ok())
            .unwrap_or("anonymous")
            .to_string(),
    };

    limiter
        .check_key(&key)
        .map_err(|_| ApiError::too_many_requests("Rate limit exceeded"))?;

    Ok(next.run(request).await)
}
```
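The middleware above assumes a keyed limiter is already built and shared via state. One way to construct it with governor's keyed API is sketched below; the 120-requests-per-minute quota is an arbitrary example value, not a recommendation:

```rust
use std::{num::NonZeroU32, sync::Arc};

use governor::{DefaultKeyedRateLimiter, Quota, RateLimiter};

fn build_query_rate_limiter() -> Arc<DefaultKeyedRateLimiter<String>> {
    // 120 read requests per minute per key (user id or client IP)
    let quota = Quota::per_minute(NonZeroU32::new(120).unwrap());
    Arc::new(RateLimiter::keyed(quota))
}
```

The resulting `Arc` can then be attached to the read-side routes with `axum::middleware::from_fn_with_state(limiter, rate_limit_middleware)`.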
## Testing Query Endpoints
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_pagination() {
        let state = create_test_state_with_tasks(100).await;

        // First page (assumes ListTasksQuery implements Default for the
        // optional filter fields)
        let response = list_tasks(
            State(state.clone()),
            QueryParams(ListTasksQuery {
                page: 1,
                page_size: 20,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(response.0.tasks.len(), 20);
        assert_eq!(response.0.pagination.total_items, 100);
        assert_eq!(response.0.pagination.total_pages, 5);

        // Last page
        let response = list_tasks(
            State(state),
            QueryParams(ListTasksQuery {
                page: 5,
                page_size: 20,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(response.0.tasks.len(), 20);
    }

    #[tokio::test]
    async fn test_caching_headers() {
        let state = create_test_state().await;

        let response = get_public_statistics(State(state), HeaderMap::new()).await;

        assert_eq!(response.status(), StatusCode::OK);
        assert!(response.headers().contains_key(ETAG));
        assert!(response.headers().contains_key(CACHE_CONTROL));

        let cache_control = response
            .headers()
            .get(CACHE_CONTROL)
            .unwrap()
            .to_str()
            .unwrap();

        assert!(cache_control.contains("max-age=300"));
    }
}
```
## Best Practices
- **Use projections** - Don't query event streams directly
- **Paginate results** - Never return unbounded lists
- **Cache aggressively** - Read queries are perfect for caching
- **Validate query parameters** - Prevent resource exhaustion
- **Monitor performance** - Track slow queries
- **Use appropriate protocols** - REST for simple queries, GraphQL for complex ones
- **Implement authorization** - Check permissions for all queries
- **Version your API** - Queries can evolve independently (see the sketch below)
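For versioning in particular, a common approach is to nest the read-side routes under a version prefix so new read models can be exposed without breaking existing clients. A minimal sketch, reusing the handlers from this chapter; the `api_router` helper and the `/api/v1` prefix are illustrative assumptions about your URL scheme:

```rust
use axum::{routing::get, Router};

fn api_router(state: AppState) -> Router {
    // Each version gets its own sub-router, so handlers can evolve independently
    let v1 = Router::new()
        .route("/tasks", get(list_tasks))
        .route("/tasks/search", get(search_tasks))
        .route("/stats", get(get_task_statistics));

    Router::new().nest("/api/v1", v1).with_state(state)
}
```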
## Summary
Query endpoints in EventCore applications:
- ✅ Projection-based - Read from optimized projections
- ✅ Performant - Caching and optimization built-in
- ✅ Flexible - Support REST, GraphQL, and real-time
- ✅ Secure - Authorization and rate limiting
- ✅ Testable - Easy to test in isolation
Key patterns:
- Read from projections, not event streams
- Implement proper pagination
- Cache responses appropriately
- Validate and limit query complexity
- Authorize access to data
- Monitor query performance
Next, let’s explore Authentication and Authorization →