Shared native price cache #4136
Conversation
/gemini review
Code Review
This pull request refactors the native price estimation by splitting CachingNativePriceEstimator into Cache, CachingNativePriceEstimator, and NativePriceUpdater. This change is intended to reduce API usage by sharing the cache in the autopilot service. The implementation is sound, but a critical validation check for the cache configuration was removed during this refactoring. This could lead to excessive API calls, negating the benefits of the cache. A specific comment has been added to address this, and the suggestion to panic on configuration error aligns with established repository rules for fail-fast behavior in critical background tasks.
4fba623 to fc3b701
# Conflicts: # crates/e2e/tests/e2e/autopilot_leader.rs
Code Review
This pull request refactors the native price estimation by splitting the CachingNativePriceEstimator to improve architecture and reduce redundant API calls. However, it introduces a resource exhaustion vulnerability by removing the limit on background updates. An attacker could flood the orderbook with solvable orders for a large number of unique tokens, leading to excessive API usage and a potential Denial of Service. Additionally, there is a bug where malformed 0.0 prices can be returned from the cache. Finally, there is an opportunity to reduce code duplication by introducing a shared helper function for creating the CachingNativePriceEstimator.
```rust
let outdated_entries = cache.outdated_tokens(max_age, Instant::now());

tracing::trace!(count = outdated_entries.len(), "outdated prices to fetch");

metrics
    .native_price_cache_outdated_entries
    .set(i64::try_from(outdated_entries.len()).unwrap_or(i64::MAX));

if outdated_entries.is_empty() {
    return;
}

let timeout = self.estimator.0.quote_timeout;
let mut stream =
    self.estimator
        .0
        .estimate_prices_and_update_cache(&outdated_entries, max_age, timeout);
while stream.next().await.is_some() {}
```
The removal of the limit on the number of tokens updated in a single maintenance cycle creates a potential resource exhaustion vulnerability. An attacker can flood the orderbook with solvable orders for a large number of unique tokens, forcing the autopilot's background task to attempt price fetches for all these tokens, leading to excessive API usage and a potential Denial of Service. Re-introduce a limit on the number of tokens updated per cycle and implement a priority-based update mechanism.
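The cap the comment asks for could be sketched roughly as follows. This is only an illustration under assumptions: `MAX_UPDATES_PER_CYCLE` and `tokens_to_update` are invented names, not the repository's actual code (the removed flag was `--native-price-cache-max-update-size`), and the priority heuristic shown here is one possible choice.

```rust
use std::time::Instant;

// Hypothetical cap on how many outdated tokens one maintenance cycle
// may refresh, standing in for the removed truncation logic.
const MAX_UPDATES_PER_CYCLE: usize = 200;

// Sketch: sort outdated tokens so the most recently requested ones come
// first, then truncate to the cap. With such a bound, flooding the
// orderbook with orders for many unique tokens cannot trigger an
// unbounded number of price-fetch API calls per cycle.
fn tokens_to_update(mut outdated: Vec<(u64, Instant)>) -> Vec<u64> {
    // Most recently requested tokens first (a simple priority heuristic).
    outdated.sort_by_key(|(_, requested_at)| std::cmp::Reverse(*requested_at));
    outdated.truncate(MAX_UPDATES_PER_CYCLE);
    outdated.into_iter().map(|(token, _)| token).collect()
}
```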
That would be a very expensive attack: creating thousands of orders with unique token pairs, where each sell token must have a sufficient balance.
Code Review
The pull request successfully refactors the native price estimation system by introducing a shared Cache, a CachingNativePriceEstimator, and a NativePriceUpdater. This aligns with the stated goal of splitting the estimator into focused components and sharing the cache across different parts of the application (API and auction competition). The changes correctly implement the new architecture, including the removal of the implicit priority-based update system and the --native-price-cache-max-update-size flag. The relocation of configuration checks and updates to test cases are consistent with the refactoring. No critical issues or direct logic errors were found in this refactoring.
34dcce5 to dad1fc5
Some small notes. Everything makes sense to me, with the small exception of Martin's comment, which I still don't fully grasp. Update: Martin explained it on Slack; his idea makes perfect sense now.
```rust
let cached = Self::get_ready_to_use_cached_price(
    *token,
    now,
    &mut cache,
    &self.0.max_age,
    create_missing_entries,
);
let label = if cached.is_some() { "hits" } else { "misses" };
CacheMetrics::get()
    .native_price_cache_access
    .with_label_values(&[label])
    .inc_by(1);
if let Some(result) = cached {
    results.insert(*token, result.result);
}
```
Additionally, separate methods for the hits/misses metrics would make this much shorter and easier to read:
```diff
-let cached = Self::get_ready_to_use_cached_price(
-    *token,
-    now,
-    &mut cache,
-    &self.0.max_age,
-    create_missing_entries,
-);
-let label = if cached.is_some() { "hits" } else { "misses" };
-CacheMetrics::get()
-    .native_price_cache_access
-    .with_label_values(&[label])
-    .inc_by(1);
-if let Some(result) = cached {
-    results.insert(*token, result.result);
-}
+if let Some(cached) = Self::get_ready_to_use_cached_price(
+    *token,
+    now,
+    &mut cache,
+    &self.0.max_age,
+    create_missing_entries,
+) {
+    results.insert(*token, cached.result);
+    CacheMetrics::get()
+        .native_price_cache_access
+        .with_label_values(&["hits"])
+        .inc_by(1);
+} else {
+    CacheMetrics::get()
+        .native_price_cache_access
+        .with_label_values(&["misses"])
+        .inc_by(1);
+}
```
Doesn't your version add more lines of code and create code duplication in terms of incrementing the metric?
The suggestion alone, yes, but in the comment I suggest creating a method for the hits and one for the misses metrics.
Regardless, it's just a suggestion.
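The helper methods mentioned in this exchange could look roughly like this. This is a sketch with hypothetical names, using plain counters instead of the real prometheus `IntCounterVec` so the idea stands on its own.

```rust
// Simplified stand-in for the real CacheMetrics struct, which tracks
// cache accesses via a labeled prometheus counter vec.
#[derive(Default)]
struct CacheMetrics {
    hits: u64,
    misses: u64,
}

impl CacheMetrics {
    // Hypothetical helpers the review suggests: one call per outcome,
    // instead of computing a "hits"/"misses" label at every lookup site.
    fn record_hit(&mut self) {
        self.hits += 1;
    }

    fn record_miss(&mut self) {
        self.misses += 1;
    }
}
```

With helpers like these, each lookup site simply calls `record_hit()` or `record_miss()` in its respective branch, avoiding the label string juggling.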
```rust
metrics
    .native_price_cache_background_updates
    .inc_by(outdated_entries.len() as u64);
```
Should this metric only count attempts at updating native price?
Some results of native price estimation are not cacheable - would it be useful to track successes/failures?
We already have a separate metric for success/failures:
```rust
price_estimates: IntCounterVec,
```
MartinquaXD left a comment
Looks alright to me. I believe my suggestion doesn't violate any invariants but since I'm not sure I'll already approve the current state.
Wanted to mention to test it again in prod but I saw you already did that. 👍
```diff
 ) -> Option<CachedResult> {
-    Self::get_cached_price(token, now, cache, max_age, create_missing_entry)
-        .filter(|cached| cached.is_ready())
+    Self::get_cached_price(token, now, cache, max_age).filter(|cached| cached.is_ready())
 }
```
This function stood out to me as being kind of pointless. There is only 1 case where we currently do not also require is_ready() and that instance can be avoided by changing how a cached entry gets updated.
I originally wanted to just comment on this but I realized that explaining the issue is more complicated than just offering a diff. It gets rid of ~27 lines by reducing the API surface.
Diff:
diff --git a/crates/shared/src/price_estimation/native_price_cache.rs b/crates/shared/src/price_estimation/native_price_cache.rs
index 416ce8206..201d3dc3a 100644
--- a/crates/shared/src/price_estimation/native_price_cache.rs
+++ b/crates/shared/src/price_estimation/native_price_cache.rs
@@ -104,25 +104,30 @@ struct CachedResult {
const ACCUMULATIVE_ERRORS_THRESHOLD: u32 = 5;
impl CachedResult {
- fn new(
- result: CacheEntry,
- updated_at: Instant,
- requested_at: Instant,
- current_accumulative_errors_count: u32,
- ) -> Self {
- let estimator_internal_errors_count =
- matches!(result, Err(PriceEstimationError::EstimatorInternal(_)))
- .then_some(current_accumulative_errors_count + 1)
- .unwrap_or_default();
+ fn new(result: CacheEntry) -> Self {
+ let now = Instant::now();
+ let is_accumulating_error =
+ matches!(result, Err(PriceEstimationError::EstimatorInternal(_)));
Self {
result,
- updated_at,
- requested_at,
- accumulative_errors_count: estimator_internal_errors_count,
+ updated_at: now,
+ requested_at: now,
+ accumulative_errors_count: u32::from(is_accumulating_error),
}
}
+ fn update(&mut self, result: CacheEntry) {
+ let now = Instant::now();
+ self.requested_at = now;
+ self.updated_at = now;
+ self.accumulative_errors_count = match result {
+ Err(PriceEstimationError::EstimatorInternal(_)) => self.accumulative_errors_count + 1,
+ _ => 0,
+ };
+ self.result = result;
+ }
+
/// The result is not ready if the estimator has returned an internal error
/// and consecutive errors are less than
/// `ESTIMATOR_INTERNAL_ERRORS_THRESHOLD`.
@@ -170,12 +175,12 @@ impl Cache {
let updated_at = Self::random_updated_at(max_age, now, &mut rng);
Some((
token,
- CachedResult::new(
- Ok(from_normalized_price(price)?),
+ CachedResult {
+ result: Ok(from_normalized_price(price)?),
updated_at,
- now,
- Default::default(),
- ),
+ requested_at: now,
+ accumulative_errors_count: 0,
+ },
))
})
.collect::<HashMap<_, _>>();
@@ -186,7 +191,7 @@ impl Cache {
}))
}
- pub fn max_age(&self) -> Duration {
+ fn max_age(&self) -> Duration {
self.0.max_age
}
@@ -199,14 +204,10 @@ impl Cache {
now - Duration::from_secs(age)
}
- pub fn len(&self) -> usize {
+ fn len(&self) -> usize {
self.0.data.lock().unwrap().len()
}
- pub fn is_empty(&self) -> bool {
- self.0.data.lock().unwrap().is_empty()
- }
-
fn get_cached_price(
token: Address,
now: Instant,
@@ -216,20 +217,11 @@ impl Cache {
let entry = cache.get_mut(&token)?;
entry.requested_at = now;
let is_recent = now.saturating_duration_since(entry.updated_at) < *max_age;
- is_recent.then_some(entry.clone())
- }
-
- fn get_ready_to_use_cached_price(
- token: Address,
- now: Instant,
- cache: &mut MutexGuard<HashMap<Address, CachedResult>>,
- max_age: &Duration,
- ) -> Option<CachedResult> {
- Self::get_cached_price(token, now, cache, max_age).filter(|cached| cached.is_ready())
+ (is_recent && entry.is_ready()).then_some(entry.clone())
}
/// Only returns prices that are currently cached.
- pub fn get_cached_prices(
+ fn get_cached_prices(
&self,
tokens: &[Address],
) -> HashMap<Address, Result<f64, PriceEstimationError>> {
@@ -237,8 +229,7 @@ impl Cache {
let mut cache = self.0.data.lock().unwrap();
let mut results = HashMap::default();
for token in tokens {
- let cached =
- Self::get_ready_to_use_cached_price(*token, now, &mut cache, &self.0.max_age);
+ let cached = Self::get_cached_price(*token, now, &mut cache, &self.0.max_age);
let label = if cached.is_some() { "hits" } else { "misses" };
CacheMetrics::get()
.native_price_cache_access
@@ -251,8 +242,12 @@ impl Cache {
results
}
- fn insert(&self, token: Address, result: CachedResult) {
- self.0.data.lock().unwrap().insert(token, result);
+ fn insert(&self, token: Address, result: CacheEntry) {
+ let mut cache = self.0.data.lock().unwrap();
+ cache
+ .entry(token)
+ .and_modify(|value| value.update(result.clone()))
+ .or_insert_with(|| CachedResult::new(result));
}
}
@@ -314,19 +309,14 @@ impl CachingNativePriceEstimator {
I::IntoIter: Send + 'a,
{
let estimates = tokens.into_iter().map(move |token| async move {
- let current_accumulative_errors_count = {
- // check if the price is cached by now
- let now = Instant::now();
+ // check if the price is cached by now
+ let now = Instant::now();
+ {
let mut cache = self.0.cache.0.data.lock().unwrap();
-
- match Cache::get_cached_price(token, now, &mut cache, &max_age) {
- Some(cached) if cached.is_ready() => {
- return (token, cached.result);
- }
- Some(cached) => cached.accumulative_errors_count,
- None => Default::default(),
+ if let Some(cached) = Cache::get_cached_price(token, now, &mut cache, &max_age) {
+ return (token, cached.result);
}
- };
+ }
let approximation = self
.0
@@ -344,11 +334,7 @@ impl CachingNativePriceEstimator {
// update price in cache
if should_cache(&result) {
- let now = Instant::now();
- self.0.cache.insert(
- token,
- CachedResult::new(result.clone(), now, now, current_accumulative_errors_count),
- );
+ self.0.cache.insert(token, result.clone());
};
(token, result)
@@ -362,20 +348,12 @@ impl CachingNativePriceEstimator {
&self.0.cache
}
- /// Only returns prices that are currently cached.
- fn get_cached_prices(
- &self,
- tokens: &[Address],
- ) -> HashMap<Address, Result<f64, PriceEstimationError>> {
- self.0.cache.get_cached_prices(tokens)
- }
-
pub async fn fetch_prices(
&self,
tokens: &[Address],
timeout: Duration,
) -> HashMap<Address, NativePriceEstimateResult> {
- let mut prices = self.get_cached_prices(tokens);
+ let mut prices = self.0.cache.get_cached_prices(tokens);
if timeout.is_zero() {
return prices;
}
@@ -413,12 +391,7 @@ impl NativePriceEstimating for CachingNativePriceEstimator {
let cached = {
let now = Instant::now();
let mut cache = self.0.cache.0.data.lock().unwrap();
- Cache::get_ready_to_use_cached_price(
- token,
- now,
- &mut cache,
- &self.0.cache.0.max_age,
- )
+ Cache::get_cached_price(token, now, &mut cache, &self.0.cache.0.max_age)
};
let label = if cached.is_some() { "hits" } else { "misses" };
Thanks! Applied the diff 🙌
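The core of the applied diff is that `update` keeps counting consecutive `EstimatorInternal` errors while `is_ready` hides an errored entry until the threshold is reached. Reduced to its essentials, under simplifying assumptions (a boolean flag stands in for the real `CacheEntry` result, and the timestamps are omitted), the behavior is:

```rust
// Matches the constant in the diff: an internal error is only served
// from the cache after it has occurred this many times in a row.
const ACCUMULATIVE_ERRORS_THRESHOLD: u32 = 5;

// Simplified stand-in for CachedResult: the real type stores a
// Result<f64, PriceEstimationError> plus updated_at/requested_at.
struct CachedResult {
    internal_error: bool,
    accumulative_errors_count: u32,
}

impl CachedResult {
    fn new(internal_error: bool) -> Self {
        Self {
            internal_error,
            accumulative_errors_count: u32::from(internal_error),
        }
    }

    // Mirrors the diff's update(): consecutive internal errors
    // accumulate, any other result resets the counter.
    fn update(&mut self, internal_error: bool) {
        self.accumulative_errors_count = if internal_error {
            self.accumulative_errors_count + 1
        } else {
            0
        };
        self.internal_error = internal_error;
    }

    // An errored entry only becomes "ready" (i.e. served to callers)
    // once the error has persisted for the threshold number of updates.
    fn is_ready(&self) -> bool {
        !self.internal_error
            || self.accumulative_errors_count >= ACCUMULATIVE_ERRORS_THRESHOLD
    }
}
```

This is why folding `is_ready()` into `get_cached_price` works: every caller that reads from the cache wants only ready entries, while the update path writes through `insert`/`update` without reading.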
Description
Replaces #4044. Once we started forwarding native price estimates from the orderbook to autopilot, CoinGecko API usage went up. This happened because the estimator moved to autopilot, which now handles all requests and also relies on CoinGecko.
This PR refactors native price estimation as described below, following the diagram created by @MartinquaXD:

Changes
- Splits `CachingNativePriceEstimator` into three focused components: a passive `Cache` (shared data store), a `CachingNativePriceEstimator` (on-demand price fetching with caching), and a `NativePriceUpdater` (background maintenance worker).
- A shared `Cache` instance is used by both the API-facing estimator and the auction competition estimator, eliminating duplicate price fetches for the same tokens.
- Adds an `--api-native-price-estimators` flag to the autopilot, allowing the API endpoint to use different native price estimator sources than the auction pipeline (falls back to `--native-price-estimators` if unset).
- Replaces the implicit priority-based update system (`high_priority` + `replace_high_priority`) with explicit token tracking via `set_tokens_to_update()`, called during solvable orders cache building in the autopilot.
- …`Cache` + `CachingNativePriceEstimator`.
- …`CachingNativePriceEstimator`s that share a single instance of `Cache`. One of the estimators is wrapped with the `NativePriceUpdater`, which is used in the auction competition. The other serves the autopilot's API requests without a maintenance task.
- Removes the `--native-price-cache-max-update-size` flag (dead code after the refactoring removed the truncation logic).

How to test
Existing tests + staging and prod.
Follow-up tasks