Add v1/orders POST endpoint to get orders in batches #4048
Conversation
Adds POST handler for `v1/orders` endpoint that requires a list of order uids and responds with a vector of their data. Has a hardcoded limit of 5000 orders per request.
Code Review
This pull request introduces a new endpoint v1/get_orders to retrieve orders in batches using a POST request with a limit of 5000 orders per request. The code adds a new module get_orders_by_uid.rs, modifies api.rs to include the new route, and updates database/orders.rs and orderbook.rs to support fetching multiple orders. I found a missing test case for the MAX_ORDERS_LIMIT validation.
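For illustration, the missing validation test could look roughly like this. This is a sketch, not PR code: the helper `post_orders_lookup`, the constant name, and the rejected status code are all assumptions.

```rust
// Hypothetical sketch of the missing MAX_ORDERS_LIMIT test;
// `post_orders_lookup` is an assumed helper and the status code is a guess.
#[tokio::test]
async fn rejects_requests_exceeding_max_orders_limit() {
    // One uid more than the limit should be rejected before hitting the DB.
    let uids = vec![OrderUid([0u8; 56]); MAX_ORDERS_LIMIT + 1];
    let response = post_orders_lookup(&uids).await;
    assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
```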
Tested on sepolia staging (queries twice for the same order): response
By the way, we can have a discussion on what the endpoint itself should be called. I got a couple of suggestions from Claude.
Based on the above suggestions I have opted for a custom, not-order-id-like name.
Code Review
The implementation adds a new endpoint to fetch orders in batches. A critical issue was found in crates/orderbook/src/database/orders.rs where await is used on a stream, which will cause a compilation error. The implementation also inefficiently collects two full vectors into memory; a suggestion is provided to fix the bug and improve performance by chaining the streams.
fafk left a comment:
LGTM. The endpoint should be documented in openapi.yml too I think.
This pull request has been marked as stale because it has been inactive for a while. Please update this pull request or it will be automatically closed.
Reminder: Please consider backward compatibility when modifying the API specification.
Code Review
This pull request introduces a new endpoint for fetching multiple orders by their UIDs. It has a critical potential Denial of Service vulnerability due to database connection pool exhaustion when streaming results. Additionally, the API route is incorrectly defined as GET instead of POST, the database logic for fetching multiple orders has a lifetime issue and silently ignores errors, and the new end-to-end test contains a compilation error.
```rust
async fn many_orders(&self, uids: &[OrderUid]) -> Result<BoxStream<'static, Result<Order>>> {
    let _timer = super::Metrics::get()
        .database_queries
        .with_label_values(&["many_orders"])
        .start_timer();
    let uids = uids.iter().map(|uid| ByteArray(uid.0)).collect::<Vec<_>>();
    let mut ex = self.pool.acquire().await?;

    Ok(async_stream::try_stream! {
        {
            let mut stream = orders::many_full_orders_with_quotes(&mut ex, uids.as_slice()).await;
            while let Some(order) = stream.next().await {
                let Ok(order) = order else { continue };
                let (order, quote) = order.into_order_and_quote();
                yield full_order_with_quote_into_model_order(order, quote.as_ref())?;
            }
        }
        {
            let mut stream = database::jit_orders::get_many_by_id(&mut ex, uids.as_slice()).await;
            while let Some(order) = stream.next().await {
                let Ok(order) = order else { continue };
                yield full_order_into_model_order(order)?;
            }
        }
    }.boxed())
}
```
The many_orders function acquires a database connection and captures it within an async_stream. This can lead to a Denial of Service vulnerability due to database connection pool exhaustion if the stream is used as a response body and clients hold connections open. Additionally, the database connection ex is a local variable that is dropped, but the returned 'static stream captures ex by reference, which will lead to a use-after-free error and likely a compilation error due to lifetime mismatch. It is recommended to collect the results into a Vec<Order> and return the collection instead of a stream to ensure the database connection is released promptly.
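For illustration, the suggested collect-into-Vec version would look roughly like this. It reuses the names from the diff above; the exact signatures are assumptions, not code from the PR.

```rust
// Sketch of the suggested alternative: collect into a Vec so the pooled
// connection is released as soon as the function returns, instead of being
// held open for the lifetime of the response stream.
async fn many_orders(&self, uids: &[OrderUid]) -> Result<Vec<Order>> {
    let uids = uids.iter().map(|uid| ByteArray(uid.0)).collect::<Vec<_>>();
    let mut ex = self.pool.acquire().await?;

    let mut result = Vec::new();
    {
        let mut stream = orders::many_full_orders_with_quotes(&mut ex, uids.as_slice()).await;
        while let Some(order) = stream.next().await {
            let (order, quote) = order?.into_order_and_quote();
            result.push(full_order_with_quote_into_model_order(order, quote.as_ref())?);
        }
    }
    {
        let mut stream = database::jit_orders::get_many_by_id(&mut ex, uids.as_slice()).await;
        while let Some(order) = stream.next().await {
            result.push(full_order_into_model_order(order?)?);
        }
    }
    // `ex` goes back to the pool here, before any response body is written.
    Ok(result)
}
```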
This has been chosen as a trade-off against collecting all order data in memory; it allows the response to be returned as a stream.
Additionally, the database connection ex is a local variable that is dropped, but the returned 'static stream captures ex by reference, which will lead to a use-after-free error and likely a compilation error due to lifetime mismatch.
Isn't that an important concern?
This is not an actual concern, since the async-stream macro expands into an async move block, into which the value is moved.
Excerpt from cargo expand on the function:
```rust
let mut ex = __self.pool.acquire().await?;
Ok({
    let (mut __yield_tx, __yield_rx) =
        unsafe { ::async_stream::__private::yielder::pair() };
    ::async_stream::__private::AsyncStream::new(__yield_rx,
        async move
        {
            '__async_stream_private_check_scope:
            {
                {
                    let mut stream =
                        orders::many_full_orders_with_quotes(&mut ex,
                            uids.as_slice()).await;
```

```rust
{
    let mut stream = orders::many_full_orders_with_quotes(&mut ex, uids.as_slice()).await;
    while let Some(order) = stream.next().await {
        let Ok(order) = order else { continue };
```
Database errors from the stream are silently ignored. If an error occurs while fetching from the database, let Ok(order) = order else { continue }; will skip the erroneous item and continue, potentially returning an incomplete list of orders without any error indication. The error should be propagated using the ? operator. This also applies to the loop on line 341.
```diff
-let Ok(order) = order else { continue };
+let order = order?;
```
There is no way to propagate errors as we are returning an array of orders that were fetched. The end-user of the API can cross-reference which orders were not returned.
Wdym? full_order_with_quote_into_model_order returns a result. If stream.next() returned an Err, we should tell the client that this order is missing in the response, not because it doesn't exist, but because we encountered an error during the process. Silencing the error doesn't fit here at all.
Okay, I will rework the response format to include which orders had an error at this stage.
It is not feasible to figure out which orders were missing, though (as that would require internally mapping the requested vs returned orders); I believe that is best done on the request side, not as part of handling this request.
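For illustration, such a response could take a shape like the following. Names are illustrative only, not from the PR.

```rust
// Illustrative only: successfully fetched orders, plus the uids that hit a
// database or conversion error. Uids that are simply unknown still have to
// be cross-referenced by the caller, as discussed above.
#[derive(serde::Serialize)]
struct OrdersLookupResponse {
    orders: Vec<Order>,
    errored_uids: Vec<OrderUid>,
}
```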
Adjusts ORDER_UID_LIMIT to 128, which allows it to actually take effect instead of the MAX_JSON_BODY_PAYLOAD limit always applying first. MAX_JSON_BODY_PAYLOAD is set to 16384. OrderUid is 56 bytes; when encoded in a JSON array of 0x-prefixed hex strings, each entry takes around 116 bytes, so 128 * 116 = 14848 fits under the limit.
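A quick standalone check of that arithmetic (a sketch assuming the `hex` crate; not PR code):

```rust
// Standalone sanity check of the payload math above.
fn main() {
    let uid = [0u8; 56];
    // 112 hex chars + "0x" prefix + two JSON quotes = 116 bytes per entry.
    let encoded = format!("\"0x{}\"", hex::encode(uid));
    assert_eq!(encoded.len(), 116);
    // 128 entries plus 127 commas and the array brackets stay under 16384.
    assert!(128 * encoded.len() + 127 + 2 < 16384);
}
```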
```diff
-pub const ORDER_UID_LIMIT: usize = 1024;
+/// OrderUid is 56 bytes. When hex encoded as a 0x-prefixed JSON string it is 116 bytes.
+/// Chosen to be under the MAX_JSON_BODY_PAYLOAD size of 1024 * 16.
+pub const ORDER_UID_LIMIT: usize = 128;
```
I am not sure I am following. The PR description mentions 5000 orders. Here we have a limit of 128 orders.
I updated the PR description now.
I have been doing some tests on the allowable payload size, and if we don't want to bump the MAX_JSON_BODY_PAYLOAD size of 16kb, this is the limit of how many orders we can put into a single request.
Maybe we want to remove the limiting altogether, since the body size limit would be hit first?
I don't understand the idea here. AAVE requested an API to receive batches of up to 5k orders. Now you are saying we shouldn't exceed 16kb per request. That doesn't match. Also, if we end up with the 16kb request limit, it doesn't make any sense to introduce that complex streaming solution.
Maybe we want to remove the limiting altogether since the body size limit would be hit first?
Removing limiting is not an option for untrusted endpoints. The simple solution here would be to bump the limit.
A more complicated solution would be to have a filter that decides whether a request gets the small limit or the big limit, would of course require coordination, etc.
My point being: don't remove the limits, bump them
I have confirmed with Aave on the call that 128 orders per request is fine. We can always revisit the limit at a later point in time.
I have confirmed with Aave on the call that 128 orders per request is fine. We can always revisit the limit at a later point in time.
Ok, then this stream solution doesn't make sense here. IMO, it is overkill. I originally suggested it for the 5k orders request.
I would say let's keep this approach anyway, since it's lighter on our services' resources, and if Aave would ever like us to expand the max request body limit/requested order count, it will just be a change of the constants.
I would say let's keep this approach anyway, since it's lighter on our services' resources, and if Aave would ever like us to expand the max request body limit/requested order count, it will just be a change of the constants.
I want to strongly avoid overcomplicating an already non-trivial codebase. This stream communication seems absolutely unnecessary for a simple response that can contain up to 128 orders. The get_orders API has a page limit of 1000 with no streaming!
```rust
fn streaming_response(orders: impl Stream<Item = Order> + Send + 'static) -> Response {
    let json_stream = streaming_json_array(
        orders.filter_map(async move |order| serde_json::to_string(&order).ok()),
    )
    .map(|s| Ok::<_, std::convert::Infallible>(s.into_bytes()));
    (
        [("content-type", "application/json")],
        body::Body::from_stream(json_stream),
    )
        .into_response()
}
```
Is AAVE fine with this approach?
```rust
{
    let mut stream = database::jit_orders::get_many_by_id(&mut ex, uids.as_slice()).await;
    while let Some(order) = stream.next().await {
        let Ok(order) = order else { continue };
```
Ditto (the error needs to be propagated to the client the same way as on the line below).
I will rework the error handling.
Before wasting time on polishing this streaming solution, let's agree on the overall design.
jmg-duarte left a comment:
The PR looks almost there, but considering you're in touch with AAVE, I'd suggest reviewing the shape of the response with them, as JSON is not a stream-friendly format at all.
```rust
fn streaming_json_array(
    elements: impl Stream<Item = String> + Send + 'static,
) -> impl Stream<Item = String> + Send + 'static {
    let mut first = true;
    stream::once(async { "[".to_string() })
        .chain(elements.map(move |element| {
            let prefix = if first { "" } else { "," };
            first = false;
            format!("{prefix}{element}")
        }))
        .chain(stream::once(async { "]".to_string() }))
}
```
To me this looks like a complicated way of implementing JSONL; would AAVE be ok with that instead? It's much more stream-friendly and would also be easier for them to parse (see the sketch below), since each line is:
- delimited with a newline
- a valid JSON object that they can parse (as opposed to waiting for the whole array)
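A minimal sketch of that JSONL variant, reusing the Stream shape from the snippet above (the function name is illustrative; assumes the futures crate):

```rust
use futures::{Stream, StreamExt};

// Illustrative JSONL variant: no brackets and no comma bookkeeping; each
// element is simply terminated with a newline, so clients can parse the
// response line by line as it streams in.
fn streaming_jsonl(
    elements: impl Stream<Item = String> + Send + 'static,
) -> impl Stream<Item = String> + Send + 'static {
    elements.map(|element| format!("{element}\n"))
}
```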
I will go ahead and make the change while waiting for Aave's response on the matter, since I presume they have no preference and JSONL is indeed more fitting here.
Co-authored-by: José Duarte <duarte.gmj@gmail.com>
Description
Aave wants to track specific orders in bulk, knowing their ids.
Changes
Adds POST handler for v1/orders/lookup endpoint that requires a list of order uids and responds with a vector of their data. Has a hardcoded limit of 128 orders per request (to fit the MAX_JSON_BODY_PAYLOAD size).
How to test
Test on staging, query multiple orders using this API.
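For example, with a throwaway client like this (a sketch: the URL is a placeholder for the staging host, the uid must be replaced with a real 56-byte 0x-prefixed hex order uid, and the bare-array payload shape is assumed from the PR description; uses `reqwest` with the `json` feature and `tokio`):

```rust
// Hypothetical test client; URL and uid are placeholders, not real values.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uids = vec!["0x...".to_string()];
    let response = reqwest::Client::new()
        .post("https://staging-host/api/v1/orders/lookup")
        .json(&uids)
        .send()
        .await?;
    let status = response.status();
    let body = response.text().await?;
    println!("{status} {body}");
    Ok(())
}
```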