Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 11 additions & 46 deletions java/lance-jni/src/optimize.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use jni::{
JNIEnv,
objects::{JByteArray, JMap, JObject, JValue, JValueGen},
objects::{JByteArray, JObject, JValueGen},
sys::jlong,
};
use lance::dataset::{
Expand Down Expand Up @@ -256,7 +256,7 @@ const COMPACTION_PLAN_CONSTRUCTOR_SIG: &str =
"(Ljava/util/List;JLorg/lance/compaction/CompactionOptions;)V";
const REWRITE_RESULT_CLASS: &str = "org/lance/compaction/RewriteResult";
const REWRITE_RESULT_CONSTRUCTOR_SIG: &str =
"(Lorg/lance/compaction/CompactionMetrics;Ljava/util/List;Ljava/util/List;JLjava/util/Map;[B)V";
"(Lorg/lance/compaction/CompactionMetrics;Ljava/util/List;Ljava/util/List;J[B)V";
const COMPACTION_OPTIONS_CLASS: &str = "org/lance/compaction/CompactionOptions";
const COMPACTION_OPTIONS_CONSTRUCTOR_SIG: &str = "(Ljava/util/Optional;Ljava/util/Optional;Ljava/util/Optional;Ljava/util/Optional;Ljava/util/Optional;Ljava/util/Optional;Ljava/util/Optional;Ljava/util/Optional;)V";

Expand Down Expand Up @@ -346,25 +346,8 @@ impl IntoJava for &RewriteResult {
let metrics = self.metrics.into_java(env)?;
let new_fragments = export_vec(env, &self.new_fragments)?;
let original_fragments = export_vec(env, &self.original_fragments)?;
let changed_row_addrs: JObject<'_> =
if let Some(changed_row_addrs) = &self.changed_row_addrs {
env.byte_array_from_slice(changed_row_addrs)?.into()
} else {
JObject::null()
};
let row_id_map = if let Some(row_id_map) = &self.row_id_map {
let java_map = env.new_object("java/util/HashMap", "()V", &[])?;
for (k, v) in row_id_map {
let k_obj = to_java_long_obj(env, Some(*k as i64))?;
let v_obj = to_java_long_obj(env, v.map(|val| val as i64))?;
env.call_method(
&java_map,
"put",
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
&[JValue::Object(&k_obj), JValue::Object(&v_obj)],
)?;
}
java_map
let row_addrs: JObject<'_> = if let Some(row_addrs) = &self.row_addrs {
env.byte_array_from_slice(row_addrs)?.into()
} else {
JObject::null()
};
Expand All @@ -376,8 +359,7 @@ impl IntoJava for &RewriteResult {
JValueGen::Object(&new_fragments),
JValueGen::Object(&original_fragments),
JValueGen::Long(self.read_version as i64),
JValueGen::Object(&row_id_map),
JValueGen::Object(&changed_row_addrs),
JValueGen::Object(&row_addrs),
],
)?)
}
Expand Down Expand Up @@ -433,38 +415,21 @@ impl FromJObjectWithEnv<RewriteResult> for JObject<'_> {
import_vec_from_method(env, self, "getOriginalFragments", |env, fragment| {
fragment.extract_object(env)
})?;
let changed_row_addrs_obj: JByteArray<'_> = env
.call_method(self, "getChangedRowAddrs", "()[B", &[])?
let row_addrs_obj: JByteArray<'_> = env
.call_method(self, "getRowAddrs", "()[B", &[])?
.l()?
.into();
let changed_row_addrs = if changed_row_addrs_obj.is_null() {
let row_addrs = if row_addrs_obj.is_null() {
None
} else {
Some(env.convert_byte_array(changed_row_addrs_obj)?)
};
let row_id_map_obj = env
.call_method(self, "getRowIdMap", "()Ljava/util/Map;", &[])?
.l()?;
let row_id_map = if row_id_map_obj.is_null() {
None
} else {
let row_id_jmap = JMap::from_env(env, &row_id_map_obj)?;
let mut map = HashMap::new();
let mut iter = row_id_jmap.iter(env)?;
while let Some((key, value)) = iter.next(env)? {
let key: Option<i64> = key.extract_object(env)?;
let value: Option<i64> = value.extract_object(env)?;
map.insert(key.unwrap() as u64, value.map(|v| v as u64));
}
Some(map)
Some(env.convert_byte_array(row_addrs_obj)?)
};
Ok(RewriteResult {
metrics,
new_fragments,
read_version,
original_fragments,
row_id_map,
changed_row_addrs,
row_addrs,
})
}
}
24 changes: 7 additions & 17 deletions java/src/main/java/org/lance/compaction/RewriteResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* Rewrite Result of a single compaction task. It will be passed across different workers and be
Expand All @@ -31,25 +30,21 @@ public class RewriteResult implements Serializable {
private final List<FragmentMetadata> originalFragments;
private final long readVersion;

// null if index remap is deferred after compaction
@Nullable private final Map<Long, Long> rowIdMap;

// null if index remap is part of compaction
@Nullable private final byte[] changedRowAddrs;
// Serialized RoaringTreemap of row addresses read from the original fragments.
// null for stable row IDs.
@Nullable private final byte[] rowAddrs;

public RewriteResult(
CompactionMetrics metrics,
List<FragmentMetadata> newFragments,
List<FragmentMetadata> originalFragments,
long readVersion,
Map<Long, Long> rowIdMap,
byte[] changedRowAddrs) {
byte[] rowAddrs) {
this.metrics = metrics;
this.newFragments = newFragments;
this.originalFragments = originalFragments;
this.readVersion = readVersion;
this.rowIdMap = rowIdMap;
this.changedRowAddrs = changedRowAddrs;
this.rowAddrs = rowAddrs;
}

public long getReadVersion() {
Expand All @@ -61,8 +56,8 @@ public CompactionMetrics getMetrics() {
}

@Nullable
public byte[] getChangedRowAddrs() {
return changedRowAddrs;
public byte[] getRowAddrs() {
return rowAddrs;
}

public List<FragmentMetadata> getNewFragments() {
Expand All @@ -72,9 +67,4 @@ public List<FragmentMetadata> getNewFragments() {
public List<FragmentMetadata> getOriginalFragments() {
return originalFragments;
}

@Nullable
public Map<Long, Long> getRowIdMap() {
return rowIdMap;
}
}
Loading