Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -95,6 +96,181 @@ private void insertTestDocument(String docId) throws IOException {
}
}

@Nested
class BulkUpsertConsistencyTest {

  /**
   * Looks up the single document stored under {@code docId} and returns its parsed JSON form.
   * Fails the running test if no matching document exists.
   */
  private JsonNode findDocumentJson(Collection collection, String docId) throws Exception {
    Query query = buildQueryById(docId);
    try (CloseableIterator<Document> iterator = collection.find(query)) {
      assertTrue(iterator.hasNext());
      return OBJECT_MAPPER.readTree(iterator.next().toJson());
    }
  }

  /** Asserts the two-element 'tags' array written by createTestDocument is still present. */
  private void assertTagsPreserved(JsonNode json) {
    JsonNode tagsNode = json.get("tags");
    assertNotNull(tagsNode);
    assertEquals(2, tagsNode.size());
  }

  /** Asserts the nested 'props' object written by createTestDocument is still present. */
  private void assertPropsPreserved(JsonNode json) {
    JsonNode propsNode = json.get("props");
    assertNotNull(propsNode);
    assertEquals("TestBrand", propsNode.get("brand").asText());
  }

  @ParameterizedTest(name = "{0}: bulkUpsert multiple documents")
  @ArgumentsSource(AllStoresProvider.class)
  void testBulkUpsert(String storeName) throws Exception {
    String docId1 = generateDocId("bulk-1");
    String docId2 = generateDocId("bulk-2");

    Collection collection = getCollection(storeName);

    Map<Key, Document> documents = new HashMap<>();
    documents.put(createKey(docId1), createTestDocument(docId1));
    documents.put(createKey(docId2), createTestDocument(docId2));

    boolean result = collection.bulkUpsert(documents);
    assertTrue(result);

    // Both documents should be readable back with every field from the test template.
    for (String docId : List.of(docId1, docId2)) {
      JsonNode json = findDocumentJson(collection, docId);
      assertEquals("TestItem", json.get("item").asText());
      assertEquals(100, json.get("price").asInt());
      assertEquals(50, json.get("quantity").asInt());
      assertTrue(json.get("in_stock").asBoolean());
      assertTagsPreserved(json);
    }
  }

  @ParameterizedTest(name = "{0}: bulkUpsert merges fields (does not replace entire document)")
  @ArgumentsSource(AllStoresProvider.class)
  void testBulkUpsertMergesFields(String storeName) throws Exception {
    String docId1 = generateDocId("bulk-merge-1");
    String docId2 = generateDocId("bulk-merge-2");

    Collection collection = getCollection(storeName);

    // Step 1: Insert initial documents with all fields.
    Map<Key, Document> initialDocs = new HashMap<>();
    initialDocs.put(createKey(docId1), createTestDocument(docId1));
    initialDocs.put(createKey(docId2), createTestDocument(docId2));

    boolean insertResult = collection.bulkUpsert(initialDocs);
    assertTrue(insertResult);

    // Step 2: Upsert partial documents that touch only a subset of fields each.
    Map<Key, Document> partialDocs = new HashMap<>();

    // Partial doc for docId1 - only update item and price.
    ObjectNode partial1 = OBJECT_MAPPER.createObjectNode();
    partial1.put("id", getKeyString(docId1));
    partial1.put("item", "UpdatedItem1");
    partial1.put("price", 999);
    partialDocs.put(createKey(docId1), new JSONDocument(partial1));

    // Partial doc for docId2 - only update quantity and in_stock.
    ObjectNode partial2 = OBJECT_MAPPER.createObjectNode();
    partial2.put("id", getKeyString(docId2));
    partial2.put("quantity", 999);
    partial2.put("in_stock", false);
    partialDocs.put(createKey(docId2), new JSONDocument(partial2));

    boolean upsertResult = collection.bulkUpsert(partialDocs);
    assertTrue(upsertResult);

    // Step 3: Verify that fields were merged, not replaced.
    // Doc1: item and price updated; every other field keeps its original value.
    JsonNode json1 = findDocumentJson(collection, docId1);
    assertEquals("UpdatedItem1", json1.get("item").asText());
    assertEquals(999, json1.get("price").asInt());
    assertEquals(50, json1.get("quantity").asInt());
    assertTrue(json1.get("in_stock").asBoolean());
    assertEquals(1000000000000L, json1.get("big_number").asLong());
    assertTagsPreserved(json1);
    assertPropsPreserved(json1);

    // Doc2: quantity and in_stock updated; every other field keeps its original value.
    JsonNode json2 = findDocumentJson(collection, docId2);
    assertEquals(999, json2.get("quantity").asInt());
    assertFalse(json2.get("in_stock").asBoolean());
    assertEquals("TestItem", json2.get("item").asText());
    assertEquals(100, json2.get("price").asInt());
    assertEquals(1000000000000L, json2.get("big_number").asLong());
    assertTagsPreserved(json2);
    assertPropsPreserved(json2);
  }

  @ParameterizedTest(name = "{0}: bulkUpsert skips documents with invalid fields gracefully")
  @ArgumentsSource(AllStoresProvider.class)
  void testBulkUpsertSkipsInvalidFields(String storeName) throws Exception {
    String docId1 = generateDocId("bulk-skip-1");
    String docId2 = generateDocId("bulk-skip-2");
    String docId3 = generateDocId("bulk-skip-3");

    Collection collection = getCollection(storeName);

    // LinkedHashMap keeps insertion order so the invalid document sits between two valid ones.
    Map<Key, Document> documents = new LinkedHashMap<>();

    // First document - valid.
    documents.put(createKey(docId1), createTestDocument(docId1));

    // Second document - carries a 'numbers' array whose elements are not numeric.
    ObjectNode invalidFieldDoc = OBJECT_MAPPER.createObjectNode();
    invalidFieldDoc.put("id", getKeyString(docId2));
    invalidFieldDoc.put("item", "PartialItem");
    invalidFieldDoc.put("price", 200);
    invalidFieldDoc.put("quantity", 20);
    invalidFieldDoc.put("in_stock", false);
    invalidFieldDoc.putArray("numbers").add("not-a-number").add("also-not-a-number");
    documents.put(createKey(docId2), new JSONDocument(invalidFieldDoc));

    // Third document - valid.
    documents.put(createKey(docId3), createTestDocument(docId3));

    boolean result = collection.bulkUpsert(documents);
    assertTrue(result);

    // All three documents should exist, including the one containing the invalid field.
    for (String docId : List.of(docId1, docId2, docId3)) {
      findDocumentJson(collection, docId);
    }

    if (storeName.equals(POSTGRES_FLAT_STORE)) {
      JsonNode json = findDocumentJson(collection, docId2);
      // The 'numbers' field should be null/missing since it was skipped...
      assertTrue(json.get("numbers") == null || json.get("numbers").isNull());
      // ...but the valid fields of the same document should still be present.
      assertEquals("PartialItem", json.get("item").asText());
      assertEquals(200, json.get("price").asInt());
    }
  }
}

@Nested
@DisplayName("Upsert Consistency Tests")
class UpsertConsistencyTests {
Expand Down Expand Up @@ -212,40 +388,6 @@ void testUpsertExistingDoc(String storeName) throws Exception {
}
}

@ParameterizedTest(name = "{0}: bulkUpsert multiple documents")
@ArgumentsSource(AllStoresProvider.class)
void testBulkUpsert(String storeName) throws Exception {
  String firstId = generateDocId("bulk-1");
  String secondId = generateDocId("bulk-2");

  Collection collection = getCollection(storeName);

  // Stage both documents and write them through a single bulk call.
  Map<Key, Document> batch = new HashMap<>();
  for (String id : List.of(firstId, secondId)) {
    batch.put(createKey(id), createTestDocument(id));
  }

  boolean succeeded = collection.bulkUpsert(batch);
  assertTrue(succeeded);

  // Each document should come back with all of the template fields intact.
  for (String id : List.of(firstId, secondId)) {
    Query byId = buildQueryById(id);
    try (CloseableIterator<Document> results = collection.find(byId)) {
      assertTrue(results.hasNext());
      JsonNode json = OBJECT_MAPPER.readTree(results.next().toJson());

      assertEquals("TestItem", json.get("item").asText());
      assertEquals(100, json.get("price").asInt());
      assertEquals(50, json.get("quantity").asInt());
      assertTrue(json.get("in_stock").asBoolean());

      JsonNode tags = json.get("tags");
      assertNotNull(tags);
      assertEquals(2, tags.size());
    }
  }
}

@ParameterizedTest(name = "{0}: upsert with non-existing fields (schema mismatch)")
@ArgumentsSource(AllStoresProvider.class)
void testUpsertNonExistingFields(String storeName) throws Exception {
Expand Down Expand Up @@ -1087,4 +1229,43 @@ void testMultipleUpdatesOnSameFieldThrowsException(String storeName) throws IOEx
}
}
}

@Nested
@DisplayName("Delete Consistency Tests")
class DeleteConsistencyTests {

  @ParameterizedTest(name = "{0}: delete existing key returns true")
  @ArgumentsSource(AllStoresProvider.class)
  void testDeleteExistingKey(String storeName) throws Exception {
    String docId = generateDocId("delete-existing");
    Key key = createKey(docId);
    Collection collection = getCollection(storeName);

    // Seed the store and confirm the document is visible before deleting it.
    collection.upsert(key, createTestDocument(docId));
    Query query = buildQueryById(docId);
    try (CloseableIterator<Document> results = collection.find(query)) {
      assertTrue(results.hasNext());
    }

    // Deleting a present key must report success...
    assertTrue(collection.delete(key));

    // ...and the document must no longer be found afterwards.
    try (CloseableIterator<Document> results = collection.find(query)) {
      assertFalse(results.hasNext());
    }
  }

  @ParameterizedTest(name = "{0}: delete on non-existent key returns false")
  @ArgumentsSource(AllStoresProvider.class)
  void testDeleteNonExistentKey(String storeName) {
    Collection collection = getCollection(storeName);

    // A nanoTime-based suffix yields a key no other test could have created.
    Key missingKey = createKey("non-existent-key-" + System.nanoTime());
    assertFalse(collection.delete(missingKey));
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"id": "default:bulk-replace-return-1",
"item": "Original1",
"price": 100,
"quantity": 50,
"tags": ["electronics", "sale"],
"numbers": [10, 20, 30],
"props": {
"brand": "OriginalBrand1",
"color": "red",
"nested": {
"key": "value1"
}
},
"sales": {
"total": 500,
"region": "US"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"id": "default:bulk-replace-return-2",
"item": "Original2",
"price": 200,
"quantity": 75,
"tags": ["clothing", "premium"],
"scores": [1.5, 2.5, 3.5],
"props": {
"brand": "OriginalBrand2",
"size": "large"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"id": "default:bulk-replace-return-1",
"item": "Updated1",
"price": 999,
"tags": ["newTag1"],
"props": {
"brand": "NewBrand1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"id": "default:bulk-replace-return-2",
"item": "Updated2",
"price": 888
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"id": "default:bulk-replace-return-1",
"item": "Updated1",
"price": 999,
"tags": ["newTag1"],
"props": {
"brand": "NewBrand1"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"id": "default:bulk-replace-return-2",
"item": "Updated2",
"price": 888
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ public interface Collection {
* store.
*
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
* the existing fields are modified is implementation specific. For example, upserting <code>
* { "foo2": "bar2" }
* the existing fields are modified is implementation specific. For example, upserting <code> {
* "foo2": "bar2" }
* </code> if a document <code>
* { "foo1": "bar1" }
* </code> already exists would ensure that "foo2" is set to the value of "bar2" and what happens to
Expand All @@ -42,8 +42,8 @@ public interface Collection {
* store.
*
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
* the existing fields are modified is implementation specific. For example, upserting <code>
* { "foo2": "bar2" }
* the existing fields are modified is implementation specific. For example, upserting <code> {
* "foo2": "bar2" }
* </code> if a document <code>
* { "foo1": "bar1" }
* </code> already exists would ensure that "foo2" is set to the value of "bar2" and what happens to
Expand Down Expand Up @@ -285,6 +285,17 @@ default boolean bulkCreateOrReplace(Map<Key, Document> documents) {
throw new UnsupportedOperationException("bulkCreateOrReplace is not supported");
}

/**
 * Bulk variant of {@link #bulkCreateOrReplace(Map)} that additionally returns the previous copies
 * of the affected documents, letting clients see how the documents looked prior to upserting them
 * in one fewer round trip.
 *
 * @param documents mapping from each document's key to the new content to store under that key
 * @return iterator over the prior versions of the documents; callers should close it when done
 * @throws IOException if the underlying store fails while writing or reading the documents
 * @throws UnsupportedOperationException always, in this default implementation; stores that
 *     support the operation must override it
 */
default CloseableIterator<Document> bulkCreateOrReplaceReturnOlderDocuments(
    Map<Key, Document> documents) throws IOException {
  throw new UnsupportedOperationException(
      "bulkCreateOrReplaceReturnOlderDocuments is not supported!");
}

/**
* Atomically create a new document if the key does not exist in the collection or, replace the
* existing document if the key exists in the collection and return the created/replaced document
Expand Down
Loading
Loading