Conversation
Pull Request Overview
This PR implements an In-Memory Cache (IMC) feature using a sharded queue system. The implementation adds a distributed caching layer with 10 shards backed by Redis, featuring automatic cache updates via polling and TTL-based expiration.
Key Changes:
- Adds a sharded queue system with 10 Redis-backed shards and a polling mechanism that runs every 10 seconds
- Implements an in-memory cache (IMC) using a Registry pattern with TTL support for service configurations
- Modifies service configuration lookup to check IMC first before database queries
- Introduces a new `findByNameFromRedisWithDefault` function to cache default values when configurations are not found
Reviewed Changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| src/shard_queue/mod.rs | Module definition for the sharded queue system with global singleton handler |
| src/shard_queue/types.rs | Type definitions for shard queue items, metadata, and error types |
| src/shard_queue/registry.rs | Registry implementation for in-memory caching with TTL support and expiration cleanup |
| src/shard_queue/handler.rs | Core handler implementing sharded queue operations, polling logic, and IMC integration |
| src/types/service_configuration.rs | Updated to check IMC before database and push new configs to shard queue |
| src/redis/cache.rs | Enhanced cache lookup with IMC integration and new default value caching function |
| src/redis/commands.rs | Added get_range_from_list method for retrieving items from Redis lists |
| src/storage/types.rs | Added Serialize/Deserialize traits to ServiceConfiguration for caching |
| src/lib.rs | Registered new shard_queue module |
| src/decider/gatewaydecider/gw_scoring.rs | Refactored to use new findByNameFromRedisWithDefault API |
| src/bin/open_router.rs | Spawns shard queue polling task alongside main and metrics servers |
```rust
let default_config = crate::storage::types::ServiceConfiguration {
    id: 0, // Placeholder ID since we're not storing in DB
    name: key.clone(),
    value: Some(serde_json::to_string(&default_value).unwrap_or_else(|_| "null".to_string())),
    new_value: None,
    previous_value: None,
    new_value_status: None,
};
```
The serialization logic here may cause issues. You're serializing default_value to a JSON string and storing it in the value field of ServiceConfiguration. However, the value field is Option<String>, which typically stores a string representation. When this is later retrieved and deserialized, the code expects to deserialize from this string value, which could lead to double-encoding issues (JSON string containing a JSON string). Consider whether the value field should contain the direct JSON representation or if the deserialization logic needs adjustment.
```rust
// Remove oldest entry (simple eviction policy)
if let Some(oldest_key) = data.keys().next().cloned() {
    data.remove(&oldest_key);
```
The comment says "Remove oldest entry" but HashMap::keys().next() returns an arbitrary key, not necessarily the oldest. HashMap iteration order is not guaranteed to be insertion order. Consider using a more appropriate data structure like LinkedHashMap or updating the comment to reflect the actual behavior (e.g., "Remove arbitrary entry").
```rust
for item in &new_items {
    // Store in global registry with 600 second TTL
    if let Err(_) =
        GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
```
[nitpick] The TTL value of 600 seconds is hardcoded in multiple places. Consider defining it as a constant (e.g., const CACHE_TTL_SECONDS: u64 = 600;) to improve maintainability and make it easier to adjust if needed.
```rust
let inner = ShardedQueueHandlerInner {
    shard_metadata: Arc::new(Mutex::new(shard_metadata)),
    loop_interval: Duration::from_secs(10), // 30 seconds for testing
```
The comment says "30 seconds for testing" but the code sets the duration to 10 seconds. Update the comment to match the actual value: "10 seconds".
Suggested change:

```diff
- loop_interval: Duration::from_secs(10), // 30 seconds for testing
+ loop_interval: Duration::from_secs(10), // 10 seconds
```
```rust
use crate::storage::schema_pg::service_configuration::dsl;
use diesel::associations::HasTable;
use diesel::*;
use serde_json::json;
```
The import `use serde_json::json;` is unused and should be removed.
Suggested change:

```diff
- use serde_json::json;
```
```rust
pub fn size(&self) -> usize {
    self.data.read().unwrap().len()
```
The size() method uses unwrap() which will panic if the lock is poisoned, while other methods properly handle lock errors. For consistency, this should return Result<usize, String> and use map_err like the other methods.
Suggested change:

```diff
- pub fn size(&self) -> usize {
-     self.data.read().unwrap().len()
+ pub fn size(&self) -> Result<usize, String> {
+     let data = self.data.read().map_err(|e| format!("Read lock error: {}", e))?;
+     Ok(data.len())
```
```rust
if let Err(_) =
    GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
{
    logger::error!("Failed to store item in registry: {}", item.key);
```
The error message from the registry store operation is being discarded. Consider logging it: if let Err(e) = ... instead of if let Err(_) = ... to include the actual error message in the log for better debugging.
Suggested change:

```diff
- if let Err(_) =
-     GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
- {
-     logger::error!("Failed to store item in registry: {}", item.key);
+ if let Err(e) =
+     GLOBAL_SHARD_REGISTRY.store(item.key.clone(), item.value.clone(), Some(600))
+ {
+     logger::error!("Failed to store item in registry: {} (error: {})", item.key, e);
```
```rust
if let Some(shard_meta) = metadata.get_mut(&shard_id) {
    shard_meta.update_last_modified();
    logger::debug!("Updated last_modified_at for shard {}", shard_id);
```
[nitpick] If the shard metadata is not found, the update is silently skipped. While the metadata is initialized for all shards 0-9 in new(), consider adding a warning log if the metadata is unexpectedly missing to help diagnose potential issues.
Suggested change:

```diff
  logger::debug!("Updated last_modified_at for shard {}", shard_id);
+ } else {
+     logger::warn!(
+         "Shard metadata for shard {} was unexpectedly missing when attempting to update last_modified_at.",
+         shard_id
+     );
```
```rust
for raw_item in raw_items {
    match serde_json::from_str::<ShardQueueItem>(&raw_item) {
        Ok(item) => {
            if item.modified_at > last_modified_at {
                new_items.push(item);
```
The variable `new_items` is used on line 163 but is never declared. You need to add `let mut new_items = Vec::new();` before the loop that starts on line 159.
```rust
#[test]
fn test_push_and_get_sizes() {
    let handler = ShardedQueueHandler::new();

    let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"}));
    let result = handler.push_to_shard(item);

    assert!(result.is_ok());

    let sizes = handler.get_queue_sizes().unwrap();
```
This test calls async functions (push_to_shard and get_queue_sizes) but is not marked as an async test. The test should be annotated with #[tokio::test] instead of #[test], and the test function should be async fn.
Suggested change:

```diff
- #[test]
- fn test_push_and_get_sizes() {
-     let handler = ShardedQueueHandler::new();
-     let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"}));
-     let result = handler.push_to_shard(item);
-     assert!(result.is_ok());
-     let sizes = handler.get_queue_sizes().unwrap();
+ #[tokio::test]
+ async fn test_push_and_get_sizes() {
+     let handler = ShardedQueueHandler::new();
+     let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"}));
+     let result = handler.push_to_shard(item).await;
+     assert!(result.is_ok());
+     let sizes = handler.get_queue_sizes().await.unwrap();
```
```rust
crate::logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
} else {
    crate::logger::debug!("Cached default value for config '{}' in IMC", key);
```
Suggested change:

```diff
- crate::logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
- } else {
-     crate::logger::debug!("Cached default value for config '{}' in IMC", key);
+ logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
+ } else {
+     logger::debug!("Cached default value for config '{}' in IMC", key);
```
```rust
A: for<'de> Deserialize<'de>,
{
    let res = service_configuration::find_config_by_name(key).await;
    use crate::shard_queue::find_config_in_mem;

    if let Ok(cached_value) = find_config_in_mem(&key) {
        crate::logger::debug!("Cache HIT: Found config '{}' in IMC", key);
```
Suggested change:

```diff
- crate::logger::debug!("Cache HIT: Found config '{}' in IMC", key);
+ logger::debug!("Cache HIT: Found config '{}' in IMC", key);
```

Make this change in all the other places as well.
```rust
}
crate::logger::debug!("Cache MISS: Config '{}' not found in IMC, checking DB", key);

if let Ok(Some(config)) = check_database_for_service_config(key.clone()).await {
```
Use the existing method `service_configuration::find_config_by_name` instead.
```rust
    None
}

async fn check_database_for_service_config(
```
We don't need this method, as it's already available as `service_configuration::find_config_by_name`.
```rust
let mut shard_metadata = HashMap::new();

// Initialize metadata for 10 shards (0-9)
for shard_id in 0..10 {
```
Fetch this value from config so that we can change it via environment variables.
```rust
let inner = ShardedQueueHandlerInner {
    shard_metadata: Arc::new(Mutex::new(shard_metadata)),
    loop_interval: Duration::from_secs(10), // 30 seconds for testing
```
Move this to configs as well.
```rust
pub fn get_shard_id(&self, key: &str) -> u8 {
    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % 10) as u8
```
```rust
crate::generics::generic_insert(&app_state.db, config).await?;

// Create ServiceConfiguration for shard queue (after successful DB insert)
let service_config = ServiceConfiguration {
    id: 0, // We don't need the actual DB ID for IMC, using 0 as placeholder
    name: name.clone(),
    value,
    new_value: None,
    previous_value: None,
    new_value_status: None,
};
```
Suggested change:

```diff
- crate::generics::generic_insert(&app_state.db, config).await?;
-
- // Create ServiceConfiguration for shard queue (after successful DB insert)
- let service_config = ServiceConfiguration {
-     id: 0, // We don't need the actual DB ID for IMC, using 0 as placeholder
-     name: name.clone(),
-     value,
-     new_value: None,
-     previous_value: None,
-     new_value_status: None,
- };
+ let service_config = crate::generics::generic_insert(&app_state.db, config).await?;
```
```rust
}

/// IMC functions following your existing pattern for service_configuration caching
pub fn find_config_in_mem(key: &str) -> StorageResult<serde_json::Value> {
```
Suggested change:

```diff
- pub fn find_config_in_mem(key: &str) -> StorageResult<serde_json::Value> {
+ pub fn find_config_in_mem(key: &str) -> StorageResult<ServiceConfiguration> {
```
This pull request introduces a new sharded queue configuration and implements a mechanism for caching configuration defaults via an in-memory cache (IMC) and Redis streams. It also refactors how configuration values are retrieved, ensuring that missing configs are automatically cached with default values. Additionally, new Redis stream and list operations are added to support these features.
Configuration and Caching Improvements
- Added a `ShardQueueConfig` struct to `src/config.rs` with default values and integrated it into `GlobalConfig`, enabling configuration of shard queue parameters via config files. [1] [2]
- Added `findByNameFromRedisWithDefault` in `src/redis/cache.rs`, which retrieves a config value from cache or DB, and if missing, caches and returns a default value via the shard queue system. This change also refactors `findByNameFromRedisHelper` to check IMC before falling back to DB and pushes results to the shard queue.

Redis Stream and List Operations
- Added new Redis stream commands in `src/redis/commands.rs`: `xadd_with_maxlen`, `xadd_with_approximate_maxlen`, `xrange`, and `xlen`, supporting efficient streaming and caching of configuration data. Also added `get_range_from_list` for list operations. [1] [2]

Dependency Updates
- Updated the `chrono` crate in `Cargo.toml` to include the `serde` feature for improved serialization support.

Module Organization
- Added the `shard_queue` module to `src/lib.rs` to support sharded queue logic.