Skip to content

Commit 653bfb5

Browse files
feat: isolate and track forester worker concurrency
1 parent 1982837 commit 653bfb5

27 files changed

Lines changed: 481 additions & 300 deletions

forester/src/epoch_manager.rs

Lines changed: 314 additions & 44 deletions
Large diffs are not rendered by default.

forester/src/metrics.rs

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -440,21 +440,19 @@ pub async fn metrics_handler() -> Result<impl warp::Reply> {
440440
if let Err(e) = encoder.encode(&REGISTRY.gather(), &mut buffer) {
441441
error!("could not encode custom metrics: {}", e);
442442
};
443-
let mut res = String::from_utf8(buffer.clone()).unwrap_or_else(|e| {
443+
let mut res = String::from_utf8(buffer).unwrap_or_else(|e| {
444444
error!("custom metrics could not be from_utf8'd: {}", e);
445445
String::new()
446446
});
447-
buffer.clear();
448447

449448
let mut buffer = Vec::new();
450449
if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
451450
error!("could not encode prometheus metrics: {}", e);
452451
};
453-
let res_prometheus = String::from_utf8(buffer.clone()).unwrap_or_else(|e| {
452+
let res_prometheus = String::from_utf8(buffer).unwrap_or_else(|e| {
454453
error!("prometheus metrics could not be from_utf8'd: {}", e);
455454
String::new()
456455
});
457-
buffer.clear();
458456

459457
res.push_str(&res_prometheus);
460458
Ok(res)

forester/src/priority_fee.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -208,7 +208,7 @@ pub async fn request_priority_fee_estimate(
208208
.map_err(|error| PriorityFeeEstimateError::ClientBuild(error.clone()))?;
209209

210210
let response = http_client
211-
.post(url.clone())
211+
.post(url.as_str())
212212
.header("Content-Type", "application/json")
213213
.json(&rpc_request)
214214
.send()

forester/src/processor/v2/helpers.rs

Lines changed: 54 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,10 @@ use light_client::{
1111
};
1212
use light_hasher::hash_chain::create_hash_chain_from_slice;
1313

14-
use crate::processor::v2::{common::clamp_to_u16, BatchContext};
14+
use crate::{
15+
logging::should_emit_rate_limited_warning,
16+
processor::v2::{common::clamp_to_u16, BatchContext},
17+
};
1518

1619
pub(crate) fn lock_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> MutexGuard<'a, T> {
1720
match mutex.lock() {
@@ -496,71 +499,57 @@ impl StreamingAddressQueue {
496499
if available < end || start >= end {
497500
return Ok(None);
498501
}
499-
let actual_end = end;
500502
let data = lock_recover(&self.data, "streaming_address_queue.data");
501503

502-
let min_len = [
503-
data.addresses.len(),
504-
data.low_element_values.len(),
505-
data.low_element_next_values.len(),
506-
data.low_element_indices.len(),
507-
data.low_element_next_indices.len(),
508-
]
509-
.into_iter()
510-
.min()
511-
.unwrap_or(0);
512-
if min_len < actual_end {
513-
return Err(anyhow!(
514-
"incomplete batch data: min field length {} < required end {}",
515-
min_len,
516-
actual_end
517-
));
518-
}
519-
520-
let addresses = data.addresses[start..actual_end].to_vec();
521-
if addresses.is_empty() {
522-
return Ok(None);
523-
}
524-
let expected_len = addresses.len();
525-
let Some(low_element_values) = data
526-
.low_element_values
527-
.get(start..end)
528-
.map(|slice| slice.to_vec())
529-
else {
530-
return Ok(None);
531-
};
532-
let Some(low_element_next_values) = data
533-
.low_element_next_values
534-
.get(start..end)
535-
.map(|slice| slice.to_vec())
536-
else {
537-
return Ok(None);
538-
};
539-
let Some(low_element_indices) = data
540-
.low_element_indices
541-
.get(start..end)
542-
.map(|slice| slice.to_vec())
504+
// `available` can be bumped before every parallel array is filled,
505+
// so a missing range here means "not ready yet" — return Ok(None)
506+
// and let the caller retry on the next tick.
507+
let range = start..end;
508+
let (
509+
Some(addresses),
510+
Some(low_element_values),
511+
Some(low_element_next_values),
512+
Some(low_element_indices),
513+
Some(low_element_next_indices),
514+
) = (
515+
data.addresses.get(range.clone()).map(<[_]>::to_vec),
516+
data.low_element_values
517+
.get(range.clone())
518+
.map(<[_]>::to_vec),
519+
data.low_element_next_values
520+
.get(range.clone())
521+
.map(<[_]>::to_vec),
522+
data.low_element_indices
523+
.get(range.clone())
524+
.map(<[_]>::to_vec),
525+
data.low_element_next_indices
526+
.get(range.clone())
527+
.map(<[_]>::to_vec),
528+
)
543529
else {
544530
return Ok(None);
545531
};
546-
let Some(low_element_next_indices) = data
547-
.low_element_next_indices
548-
.get(start..end)
549-
.map(|slice| slice.to_vec())
550-
else {
551-
return Ok(None);
532+
533+
// Proofs can also be unavailable if the indexer hasn't populated the
534+
// merkle nodes for this range yet — return Ok(None) and retry. Log
535+
// at warn (rate-limited) so persistent failures are still visible.
536+
let low_element_proofs = match data.reconstruct_proofs::<HEIGHT>(range) {
537+
Ok(proofs) => proofs,
538+
Err(error) => {
539+
if should_emit_rate_limited_warning(
540+
"address_queue_proofs_not_ready",
541+
std::time::Duration::from_secs(60),
542+
) {
543+
tracing::warn!(
544+
?error,
545+
start,
546+
end,
547+
"address proof reconstruction not ready, retrying"
548+
);
549+
}
550+
return Ok(None);
551+
}
552552
};
553-
if [
554-
low_element_values.len(),
555-
low_element_next_values.len(),
556-
low_element_indices.len(),
557-
low_element_next_indices.len(),
558-
]
559-
.iter()
560-
.any(|&len| len != expected_len)
561-
{
562-
return Ok(None);
563-
}
564553

565554
let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
566555
Some(hashchain) => hashchain,
@@ -582,15 +571,11 @@ impl StreamingAddressQueue {
582571
};
583572

584573
Ok(Some(AddressBatchSnapshot {
585-
low_element_values: data.low_element_values[start..actual_end].to_vec(),
586-
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
587-
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
588-
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
589-
low_element_proofs: data
590-
.reconstruct_proofs::<HEIGHT>(start..actual_end)
591-
.map_err(|error| {
592-
anyhow!("incomplete batch data: failed to reconstruct proofs: {error}")
593-
})?,
574+
low_element_values,
575+
low_element_next_values,
576+
low_element_indices,
577+
low_element_next_indices,
578+
low_element_proofs,
594579
addresses,
595580
leaves_hashchain,
596581
}))

forester/src/processor/v2/proof_worker.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -139,19 +139,19 @@ impl ProofClients {
139139
config.polling_interval,
140140
config.max_wait_time,
141141
config.api_key.clone(),
142-
)?,
142+
),
143143
nullify_client: ProofClient::with_config(
144144
config.update_url.clone(),
145145
config.polling_interval,
146146
config.max_wait_time,
147147
config.api_key.clone(),
148-
)?,
148+
),
149149
address_append_client: ProofClient::with_config(
150150
config.address_append_url.clone(),
151151
config.polling_interval,
152152
config.max_wait_time,
153153
config.api_key.clone(),
154-
)?,
154+
),
155155
})
156156
}
157157

forester/tests/legacy/batched_state_async_indexer_test.rs

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -23,9 +23,7 @@ use light_compressed_account::{
2323
use light_compressed_token::process_transfer::{
2424
transfer_sdk::create_transfer_instruction, TokenTransferOutputData,
2525
};
26-
use light_token::compat::TokenDataWithMerkleContext;
2726
use light_program_test::accounts::test_accounts::TestAccounts;
28-
use light_prover_client::prover::spawn_prover;
2927
use light_registry::{
3028
protocol_config::state::{ProtocolConfig, ProtocolConfigPda},
3129
utils::get_protocol_config_pda_address,
@@ -34,6 +32,7 @@ use light_test_utils::{
3432
conversions::sdk_to_program_token_data, spl::create_mint_helper_with_keypair,
3533
system_program::create_invoke_instruction,
3634
};
35+
use light_token::compat::TokenDataWithMerkleContext;
3736
use rand::{prelude::SliceRandom, rngs::StdRng, Rng, SeedableRng};
3837
use serial_test::serial;
3938
use solana_program::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey};
@@ -87,7 +86,6 @@ async fn test_state_indexer_async_batched() {
8786
validator_args: vec![],
8887
}))
8988
.await;
90-
spawn_prover().await;
9189

9290
let env = TestAccounts::get_local_test_validator_accounts();
9391
let mut config = forester_config();
@@ -306,10 +304,7 @@ async fn wait_for_slot(rpc: &mut LightClient, target_slot: u64) {
306304
return;
307305
}
308306
Err(e) => {
309-
println!(
310-
"warp_to_slot unavailable ({}), falling back to polling",
311-
e
312-
);
307+
println!("warp_to_slot unavailable ({}), falling back to polling", e);
313308
}
314309
}
315310
while rpc.get_slot().await.unwrap() < target_slot {

prover/client/src/errors.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@ pub enum ProverClientError {
3939

4040
#[error("Integer conversion failed: {0}")]
4141
IntegerConversion(String),
42-
4342
#[error("Hashchain mismatch: computed {computed:?} != expected {expected:?} (batch_size={batch_size}, next_index={next_index})")]
4443
HashchainMismatch {
4544
computed: [u8; 32],

prover/client/src/helpers.rs

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ use std::process::Command;
22

33
use light_hasher::{Hasher, Poseidon};
44
use light_sparse_merkle_tree::changelog::ChangelogEntry;
5-
use num_bigint::{BigInt, BigUint};
5+
use num_bigint::{BigInt, BigUint, Sign};
66
use num_traits::{Num, ToPrimitive};
77
use serde::Serialize;
88

@@ -35,10 +35,17 @@ pub fn convert_endianness_128(bytes: &[u8]) -> Vec<u8> {
3535
.collect::<Vec<u8>>()
3636
}
3737

38-
pub fn bigint_to_u8_32(n: &BigInt) -> Result<[u8; 32], Box<dyn std::error::Error>> {
39-
let (_, bytes_be) = n.to_bytes_be();
38+
pub fn bigint_to_u8_32(n: &BigInt) -> Result<[u8; 32], ProverClientError> {
39+
let (sign, bytes_be) = n.to_bytes_be();
40+
if sign == Sign::Minus {
41+
return Err(ProverClientError::InvalidProofData(
42+
"negative integers are not valid field elements".to_string(),
43+
));
44+
}
4045
if bytes_be.len() > 32 {
41-
Err("Number too large to fit in [u8; 32]")?;
46+
return Err(ProverClientError::InvalidProofData(
47+
"number too large to fit in [u8; 32]".to_string(),
48+
));
4249
}
4350
let mut array = [0; 32];
4451
let bytes = &bytes_be[..bytes_be.len()];

prover/client/src/proof_client.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -84,21 +84,21 @@ impl ProofClient {
8484
polling_interval: Duration,
8585
max_wait_time: Duration,
8686
api_key: Option<String>,
87-
) -> Result<Self, ProverClientError> {
87+
) -> Self {
8888
let initial_poll_delay = if api_key.is_some() {
8989
Duration::from_millis(INITIAL_POLL_DELAY_LARGE_CIRCUIT_MS)
9090
} else {
9191
Duration::from_millis(INITIAL_POLL_DELAY_SMALL_CIRCUIT_MS)
9292
};
9393

94-
Ok(Self {
94+
Self {
9595
client: build_http_client(),
9696
server_address,
9797
polling_interval,
9898
max_wait_time,
9999
api_key,
100100
initial_poll_delay,
101-
})
101+
}
102102
}
103103

104104
#[allow(unused)]

0 commit comments

Comments (0)