Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions crates/core/data-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,15 +664,12 @@ impl DataStore {
.await
.map_err(TruncateError::StreamMetadata)?;

let metadata_db = self.metadata_db.clone();
let object_store = self.object_store.clone();

let file_count = files.len() as u64;

futures::stream::iter(files)
.map(|file| {
let metadata_db = metadata_db.clone();
let object_store = object_store.clone();
let metadata_db = self.metadata_db.clone();
let object_store = self.object_store.clone();
async move {
// Delete from object store (treat "not found" as success)
match object_store.delete(&file.object_meta.location).await {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/datasets-registry/src/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ where
.map_err(GetError::ObjectStoreReadBytes)?;

// Convert bytes to UTF-8 string
let content = String::from_utf8(bytes.to_vec())
let content = String::from_utf8(bytes.into())
.map(ManifestContent)
.map_err(GetError::Utf8Error)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/object-store/src/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
async fn get_string(&self, location: impl Into<Path>) -> Result<String, ObjectStoreExtError> {
let path = location.into();
let bytes = self.get_bytes(path.clone()).await?;
String::from_utf8(bytes.to_vec()).map_err(|err| ObjectStoreExtError::NotUtf8 {
String::from_utf8(bytes.into()).map_err(|err| ObjectStoreExtError::NotUtf8 {
path: path.to_string(),
source: err,
})
Expand Down
9 changes: 4 additions & 5 deletions crates/core/providers-registry/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,10 @@ async fn load_and_parse_file(
})?;

// Parse as UTF-8
let content =
String::from_utf8(bytes.to_vec()).map_err(|source| LoadFileError::InvalidUtf8 {
path: path.clone(),
source,
})?;
let content = String::from_utf8(bytes.into()).map_err(|source| LoadFileError::InvalidUtf8 {
path: path.clone(),
source,
})?;

// Parse TOML and deserialize directly to ProviderConfigRaw
let config = toml::from_str::<ProviderConfigRaw>(&content).map_err(|source| {
Expand Down
6 changes: 2 additions & 4 deletions crates/core/providers-registry/src/tests/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ async fn register_with_valid_provider_makes_immediately_available() {
.bytes()
.await
.expect("should read evm bytes");
let evm_file_contents =
String::from_utf8(evm_bytes.to_vec()).expect("should convert to string");
let evm_file_contents = String::from_utf8(evm_bytes.into()).expect("should convert to string");
let parsed_evm_provider: ProviderConfigRaw =
toml::from_str(&evm_file_contents).expect("should parse evm_mainnet file");
let parsed_evm_header = parsed_evm_provider
Expand All @@ -86,8 +85,7 @@ async fn register_with_valid_provider_makes_immediately_available() {
.bytes()
.await
.expect("should read fhs bytes");
let fhs_file_contents =
String::from_utf8(fhs_bytes.to_vec()).expect("should convert to string");
let fhs_file_contents = String::from_utf8(fhs_bytes.into()).expect("should convert to string");
let parsed_fhs_provider: ProviderConfigRaw =
toml::from_str(&fhs_file_contents).expect("should parse fhs_polygon file");
let parsed_fhs_header = parsed_fhs_provider
Expand Down
13 changes: 2 additions & 11 deletions crates/core/worker-datasets-derived/src/job_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,8 @@ pub async fn execute(
async move {
let table_name = table.table_name().to_string();

materialize_table(
ctx,
&manifest,
env.clone(),
table.clone(),
compactor,
opts.clone(),
end,
&siblings,
)
.await?;
materialize_table(ctx, &manifest, env, table, compactor, opts, end, &siblings)
.await?;

tracing::info!("materialization of `{}` completed successfully", table_name);
Ok(())
Expand Down
Loading