diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 9ca1e31e8..3111cc639 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -40,7 +40,6 @@ import org.hypertrace.core.documentstore.postgres.PostgresDatastore; import org.hypertrace.core.documentstore.query.Query; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -68,34 +67,9 @@ public static void init() throws IOException { postgresDatastore.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); } - private static void executeInsertStatements() { - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try { - String jsonContent = readFileFromResource(INSERT_STATEMENTS_FILE).orElseThrow(); - JsonNode rootNode = OBJECT_MAPPER.readTree(jsonContent); - JsonNode statementsNode = rootNode.get("statements"); - - if (statementsNode == null || !statementsNode.isArray()) { - throw new RuntimeException("Invalid JSON format: 'statements' array not found"); - } - - try (Connection connection = pgDatastore.getPostgresClient()) { - for (JsonNode statementNode : statementsNode) { - String statement = statementNode.asText().trim(); - if (!statement.isEmpty()) { - try (PreparedStatement preparedStatement = connection.prepareStatement(statement)) { - preparedStatement.executeUpdate(); - } catch (Exception e) { - LOGGER.error("Failed to execute INSERT statement: {}", e.getMessage(), e); - throw e; - } - } - } - } - LOGGER.info("Inserted initial data into: {}", FLAT_COLLECTION_NAME); - } catch (Exception e) { - 
LOGGER.error("Failed to execute INSERT statements: {}", e.getMessage(), e); - } + @AfterAll + public static void shutdown() { + shutdownPostgres(); } @BeforeEach @@ -105,16 +79,6 @@ public void setupData() { executeInsertStatements(); } - @AfterEach - public void cleanup() { - // Data is cleared in @BeforeEach, but cleanup here for safety - } - - @AfterAll - public static void shutdown() { - shutdownPostgres(); - } - @Nested @DisplayName("Upsert Operations") class UpsertTests { @@ -185,65 +149,6 @@ void testUpsertMergesWithExistingDocument() throws Exception { }); } - @Test - @DisplayName("Upsert vs CreateOrReplace: upsert preserves, createOrReplace resets to default") - void testUpsertVsCreateOrReplaceBehavior() throws Exception { - String docId1 = generateDocId("test"); - String docId2 = generateDocId("test"); - - // Setup: Create two identical documents - ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); - initialNode.put("item", "Original Item"); - initialNode.put("price", 100); - initialNode.put("quantity", 50); - - ObjectNode doc1 = initialNode.deepCopy(); - doc1.put("id", docId1); - ObjectNode doc2 = initialNode.deepCopy(); - doc2.put("id", docId2); - - Key key1 = new SingleValueKey(DEFAULT_TENANT, docId1); - Key key2 = new SingleValueKey(DEFAULT_TENANT, docId2); - - flatCollection.upsert(key1, new JSONDocument(doc1)); - flatCollection.upsert(key2, new JSONDocument(doc2)); - - // Now update both with partial documents (only item field) - ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode(); - partialUpdate.put("item", "Updated Item"); - - ObjectNode partial1 = partialUpdate.deepCopy(); - partial1.put("id", docId1); - ObjectNode partial2 = partialUpdate.deepCopy(); - partial2.put("id", docId2); - - // Use upsert for doc1 - should PRESERVE price and quantity - flatCollection.upsert(key1, new JSONDocument(partial1)); - - // Use createOrReplace for doc2 - should RESET price and quantity to NULL (default) - flatCollection.createOrReplace(key2, new 
JSONDocument(partial2)); - - // Verify upsert preserved original values - queryAndAssert( - key1, - rs -> { - assertTrue(rs.next()); - assertEquals("Updated Item", rs.getString("item")); - assertEquals(100, rs.getInt("price")); // PRESERVED - assertEquals(50, rs.getInt("quantity")); // PRESERVED - }); - - // Verify createOrReplace reset to defaults - queryAndAssert( - key2, - rs -> { - assertTrue(rs.next()); - assertEquals("Updated Item", rs.getString("item")); - assertNull(rs.getObject("price")); // RESET to NULL - assertNull(rs.getObject("quantity")); // RESET to NULL - }); - } - @Test @DisplayName("Should skip unknown fields in upsert (default SKIP strategy)") void testUpsertSkipsUnknownFields() throws Exception { @@ -285,954 +190,1023 @@ void testUpsertAndReturn() { } @Nested - @DisplayName("Create Operations") - class CreateTests { - + class BulkUpsertTests { @Test - @DisplayName("Should create document with all supported data types") - void testCreateWithAllDataTypes() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - String docId = generateDocId("test"); + @DisplayName("Should bulk upsert multiple new documents") + void testBulkUpsertNewDocuments() throws Exception { + Map bulkMap = new LinkedHashMap<>(); - objectNode.put("id", docId); - objectNode.put("item", "Comprehensive Test Item"); - objectNode.put("price", 999); - objectNode.put("quantity", "50"); - objectNode.put("big_number", 9223372036854775807L); - objectNode.put("rating", 4.5f); - objectNode.put("weight", 123.456789); - objectNode.put("in_stock", true); - objectNode.put("date", 1705315800000L); - objectNode.put("created_date", "2024-01-15"); - objectNode.putArray("tags").add("electronics").add("sale").add("featured"); - objectNode.put("categoryTags", "single-category"); - objectNode.putArray("numbers").add(10).add(20).add(30); - objectNode.putArray("scores").add(1.5).add(2.5).add(3.5); - objectNode.putArray("flags").add(true).add(false).add(true); + ObjectNode node1 
= OBJECT_MAPPER.createObjectNode(); + node1.put("item", "BulkItem101"); + node1.put("price", 101); + node1.put("quantity", 10); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-101"), new JSONDocument(node1)); - ObjectNode propsNode = OBJECT_MAPPER.createObjectNode(); - propsNode.put("color", "blue"); - propsNode.put("size", "large"); - propsNode.put("weight", 2.5); - propsNode.put("warranty", true); - propsNode.putObject("nested").put("key", "value"); - objectNode.set("props", propsNode); + ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); + node2.put("item", "BulkItem102"); + node2.put("price", 102); + node2.put("quantity", 20); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-102"), new JSONDocument(node2)); - ObjectNode salesNode = OBJECT_MAPPER.createObjectNode(); - salesNode.put("total", 1000); - salesNode.put("region", "US"); - objectNode.set("sales", salesNode); + ObjectNode node3 = OBJECT_MAPPER.createObjectNode(); + node3.put("item", "BulkItem103"); + node3.put("price", 103); + node3.put("in_stock", true); + ObjectNode props = OBJECT_MAPPER.createObjectNode(); + props.put("color", "red"); + props.put("size", "large"); + node3.set("props", props); + node3.putArray("tags").add("electronics").add("sale"); + node3.putArray("numbers").add(1).add(2).add(3); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-103"), new JSONDocument(node3)); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + boolean result = flatCollection.bulkUpsert(bulkMap); - CreateResult result = flatCollection.create(key, document); + assertTrue(result); - assertTrue(result.isSucceed()); - assertFalse(result.isPartial()); - assertTrue(result.getSkippedFields().isEmpty()); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-101"), + rs -> { + assertTrue(rs.next()); + assertEquals("BulkItem101", rs.getString("item")); + assertEquals(101, rs.getInt("price")); + assertEquals(10, rs.getInt("quantity")); + }); - 
// Verify all data types were inserted correctly queryAndAssert( - key, + new SingleValueKey(DEFAULT_TENANT, "bulk-102"), rs -> { assertTrue(rs.next()); + assertEquals("BulkItem102", rs.getString("item")); + assertEquals(102, rs.getInt("price")); + assertEquals(20, rs.getInt("quantity")); + }); - assertEquals("Comprehensive Test Item", rs.getString("item")); - assertEquals(999, rs.getInt("price")); - assertEquals(50, rs.getInt("quantity")); - assertEquals(9223372036854775807L, rs.getLong("big_number")); - assertEquals(4.5f, rs.getFloat("rating"), 0.01f); - assertEquals(123.456789, rs.getDouble("weight"), 0.0001); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-103"), + rs -> { + assertTrue(rs.next()); + assertEquals("BulkItem103", rs.getString("item")); + assertEquals(103, rs.getInt("price")); assertTrue(rs.getBoolean("in_stock")); - assertEquals(1705315800000L, rs.getTimestamp("date").getTime()); // epoch millis - assertNotNull(rs.getDate("created_date")); + // Verify JSONB column + JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); + assertEquals("red", propsResult.get("color").asText()); + assertEquals("large", propsResult.get("size").asText()); + + // Verify array columns String[] tags = (String[]) rs.getArray("tags").getArray(); - assertEquals(3, tags.length); + assertEquals(2, tags.length); assertEquals("electronics", tags[0]); - assertEquals("sale", tags[1]); - assertEquals("featured", tags[2]); - - // Single value auto-converted to array - String[] categoryTags = (String[]) rs.getArray("categoryTags").getArray(); - assertEquals(1, categoryTags.length); - assertEquals("single-category", categoryTags[0]); Integer[] numbers = (Integer[]) rs.getArray("numbers").getArray(); assertEquals(3, numbers.length); - assertEquals(10, numbers[0]); - assertEquals(20, numbers[1]); - assertEquals(30, numbers[2]); - - Double[] scores = (Double[]) rs.getArray("scores").getArray(); - assertEquals(3, scores.length); - assertEquals(1.5, scores[0], 
0.01); - - Boolean[] flags = (Boolean[]) rs.getArray("flags").getArray(); - assertEquals(3, flags.length); - assertTrue(flags[0]); - assertFalse(flags[1]); - - String propsJson = rs.getString("props"); - assertNotNull(propsJson); - JsonNode propsResult = OBJECT_MAPPER.readTree(propsJson); - assertEquals("blue", propsResult.get("color").asText()); - assertEquals("large", propsResult.get("size").asText()); - assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); - assertTrue(propsResult.get("warranty").asBoolean()); - assertEquals("value", propsResult.get("nested").get("key").asText()); - - String salesJson = rs.getString("sales"); - assertNotNull(salesJson); - JsonNode salesResult = OBJECT_MAPPER.readTree(salesJson); - assertEquals(1000, salesResult.get("total").asInt()); - assertEquals("US", salesResult.get("region").asText()); + assertEquals(1, numbers[0]); }); } @Test - @DisplayName("Should throw DuplicateDocumentException when creating with existing key") - void testCreateDuplicateDocument() throws Exception { + @DisplayName("Should bulk upsert updating existing documents") + void testBulkUpsertUpdatesExistingDocuments() throws Exception { + // First create some documents + String docId1 = "bulk-update-1"; + String docId2 = "bulk-update-2"; - String docId = generateDocId("test"); - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "dup-doc-200"); - objectNode.put("item", "First Item"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + ObjectNode initial1 = OBJECT_MAPPER.createObjectNode(); + initial1.put("item", "Original1"); + initial1.put("price", 100); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1)); - CreateResult createResult = flatCollection.create(key, document); - Preconditions.checkArgument( - createResult.isSucceed(), - "Preconditions failure: Could not create doc with id: " + docId); + ObjectNode 
initial2 = OBJECT_MAPPER.createObjectNode(); + initial2.put("item", "Original2"); + initial2.put("price", 200); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2)); - ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); - objectNode2.put("id", "dup-doc-200"); - objectNode2.put("item", "Second Item"); - Document document2 = new JSONDocument(objectNode2); + // Now bulk upsert with updates + Map bulkMap = new LinkedHashMap<>(); - assertThrows(DuplicateDocumentException.class, () -> flatCollection.create(key, document2)); - } + ObjectNode updated1 = OBJECT_MAPPER.createObjectNode(); + updated1.put("item", "Updated1"); + updated1.put("price", 999); + updated1.put("quantity", 50); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1)); - @ParameterizedTest - @DisplayName( - "When MissingColumnStrategy is Throw, should throw an exception for unknown fields. Unknown fields are those fields that are not found in the schema but are present in the doc") - @ArgumentsSource(MissingColumnStrategyProvider.class) - void testUnknownFieldsAsPerMissingColumnStrategy(MissingColumnStrategy missingColumnStrategy) - throws Exception { + ObjectNode updated2 = OBJECT_MAPPER.createObjectNode(); + updated2.put("item", "Updated2"); + updated2.put("price", 888); + updated2.put("in_stock", true); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2)); - String docId = generateDocId("test"); + boolean result = flatCollection.bulkUpsert(bulkMap); - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", docId); - objectNode.put("item", "Item"); - objectNode.put("unknown_column", "should throw"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + assertTrue(result); - if (missingColumnStrategy == MissingColumnStrategy.THROW) { - Collection collection = - 
getFlatCollectionWithStrategy(MissingColumnStrategy.THROW.toString()); - assertThrows(SchemaMismatchException.class, () -> collection.create(key, document)); - // Verify no document was inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = '%s'", - FLAT_COLLECTION_NAME, key)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals(0, rs.getInt(1)); - } - } else { - CreateResult result = flatCollection.create(key, document); - // for SKIP - assertTrue(result.isSucceed()); - // this is a partial write because unknown_column was not written to - assertTrue(result.isPartial()); - assertTrue(result.getSkippedFields().contains("unknown_column")); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId1), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated1", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertEquals(50, rs.getInt("quantity")); + }); - queryAndAssert( - key, - rs -> { - assertTrue(rs.next()); - assertEquals("Item", rs.getString("item")); - }); - } + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId2), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated2", rs.getString("item")); + assertEquals(888, rs.getInt("price")); + assertTrue(rs.getBoolean("in_stock")); + }); } @Test - @DisplayName( - "Should use default SKIP strategy when missingColumnStrategy config is empty string") - void testEmptyMissingColumnStrategyConfigUsesDefault() throws Exception { - Collection collectionWithEmptyStrategy = getFlatCollectionWithStrategy(""); + @DisplayName("Should bulk upsert with mixed inserts and updates") + void testBulkUpsertMixedInsertAndUpdate() throws Exception { + // Create one existing document + String existingId = "bulk-mixed-existing"; + ObjectNode existing = OBJECT_MAPPER.createObjectNode(); + 
existing.put("item", "ExistingItem"); + existing.put("price", 100); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(existing)); - // Test that it uses default SKIP strategy (unknown fields are skipped, not thrown) - String docId = generateDocId("test"); - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", docId); - objectNode.put("item", "Test Item"); - objectNode.put("unknown_field", "should be skipped with default SKIP strategy"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + // Bulk upsert: update existing + insert new + Map bulkMap = new LinkedHashMap<>(); - CreateResult result = collectionWithEmptyStrategy.create(key, document); + ObjectNode updatedExisting = OBJECT_MAPPER.createObjectNode(); + updatedExisting.put("item", "UpdatedExisting"); + updatedExisting.put("price", 555); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(updatedExisting)); - // With default SKIP strategy, unknown fields are skipped - assertTrue(result.isSucceed()); - assertTrue(result.isPartial()); - assertTrue(result.getSkippedFields().contains("unknown_field")); - } + ObjectNode newDoc = OBJECT_MAPPER.createObjectNode(); + newDoc.put("item", "NewItem"); + newDoc.put("price", 777); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-mixed-new"), new JSONDocument(newDoc)); - @Test - @DisplayName("Should use default SKIP strategy when missingColumnStrategy config is invalid") - void testInvalidMissingColumnStrategyConfigUsesDefault() throws Exception { - Collection collectionWithInvalidStrategy = getFlatCollectionWithStrategy("INVALID_STRATEGY"); + boolean result = flatCollection.bulkUpsert(bulkMap); - String docId = generateDocId("test"); - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", docId); - objectNode.put("item", "Test Item"); - objectNode.put("unknown_field", "should be 
skipped with default SKIP strategy"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + assertTrue(result); - CreateResult result = collectionWithInvalidStrategy.create(key, document); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, existingId), + rs -> { + assertTrue(rs.next()); + assertEquals("UpdatedExisting", rs.getString("item")); + assertEquals(555, rs.getInt("price")); + }); - // With default SKIP strategy, unknown fields are skipped - assertTrue(result.isSucceed()); - assertTrue(result.isPartial()); - assertTrue(result.getSkippedFields().contains("unknown_field")); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-mixed-new"), + rs -> { + assertTrue(rs.next()); + assertEquals("NewItem", rs.getString("item")); + assertEquals(777, rs.getInt("price")); + }); } @Test - @DisplayName("Should return failure when all fields are unknown (parsed.isEmpty)") - void testCreateFailsWhenAllFieldsAreUnknown() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("completely_unknown_field1", "value1"); - objectNode.put("completely_unknown_field2", "value2"); - objectNode.put("another_nonexistent_column", 123); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "all-unknown-doc-700"); - - CreateResult result = flatCollection.create(key, document); - - // Although no column exists in the schema, it'll create a new doc with the key as the id - assertTrue(result.isSucceed()); - assertEquals(3, result.getSkippedFields().size()); - assertTrue( - result - .getSkippedFields() - .containsAll( - List.of( - "completely_unknown_field1", - "completely_unknown_field2", - "another_nonexistent_column"))); + @DisplayName("Should handle empty document map") + void testBulkUpsertEmptyMap() { + Map emptyMap = Collections.emptyMap(); + boolean result = flatCollection.bulkUpsert(emptyMap); + assertTrue(result); + } - // Verify no row 
was inserted - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = '%s'", - FLAT_COLLECTION_NAME, key)); - ResultSet rs = ps.executeQuery()) { - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - } + @Test + @DisplayName("Should handle null document map") + void testBulkUpsertNullMap() { + boolean result = flatCollection.bulkUpsert(null); + assertTrue(result); } @Test - @DisplayName("Should refresh schema and retry on UNDEFINED_COLUMN error") - void testCreateRefreshesSchemaOnUndefinedColumnError() throws Exception { - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + @DisplayName("Should bulk upsert documents with different column sets") + void testBulkUpsertDocumentsWithDifferentColumns() throws Exception { + Map bulkMap = new LinkedHashMap<>(); - // Step 1: Add a temporary column and do a create to cache the schema - String addColumnSQL = - String.format("ALTER TABLE \"%s\" ADD COLUMN \"temp_col\" TEXT", FLAT_COLLECTION_NAME); - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = conn.prepareStatement(addColumnSQL)) { - ps.execute(); - LOGGER.info("Added temporary column 'temp_col' to table"); - } + ObjectNode node1 = OBJECT_MAPPER.createObjectNode(); + node1.put("item", "ItemWithPrice"); + node1.put("price", 100); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-1"), new JSONDocument(node1)); - // Step 2: Create a document with the temp column to cache the schema - ObjectNode objectNode1 = OBJECT_MAPPER.createObjectNode(); - objectNode1.put("id", "cache-schema-doc"); - objectNode1.put("item", "Item to cache schema"); - objectNode1.put("temp_col", "temp value"); - flatCollection.create( - new SingleValueKey("default", "cache-schema-doc"), new JSONDocument(objectNode1)); - LOGGER.info("Schema cached with 
temp_col"); + ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); + node2.put("item", "ItemWithQuantity"); + node2.put("quantity", 50); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-2"), new JSONDocument(node2)); - // Step 3: DROP the column - now the cached schema is stale - String dropColumnSQL = - String.format("ALTER TABLE \"%s\" DROP COLUMN \"temp_col\"", FLAT_COLLECTION_NAME); - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = conn.prepareStatement(dropColumnSQL)) { - ps.execute(); - LOGGER.info("Dropped temp_col - schema cache is now stale"); - } + ObjectNode node3 = OBJECT_MAPPER.createObjectNode(); + node3.put("item", "ItemWithAll"); + node3.put("price", 200); + node3.put("quantity", 30); + node3.put("in_stock", true); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-3"), new JSONDocument(node3)); - // Step 4: Try to create with the dropped column - // Schema registry still thinks temp_col exists, so it will include it in INSERT - // INSERT will fail with UNDEFINED_COLUMN, triggering handlePSQLExceptionForCreate - // which will refresh schema and retry - ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); - objectNode2.put("id", "retry-doc-800"); - objectNode2.put("item", "Item after schema refresh"); - objectNode2.put("temp_col", "this column no longer exists"); - Document document = new JSONDocument(objectNode2); - Key key = new SingleValueKey("default", "retry-doc-800"); + boolean result = flatCollection.bulkUpsert(bulkMap); - CreateResult result = flatCollection.create(key, document); + assertTrue(result); - // Should succeed - temp_col will be skipped (either via retry or schema refresh) - assertTrue(result.isSucceed()); - // The dropped column should be skipped - assertTrue(result.getSkippedFields().contains("temp_col")); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "diff-cols-1"), + rs -> { + assertTrue(rs.next()); + assertEquals("ItemWithPrice", rs.getString("item")); + 
assertEquals(100, rs.getInt("price")); + assertEquals(0, rs.getInt("quantity")); + assertTrue(rs.wasNull()); + }); - // Verify the valid fields were inserted queryAndAssert( - key, + new SingleValueKey(DEFAULT_TENANT, "diff-cols-2"), rs -> { assertTrue(rs.next()); - assertEquals("Item after schema refresh", rs.getString("item")); + assertEquals("ItemWithQuantity", rs.getString("item")); + assertEquals(0, rs.getInt("price")); + assertTrue(rs.wasNull()); + assertEquals(50, rs.getInt("quantity")); + }); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "diff-cols-3"), + rs -> { + assertTrue(rs.next()); + assertEquals("ItemWithAll", rs.getString("item")); + assertEquals(200, rs.getInt("price")); + assertEquals(30, rs.getInt("quantity")); + assertTrue(rs.getBoolean("in_stock")); }); } - @ParameterizedTest - @DisplayName("Should skip column with unparseable value and add to skippedFields") - @ArgumentsSource(MissingColumnStrategyProvider.class) - void testUnparsableValuesAsPerMissingColStrategy(MissingColumnStrategy missingColumnStrategy) - throws Exception { - - String docId = generateDocId("test"); - - // Try to insert a string value into an integer column with wrong type - // The unparseable column should be skipped, not throw an exception - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", docId); - objectNode.put("item", "Valid Item"); - objectNode.put("price", "not_a_number_at_all"); // price is INTEGER, this will fail parsing - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); - - if (missingColumnStrategy == MissingColumnStrategy.THROW) { - CreateResult result = - getFlatCollectionWithStrategy(MissingColumnStrategy.SKIP.toString()) - .create(key, document); - - // Should succeed with the valid columns, skipping the unparseable one - assertTrue(result.isSucceed()); - assertTrue(result.isPartial()); - assertEquals(1, result.getSkippedFields().size()); - 
assertTrue(result.getSkippedFields().contains("price")); - - // Verify the valid fields were inserted - queryAndAssert( - key, - rs -> { - assertTrue(rs.next()); - assertEquals("Valid Item", rs.getString("item")); - // price should be null since it was skipped - assertEquals(0, rs.getInt("price")); - assertTrue(rs.wasNull()); - }); - } else { - // SKIP strategy: unparseable value should be skipped, document created - CreateResult result = flatCollection.create(key, document); - assertTrue(result.isSucceed()); - assertTrue(result.isPartial()); - assertEquals(1, result.getSkippedFields().size()); - assertTrue(result.getSkippedFields().contains("price")); - - // Verify the valid fields were inserted - queryAndAssert( - key, - rs -> { - assertTrue(rs.next()); - assertEquals("Valid Item", rs.getString("item")); - // price should be null since it was skipped - assertEquals(0, rs.getInt("price")); - assertTrue(rs.wasNull()); - }); - } - } - } - - private static Collection getFlatCollectionWithStrategy(String strategy) { - String postgresConnectionUrl = - String.format("jdbc:postgresql://localhost:%s/", postgresContainer.getMappedPort(5432)); - - Map configWithStrategy = new HashMap<>(); - configWithStrategy.put("url", postgresConnectionUrl); - configWithStrategy.put("user", "postgres"); - configWithStrategy.put("password", "postgres"); - configWithStrategy.put("customParams.missingColumnStrategy", strategy); - - Datastore datastoreWithStrategy = - DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(configWithStrategy)); - - return datastoreWithStrategy.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); - } - - private void queryAndAssert(Key key, ResultSetConsumer consumer) throws Exception { - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM \"%s\" WHERE \"id\" = '%s'", 
FLAT_COLLECTION_NAME, key)); - ResultSet rs = ps.executeQuery()) { - consumer.accept(rs); - } - } - - @FunctionalInterface - interface ResultSetConsumer { - - void accept(ResultSet rs) throws Exception; - } - - @Nested - @DisplayName("CreateOrReplace Operations") - class CreateOrReplaceTests { - @Test - @DisplayName( - "Should create new document and return true. Cols not specified should be set of default NULL") - void testCreateOrReplaceNewDocument() throws Exception { - - String docId = generateDocId("test"); + @DisplayName("Should skip unknown fields in bulk upsert") + void testBulkUpsertSkipsUnknownFields() throws Exception { + Map bulkMap = new LinkedHashMap<>(); - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "upsert-new-doc-100"); - objectNode.put("item", "New Upsert Item"); - objectNode.put("price", 500); - objectNode.put("quantity", 25); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + ObjectNode node = OBJECT_MAPPER.createObjectNode(); + node.put("item", "ItemWithUnknown"); + node.put("price", 100); + node.put("unknown_field", "should be skipped"); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-unknown-field"), new JSONDocument(node)); - boolean isNew = flatCollection.createOrReplace(key, document); + boolean result = flatCollection.bulkUpsert(bulkMap); - assertTrue(isNew); + assertTrue(result); queryAndAssert( - key, + new SingleValueKey(DEFAULT_TENANT, "bulk-unknown-field"), rs -> { assertTrue(rs.next()); - assertEquals("New Upsert Item", rs.getString("item")); - assertEquals(500, rs.getInt("price")); - assertEquals(25, rs.getInt("quantity")); - // assert on some fields that they're set to null correctly - assertNull(rs.getObject("sales")); - assertNull(rs.getObject("categoryTags")); - assertNull(rs.getObject("date")); + assertEquals("ItemWithUnknown", rs.getString("item")); + assertEquals(100, rs.getInt("price")); }); } @Test - @DisplayName("Should 
replace existing document and return false") - void testCreateOrReplaceExistingDocument() throws Exception { - String docId = generateDocId("test"); - ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); - initialNode.put("id", docId); - initialNode.put("item", "Original Item"); - initialNode.put("price", 100); - Document initialDoc = new JSONDocument(initialNode); - Key key = new SingleValueKey(DEFAULT_TENANT, docId); + @DisplayName("Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy") + void testBulkUpsertIgnoreDocumentStrategy() throws Exception { + Collection collectionWithIgnoreStrategy = + getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString()); - boolean firstResult = flatCollection.createOrReplace(key, initialDoc); + Map bulkMap = new LinkedHashMap<>(); - Preconditions.checkArgument( - firstResult, "Preconditions failure: Could not create first document with id: " + docId); + // Doc with unknown field - should be ignored + ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode(); + nodeWithUnknown.put("item", "ShouldBeIgnored"); + nodeWithUnknown.put("price", 100); + nodeWithUnknown.put("unknown_field", "causes ignore"); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "ignore-doc-1"), new JSONDocument(nodeWithUnknown)); - // Now replace with updated document - ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); - updatedNode.put("id", docId); - updatedNode.put("item", "Updated Item"); - updatedNode.put("quantity", 50); - Document updatedDoc = new JSONDocument(updatedNode); + // Doc without unknown field - should be upserted + ObjectNode validNode = OBJECT_MAPPER.createObjectNode(); + validNode.put("item", "ValidItem"); + validNode.put("price", 200); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "ignore-doc-2"), new JSONDocument(validNode)); - boolean secondResult = flatCollection.createOrReplace(key, updatedDoc); + boolean result = collectionWithIgnoreStrategy.bulkUpsert(bulkMap); - 
assertFalse(secondResult); + assertTrue(result); + // First doc should NOT exist (was ignored) queryAndAssert( - key, + new SingleValueKey(DEFAULT_TENANT, "ignore-doc-1"), rs -> { - assertTrue(rs.next()); - assertEquals("Updated Item", rs.getString("item")); - // this should be the default since price is not present in the updated document - assertNull(rs.getObject("price")); - assertEquals(50, rs.getInt("quantity")); + assertFalse(rs.next()); }); - } - - @Test - @DisplayName("Should skip unknown fields in createOrReplace (default SKIP strategy)") - void testCreateOrReplaceSkipsUnknownFields() throws Exception { - ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("id", "upsert-skip-fields-300"); - objectNode.put("item", "Item with unknown"); - objectNode.put("price", 200); - objectNode.put("unknown_field", "should be skipped"); - Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "upsert-skip-fields-300"); - boolean isNew = flatCollection.createOrReplace(key, document); - assertTrue(isNew); - - // Verify only known fields were inserted + // Second doc should exist queryAndAssert( - key, + new SingleValueKey(DEFAULT_TENANT, "ignore-doc-2"), rs -> { assertTrue(rs.next()); - assertEquals("Item with unknown", rs.getString("item")); + assertEquals("ValidItem", rs.getString("item")); assertEquals(200, rs.getInt("price")); }); } @Test - @DisplayName("Should handle JSONB fields in createOrReplace") - void testCreateOrReplaceWithJsonbField() throws Exception { - String docId = generateDocId("test"); - ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); - initialNode.put("id", docId); - initialNode.put("item", "Item with props"); - ObjectNode initialProps = OBJECT_MAPPER.createObjectNode(); - initialProps.put("color", "red"); - initialProps.put("size", "small"); - initialNode.set("props", initialProps); - Document initialDoc = new JSONDocument(initialNode); - Key key = new SingleValueKey(DEFAULT_TENANT, 
docId); + @DisplayName("Should return false when document has invalid JSON (IOException)") + void testBulkUpsertWithInvalidJsonDocument() { + Map bulkMap = new LinkedHashMap<>(); - boolean wasCreated = flatCollection.createOrReplace(key, initialDoc); - Preconditions.checkArgument( - wasCreated, "Precondition failure: Doc could not be created with id: " + docId); + // Create a Document that returns invalid JSON + Document invalidJsonDoc = + new Document() { + @Override + public String toJson() { + return "{ invalid json without closing brace"; + } + }; - // Update with new JSONB value - ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); - updatedNode.put("id", docId); - updatedNode.put("item", "Updated Item"); - ObjectNode updatedProps = OBJECT_MAPPER.createObjectNode(); - updatedProps.put("color", "blue"); - updatedProps.put("size", "large"); - updatedProps.put("weight", 2.5); - updatedNode.set("props", updatedProps); - Document updatedDoc = new JSONDocument(updatedNode); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "invalid-json-doc"), invalidJsonDoc); - boolean isNew = flatCollection.createOrReplace(key, updatedDoc); - assertFalse(isNew); + // Should return false due to IOException during parsing + boolean result = flatCollection.bulkUpsert(bulkMap); - // Verify JSONB was updated - queryAndAssert( - key, - rs -> { - assertTrue(rs.next()); - JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); - assertEquals("blue", propsResult.get("color").asText()); - assertEquals("large", propsResult.get("size").asText()); - assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); - }); + assertFalse(result); } - } - @Nested - @DisplayName("Bulk Operations") - class BulkOperationTests { + @Test + @DisplayName("Should return false when batch execution fails (BatchUpdateException)") + void testBulkUpsertBatchUpdateException() throws Exception { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + + String addConstraintSQL = 
+ String.format( + "ALTER TABLE \"%s\" ADD CONSTRAINT price_positive CHECK (\"price\" > 0)", + FLAT_COLLECTION_NAME); + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = conn.prepareStatement(addConstraintSQL)) { + ps.execute(); + LOGGER.info("Added CHECK constraint: price must be positive"); + } + + try { + Map bulkMap = new LinkedHashMap<>(); + + ObjectNode node = OBJECT_MAPPER.createObjectNode(); + node.put("item", "NegativePriceItem"); + node.put("price", -100); // Violates CHECK constraint + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "negative-price-doc"), new JSONDocument(node)); + + // Should return false due to BatchUpdateException + boolean result = flatCollection.bulkUpsert(bulkMap); + + assertFalse(result); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "negative-price-doc"), + rs -> { + assertFalse(rs.next()); + }); + + } finally { + // Clean up: remove the CHECK constraint + String dropConstraintSQL = + String.format( + "ALTER TABLE \"%s\" DROP CONSTRAINT price_positive", FLAT_COLLECTION_NAME); + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = conn.prepareStatement(dropConstraintSQL)) { + ps.execute(); + LOGGER.info("Removed CHECK constraint"); + } + } + } + } + + @Nested + @DisplayName("Create Operations") + class CreateTests { @Test - @DisplayName("Should bulk upsert multiple new documents") - void testBulkUpsertNewDocuments() throws Exception { - Map bulkMap = new LinkedHashMap<>(); + @DisplayName("Should create document with all supported data types") + void testCreateWithAllDataTypes() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + String docId = generateDocId("test"); - ObjectNode node1 = OBJECT_MAPPER.createObjectNode(); - node1.put("item", "BulkItem101"); - node1.put("price", 101); - node1.put("quantity", 10); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-101"), new JSONDocument(node1)); + objectNode.put("id", docId); + 
objectNode.put("item", "Comprehensive Test Item"); + objectNode.put("price", 999); + objectNode.put("quantity", "50"); + objectNode.put("big_number", 9223372036854775807L); + objectNode.put("rating", 4.5f); + objectNode.put("weight", 123.456789); + objectNode.put("in_stock", true); + objectNode.put("date", 1705315800000L); + objectNode.put("created_date", "2024-01-15"); + objectNode.putArray("tags").add("electronics").add("sale").add("featured"); + objectNode.put("categoryTags", "single-category"); + objectNode.putArray("numbers").add(10).add(20).add(30); + objectNode.putArray("scores").add(1.5).add(2.5).add(3.5); + objectNode.putArray("flags").add(true).add(false).add(true); - ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); - node2.put("item", "BulkItem102"); - node2.put("price", 102); - node2.put("quantity", 20); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-102"), new JSONDocument(node2)); + ObjectNode propsNode = OBJECT_MAPPER.createObjectNode(); + propsNode.put("color", "blue"); + propsNode.put("size", "large"); + propsNode.put("weight", 2.5); + propsNode.put("warranty", true); + propsNode.putObject("nested").put("key", "value"); + objectNode.set("props", propsNode); - ObjectNode node3 = OBJECT_MAPPER.createObjectNode(); - node3.put("item", "BulkItem103"); - node3.put("price", 103); - node3.put("in_stock", true); - ObjectNode props = OBJECT_MAPPER.createObjectNode(); - props.put("color", "red"); - props.put("size", "large"); - node3.set("props", props); - node3.putArray("tags").add("electronics").add("sale"); - node3.putArray("numbers").add(1).add(2).add(3); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-103"), new JSONDocument(node3)); + ObjectNode salesNode = OBJECT_MAPPER.createObjectNode(); + salesNode.put("total", 1000); + salesNode.put("region", "US"); + objectNode.set("sales", salesNode); - boolean result = flatCollection.bulkUpsert(bulkMap); + Document document = new JSONDocument(objectNode); + Key key = new 
SingleValueKey(DEFAULT_TENANT, docId); - assertTrue(result); + CreateResult result = flatCollection.create(key, document); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-101"), - rs -> { - assertTrue(rs.next()); - assertEquals("BulkItem101", rs.getString("item")); - assertEquals(101, rs.getInt("price")); - assertEquals(10, rs.getInt("quantity")); - }); + assertTrue(result.isSucceed()); + assertFalse(result.isPartial()); + assertTrue(result.getSkippedFields().isEmpty()); + // Verify all data types were inserted correctly queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-102"), + key, rs -> { assertTrue(rs.next()); - assertEquals("BulkItem102", rs.getString("item")); - assertEquals(102, rs.getInt("price")); - assertEquals(20, rs.getInt("quantity")); - }); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-103"), - rs -> { - assertTrue(rs.next()); - assertEquals("BulkItem103", rs.getString("item")); - assertEquals(103, rs.getInt("price")); + assertEquals("Comprehensive Test Item", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertEquals(50, rs.getInt("quantity")); + assertEquals(9223372036854775807L, rs.getLong("big_number")); + assertEquals(4.5f, rs.getFloat("rating"), 0.01f); + assertEquals(123.456789, rs.getDouble("weight"), 0.0001); assertTrue(rs.getBoolean("in_stock")); + assertEquals(1705315800000L, rs.getTimestamp("date").getTime()); // epoch millis + assertNotNull(rs.getDate("created_date")); - // Verify JSONB column - JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); - assertEquals("red", propsResult.get("color").asText()); - assertEquals("large", propsResult.get("size").asText()); - - // Verify array columns String[] tags = (String[]) rs.getArray("tags").getArray(); - assertEquals(2, tags.length); + assertEquals(3, tags.length); assertEquals("electronics", tags[0]); + assertEquals("sale", tags[1]); + assertEquals("featured", tags[2]); + + // Single value auto-converted to array + 
String[] categoryTags = (String[]) rs.getArray("categoryTags").getArray(); + assertEquals(1, categoryTags.length); + assertEquals("single-category", categoryTags[0]); Integer[] numbers = (Integer[]) rs.getArray("numbers").getArray(); assertEquals(3, numbers.length); - assertEquals(1, numbers[0]); + assertEquals(10, numbers[0]); + assertEquals(20, numbers[1]); + assertEquals(30, numbers[2]); + + Double[] scores = (Double[]) rs.getArray("scores").getArray(); + assertEquals(3, scores.length); + assertEquals(1.5, scores[0], 0.01); + + Boolean[] flags = (Boolean[]) rs.getArray("flags").getArray(); + assertEquals(3, flags.length); + assertTrue(flags[0]); + assertFalse(flags[1]); + + String propsJson = rs.getString("props"); + assertNotNull(propsJson); + JsonNode propsResult = OBJECT_MAPPER.readTree(propsJson); + assertEquals("blue", propsResult.get("color").asText()); + assertEquals("large", propsResult.get("size").asText()); + assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); + assertTrue(propsResult.get("warranty").asBoolean()); + assertEquals("value", propsResult.get("nested").get("key").asText()); + + String salesJson = rs.getString("sales"); + assertNotNull(salesJson); + JsonNode salesResult = OBJECT_MAPPER.readTree(salesJson); + assertEquals(1000, salesResult.get("total").asInt()); + assertEquals("US", salesResult.get("region").asText()); }); } @Test - @DisplayName("Should bulk upsert updating existing documents") - void testBulkUpsertUpdatesExistingDocuments() throws Exception { - // First create some documents - String docId1 = "bulk-update-1"; - String docId2 = "bulk-update-2"; - - ObjectNode initial1 = OBJECT_MAPPER.createObjectNode(); - initial1.put("item", "Original1"); - initial1.put("price", 100); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1)); + @DisplayName("Should throw DuplicateDocumentException when creating with existing key") + void testCreateDuplicateDocument() throws 
Exception { - ObjectNode initial2 = OBJECT_MAPPER.createObjectNode(); - initial2.put("item", "Original2"); - initial2.put("price", 200); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2)); + String docId = generateDocId("test"); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "dup-doc-200"); + objectNode.put("item", "First Item"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - // Now bulk upsert with updates - Map bulkMap = new LinkedHashMap<>(); + CreateResult createResult = flatCollection.create(key, document); + Preconditions.checkArgument( + createResult.isSucceed(), + "Preconditions failure: Could not create doc with id: " + docId); - ObjectNode updated1 = OBJECT_MAPPER.createObjectNode(); - updated1.put("item", "Updated1"); - updated1.put("price", 999); - updated1.put("quantity", 50); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1)); + ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); + objectNode2.put("id", "dup-doc-200"); + objectNode2.put("item", "Second Item"); + Document document2 = new JSONDocument(objectNode2); - ObjectNode updated2 = OBJECT_MAPPER.createObjectNode(); - updated2.put("item", "Updated2"); - updated2.put("price", 888); - updated2.put("in_stock", true); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2)); + assertThrows(DuplicateDocumentException.class, () -> flatCollection.create(key, document2)); + } - boolean result = flatCollection.bulkUpsert(bulkMap); + @ParameterizedTest + @DisplayName( + "When MissingColumnStrategy is Throw, should throw an exception for unknown fields. 
Unknown fields are those fields that are not found in the schema but are present in the doc") + @ArgumentsSource(MissingColumnStrategyProvider.class) + void testUnknownFieldsAsPerMissingColumnStrategy(MissingColumnStrategy missingColumnStrategy) + throws Exception { - assertTrue(result); + String docId = generateDocId("test"); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, docId1), - rs -> { - assertTrue(rs.next()); - assertEquals("Updated1", rs.getString("item")); - assertEquals(999, rs.getInt("price")); - assertEquals(50, rs.getInt("quantity")); - }); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", docId); + objectNode.put("item", "Item"); + objectNode.put("unknown_column", "should throw"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, docId2), - rs -> { - assertTrue(rs.next()); - assertEquals("Updated2", rs.getString("item")); - assertEquals(888, rs.getInt("price")); - assertTrue(rs.getBoolean("in_stock")); - }); + if (missingColumnStrategy == MissingColumnStrategy.THROW) { + Collection collection = + getFlatCollectionWithStrategy(MissingColumnStrategy.THROW.toString()); + assertThrows(SchemaMismatchException.class, () -> collection.create(key, document)); + // Verify no document was inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + } + } else { + CreateResult result = flatCollection.create(key, document); + // for SKIP + assertTrue(result.isSucceed()); + // this is a partial write because unknown_column was not written to + assertTrue(result.isPartial()); + 
assertTrue(result.getSkippedFields().contains("unknown_column")); + + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Item", rs.getString("item")); + }); + } } @Test - @DisplayName("Should bulk upsert with mixed inserts and updates") - void testBulkUpsertMixedInsertAndUpdate() throws Exception { - // Create one existing document - String existingId = "bulk-mixed-existing"; - ObjectNode existing = OBJECT_MAPPER.createObjectNode(); - existing.put("item", "ExistingItem"); - existing.put("price", 100); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(existing)); + @DisplayName( + "Should use default SKIP strategy when missingColumnStrategy config is empty string") + void testEmptyMissingColumnStrategyConfigUsesDefault() throws Exception { + Collection collectionWithEmptyStrategy = getFlatCollectionWithStrategy(""); - // Bulk upsert: update existing + insert new - Map bulkMap = new LinkedHashMap<>(); + // Test that it uses default SKIP strategy (unknown fields are skipped, not thrown) + String docId = generateDocId("test"); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", docId); + objectNode.put("item", "Test Item"); + objectNode.put("unknown_field", "should be skipped with default SKIP strategy"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - ObjectNode updatedExisting = OBJECT_MAPPER.createObjectNode(); - updatedExisting.put("item", "UpdatedExisting"); - updatedExisting.put("price", 555); - bulkMap.put( - new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(updatedExisting)); + CreateResult result = collectionWithEmptyStrategy.create(key, document); - ObjectNode newDoc = OBJECT_MAPPER.createObjectNode(); - newDoc.put("item", "NewItem"); - newDoc.put("price", 777); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-mixed-new"), new JSONDocument(newDoc)); + // With default SKIP 
strategy, unknown fields are skipped + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertTrue(result.getSkippedFields().contains("unknown_field")); + } - boolean result = flatCollection.bulkUpsert(bulkMap); + @Test + @DisplayName("Should use default SKIP strategy when missingColumnStrategy config is invalid") + void testInvalidMissingColumnStrategyConfigUsesDefault() throws Exception { + Collection collectionWithInvalidStrategy = getFlatCollectionWithStrategy("INVALID_STRATEGY"); - assertTrue(result); + String docId = generateDocId("test"); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", docId); + objectNode.put("item", "Test Item"); + objectNode.put("unknown_field", "should be skipped with default SKIP strategy"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, existingId), - rs -> { - assertTrue(rs.next()); - assertEquals("UpdatedExisting", rs.getString("item")); - assertEquals(555, rs.getInt("price")); - }); + CreateResult result = collectionWithInvalidStrategy.create(key, document); + + // With default SKIP strategy, unknown fields are skipped + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertTrue(result.getSkippedFields().contains("unknown_field")); + } + @Test + @DisplayName("Should return failure when all fields are unknown (parsed.isEmpty)") + void testCreateFailsWhenAllFieldsAreUnknown() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("completely_unknown_field1", "value1"); + objectNode.put("completely_unknown_field2", "value2"); + objectNode.put("another_nonexistent_column", 123); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "all-unknown-doc-700"); + + CreateResult result = flatCollection.create(key, document); + + // Although no column exists in the schema, it'll 
create a new doc with the key as the id + assertTrue(result.isSucceed()); + assertEquals(3, result.getSkippedFields().size()); + assertTrue( + result + .getSkippedFields() + .containsAll( + List.of( + "completely_unknown_field1", + "completely_unknown_field2", + "another_nonexistent_column"))); + + // Verify no row was inserted + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT COUNT(*) FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + } + } + + @Test + @DisplayName("Should refresh schema and retry on UNDEFINED_COLUMN error") + void testCreateRefreshesSchemaOnUndefinedColumnError() throws Exception { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + + // Step 1: Add a temporary column and do a create to cache the schema + String addColumnSQL = + String.format("ALTER TABLE \"%s\" ADD COLUMN \"temp_col\" TEXT", FLAT_COLLECTION_NAME); + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = conn.prepareStatement(addColumnSQL)) { + ps.execute(); + LOGGER.info("Added temporary column 'temp_col' to table"); + } + + // Step 2: Create a document with the temp column to cache the schema + ObjectNode objectNode1 = OBJECT_MAPPER.createObjectNode(); + objectNode1.put("id", "cache-schema-doc"); + objectNode1.put("item", "Item to cache schema"); + objectNode1.put("temp_col", "temp value"); + flatCollection.create( + new SingleValueKey("default", "cache-schema-doc"), new JSONDocument(objectNode1)); + LOGGER.info("Schema cached with temp_col"); + + // Step 3: DROP the column - now the cached schema is stale + String dropColumnSQL = + String.format("ALTER TABLE \"%s\" DROP COLUMN \"temp_col\"", FLAT_COLLECTION_NAME); + try (Connection conn = 
pgDatastore.getPostgresClient(); + PreparedStatement ps = conn.prepareStatement(dropColumnSQL)) { + ps.execute(); + LOGGER.info("Dropped temp_col - schema cache is now stale"); + } + + // Step 4: Try to create with the dropped column + // Schema registry still thinks temp_col exists, so it will include it in INSERT + // INSERT will fail with UNDEFINED_COLUMN, triggering handlePSQLExceptionForCreate + // which will refresh schema and retry + ObjectNode objectNode2 = OBJECT_MAPPER.createObjectNode(); + objectNode2.put("id", "retry-doc-800"); + objectNode2.put("item", "Item after schema refresh"); + objectNode2.put("temp_col", "this column no longer exists"); + Document document = new JSONDocument(objectNode2); + Key key = new SingleValueKey("default", "retry-doc-800"); + + CreateResult result = flatCollection.create(key, document); + + // Should succeed - temp_col will be skipped (either via retry or schema refresh) + assertTrue(result.isSucceed()); + // The dropped column should be skipped + assertTrue(result.getSkippedFields().contains("temp_col")); + + // Verify the valid fields were inserted queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-mixed-new"), + key, rs -> { assertTrue(rs.next()); - assertEquals("NewItem", rs.getString("item")); - assertEquals(777, rs.getInt("price")); + assertEquals("Item after schema refresh", rs.getString("item")); }); } - @Test - @DisplayName("Should handle empty document map") - void testBulkUpsertEmptyMap() { - Map emptyMap = Collections.emptyMap(); - boolean result = flatCollection.bulkUpsert(emptyMap); - assertTrue(result); - } + @ParameterizedTest + @DisplayName("Should skip column with unparseable value and add to skippedFields") + @ArgumentsSource(MissingColumnStrategyProvider.class) + void testUnparsableValuesAsPerMissingColStrategy(MissingColumnStrategy missingColumnStrategy) + throws Exception { - @Test - @DisplayName("Should handle null document map") - void testBulkUpsertNullMap() { - boolean result = 
flatCollection.bulkUpsert(null); - assertTrue(result); + String docId = generateDocId("test"); + + // Try to insert a string value into an integer column with wrong type + // The unparseable column should be skipped, not throw an exception + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", docId); + objectNode.put("item", "Valid Item"); + objectNode.put("price", "not_a_number_at_all"); // price is INTEGER, this will fail parsing + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + if (missingColumnStrategy == MissingColumnStrategy.THROW) { + CreateResult result = + getFlatCollectionWithStrategy(MissingColumnStrategy.SKIP.toString()) + .create(key, document); + + // Should succeed with the valid columns, skipping the unparseable one + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertEquals(1, result.getSkippedFields().size()); + assertTrue(result.getSkippedFields().contains("price")); + + // Verify the valid fields were inserted + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + // price should be null since it was skipped + assertEquals(0, rs.getInt("price")); + assertTrue(rs.wasNull()); + }); + } else { + // SKIP strategy: unparseable value should be skipped, document created + CreateResult result = flatCollection.create(key, document); + assertTrue(result.isSucceed()); + assertTrue(result.isPartial()); + assertEquals(1, result.getSkippedFields().size()); + assertTrue(result.getSkippedFields().contains("price")); + + // Verify the valid fields were inserted + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Valid Item", rs.getString("item")); + // price should be null since it was skipped + assertEquals(0, rs.getInt("price")); + assertTrue(rs.wasNull()); + }); + } } + } - @Test - @DisplayName("Should bulk upsert documents with different column sets") - void 
testBulkUpsertDocumentsWithDifferentColumns() throws Exception { - Map bulkMap = new LinkedHashMap<>(); + @Nested + @DisplayName("CreateOrReplace Operations") + class CreateOrReplaceTests { - ObjectNode node1 = OBJECT_MAPPER.createObjectNode(); - node1.put("item", "ItemWithPrice"); - node1.put("price", 100); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-1"), new JSONDocument(node1)); + @Test + @DisplayName( + "Should create new document and return true. Cols not specified should be set of default NULL") + void testCreateOrReplaceNewDocument() throws Exception { - ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); - node2.put("item", "ItemWithQuantity"); - node2.put("quantity", 50); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-2"), new JSONDocument(node2)); + String docId = generateDocId("test"); - ObjectNode node3 = OBJECT_MAPPER.createObjectNode(); - node3.put("item", "ItemWithAll"); - node3.put("price", 200); - node3.put("quantity", 30); - node3.put("in_stock", true); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-3"), new JSONDocument(node3)); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "upsert-new-doc-100"); + objectNode.put("item", "New Upsert Item"); + objectNode.put("price", 500); + objectNode.put("quantity", 25); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - boolean result = flatCollection.bulkUpsert(bulkMap); + boolean isNew = flatCollection.createOrReplace(key, document); - assertTrue(result); + assertTrue(isNew); queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "diff-cols-1"), + key, rs -> { assertTrue(rs.next()); - assertEquals("ItemWithPrice", rs.getString("item")); - assertEquals(100, rs.getInt("price")); - assertEquals(0, rs.getInt("quantity")); - assertTrue(rs.wasNull()); + assertEquals("New Upsert Item", rs.getString("item")); + assertEquals(500, rs.getInt("price")); + assertEquals(25, 
rs.getInt("quantity")); + // assert on some fields that they're set to null correctly + assertNull(rs.getObject("sales")); + assertNull(rs.getObject("categoryTags")); + assertNull(rs.getObject("date")); }); + } + + @Test + @DisplayName("Should replace existing document and return false") + void testCreateOrReplaceExistingDocument() throws Exception { + String docId = generateDocId("test"); + ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); + initialNode.put("id", docId); + initialNode.put("item", "Original Item"); + initialNode.put("price", 100); + Document initialDoc = new JSONDocument(initialNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + boolean firstResult = flatCollection.createOrReplace(key, initialDoc); + + Preconditions.checkArgument( + firstResult, "Preconditions failure: Could not create first document with id: " + docId); + + // Now replace with updated document + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("id", docId); + updatedNode.put("item", "Updated Item"); + updatedNode.put("quantity", 50); + Document updatedDoc = new JSONDocument(updatedNode); + + boolean secondResult = flatCollection.createOrReplace(key, updatedDoc); + + assertFalse(secondResult); queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "diff-cols-2"), + key, rs -> { assertTrue(rs.next()); - assertEquals("ItemWithQuantity", rs.getString("item")); - assertEquals(0, rs.getInt("price")); - assertTrue(rs.wasNull()); + assertEquals("Updated Item", rs.getString("item")); + // this should be the default since price is not present in the updated document + assertNull(rs.getObject("price")); assertEquals(50, rs.getInt("quantity")); }); + } + + @Test + @DisplayName("Should skip unknown fields in createOrReplace (default SKIP strategy)") + void testCreateOrReplaceSkipsUnknownFields() throws Exception { + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "upsert-skip-fields-300"); + 
objectNode.put("item", "Item with unknown"); + objectNode.put("price", 200); + objectNode.put("unknown_field", "should be skipped"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey("default", "upsert-skip-fields-300"); + + boolean isNew = flatCollection.createOrReplace(key, document); + assertTrue(isNew); + // Verify only known fields were inserted queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "diff-cols-3"), + key, rs -> { assertTrue(rs.next()); - assertEquals("ItemWithAll", rs.getString("item")); + assertEquals("Item with unknown", rs.getString("item")); assertEquals(200, rs.getInt("price")); - assertEquals(30, rs.getInt("quantity")); - assertTrue(rs.getBoolean("in_stock")); }); } @Test - @DisplayName("Should skip unknown fields in bulk upsert") - void testBulkUpsertSkipsUnknownFields() throws Exception { - Map bulkMap = new LinkedHashMap<>(); + @DisplayName("Should handle JSONB fields in createOrReplace") + void testCreateOrReplaceWithJsonbField() throws Exception { + String docId = generateDocId("test"); + ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); + initialNode.put("id", docId); + initialNode.put("item", "Item with props"); + ObjectNode initialProps = OBJECT_MAPPER.createObjectNode(); + initialProps.put("color", "red"); + initialProps.put("size", "small"); + initialNode.set("props", initialProps); + Document initialDoc = new JSONDocument(initialNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); - ObjectNode node = OBJECT_MAPPER.createObjectNode(); - node.put("item", "ItemWithUnknown"); - node.put("price", 100); - node.put("unknown_field", "should be skipped"); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-unknown-field"), new JSONDocument(node)); + boolean wasCreated = flatCollection.createOrReplace(key, initialDoc); + Preconditions.checkArgument( + wasCreated, "Precondition failure: Doc could not be created with id: " + docId); - boolean result = flatCollection.bulkUpsert(bulkMap); 
+ // Update with new JSONB value + ObjectNode updatedNode = OBJECT_MAPPER.createObjectNode(); + updatedNode.put("id", docId); + updatedNode.put("item", "Updated Item"); + ObjectNode updatedProps = OBJECT_MAPPER.createObjectNode(); + updatedProps.put("color", "blue"); + updatedProps.put("size", "large"); + updatedProps.put("weight", 2.5); + updatedNode.set("props", updatedProps); + Document updatedDoc = new JSONDocument(updatedNode); - assertTrue(result); + boolean isNew = flatCollection.createOrReplace(key, updatedDoc); + assertFalse(isNew); + // Verify JSONB was updated queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-unknown-field"), + key, rs -> { assertTrue(rs.next()); - assertEquals("ItemWithUnknown", rs.getString("item")); - assertEquals(100, rs.getInt("price")); + JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); + assertEquals("blue", propsResult.get("color").asText()); + assertEquals("large", propsResult.get("size").asText()); + assertEquals(2.5, propsResult.get("weight").asDouble(), 0.01); }); } + } + + @Nested + @DisplayName("BulkCreateOrReplace Operations") + class BulkCreateOrReplaceTests { @Test - @DisplayName("Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy") - void testBulkUpsertIgnoreDocumentStrategy() throws Exception { - Collection collectionWithIgnoreStrategy = - getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString()); + @DisplayName("Should bulk create new documents and replace existing ones") + void testBulkCreateOrReplaceMixedInsertAndUpdate() throws Exception { + // Create an existing document first + String existingId = "bulk-cor-existing"; + ObjectNode existingNode = OBJECT_MAPPER.createObjectNode(); + existingNode.put("item", "OriginalItem"); + existingNode.put("price", 100); + existingNode.put("quantity", 10); + ObjectNode existingProps = OBJECT_MAPPER.createObjectNode(); + existingProps.put("color", "red"); + existingProps.put("size", "small"); + 
existingNode.set("props", existingProps); + existingNode.putArray("tags").add("original").add("test"); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(existingNode)); Map bulkMap = new LinkedHashMap<>(); - // Doc with unknown field - should be ignored - ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode(); - nodeWithUnknown.put("item", "ShouldBeIgnored"); - nodeWithUnknown.put("price", 100); - nodeWithUnknown.put("unknown_field", "causes ignore"); + // Updated existing document - should replace entirely + ObjectNode updatedExisting = OBJECT_MAPPER.createObjectNode(); + updatedExisting.put("item", "ReplacedItem"); + updatedExisting.put("price", 999); + ObjectNode updatedProps = OBJECT_MAPPER.createObjectNode(); + updatedProps.put("color", "blue"); + updatedExisting.set("props", updatedProps); + updatedExisting.putArray("tags").add("replaced").add("updated").add("bulk"); + updatedExisting.putArray("numbers").add(100).add(200); + // Note: quantity is NOT included - should become NULL after replace bulkMap.put( - new SingleValueKey(DEFAULT_TENANT, "ignore-doc-1"), new JSONDocument(nodeWithUnknown)); + new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(updatedExisting)); - // Doc without unknown field - should be upserted - ObjectNode validNode = OBJECT_MAPPER.createObjectNode(); - validNode.put("item", "ValidItem"); - validNode.put("price", 200); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "ignore-doc-2"), new JSONDocument(validNode)); + // New document with JSONB and array columns + ObjectNode newDoc = OBJECT_MAPPER.createObjectNode(); + newDoc.put("item", "BrandNewItem"); + newDoc.put("price", 500); + newDoc.put("in_stock", true); + ObjectNode newProps = OBJECT_MAPPER.createObjectNode(); + newProps.put("material", "steel"); + newProps.put("warranty", 12); + newDoc.set("props", newProps); + newDoc.putArray("tags").add("new").add("premium"); + 
newDoc.putArray("numbers").add(10).add(20).add(30); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-cor-new"), new JSONDocument(newDoc)); - boolean result = collectionWithIgnoreStrategy.bulkUpsert(bulkMap); + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); assertTrue(result); - // First doc should NOT exist (was ignored) + // Verify existing document was REPLACED (not merged) queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "ignore-doc-1"), + new SingleValueKey(DEFAULT_TENANT, existingId), rs -> { - assertFalse(rs.next()); + assertTrue(rs.next()); + assertEquals("ReplacedItem", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + // quantity should be NULL since it wasn't in the replacement doc + assertNull(rs.getObject("quantity")); + + // Verify JSONB was replaced (only color set, size from original is gone) + JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); + assertEquals("blue", propsResult.get("color").asText()); + assertNull(propsResult.get("size")); // size was in original but not replacement + + // Verify array was replaced + String[] tags = (String[]) rs.getArray("tags").getArray(); + assertEquals(3, tags.length); + assertEquals("replaced", tags[0]); }); - // Second doc should exist + // Verify new document was created queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "ignore-doc-2"), + new SingleValueKey(DEFAULT_TENANT, "bulk-cor-new"), rs -> { assertTrue(rs.next()); - assertEquals("ValidItem", rs.getString("item")); - assertEquals(200, rs.getInt("price")); - }); - } - - @Test - @DisplayName("Should return false when document has invalid JSON (IOException)") - void testBulkUpsertWithInvalidJsonDocument() { - Map bulkMap = new LinkedHashMap<>(); - - // Create a Document that returns invalid JSON - Document invalidJsonDoc = - new Document() { - @Override - public String toJson() { - return "{ invalid json without closing brace"; - } - }; + assertEquals("BrandNewItem", rs.getString("item")); + 
assertEquals(500, rs.getInt("price")); + assertTrue(rs.getBoolean("in_stock")); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "invalid-json-doc"), invalidJsonDoc); + // Verify JSONB column + JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props")); + assertEquals("steel", propsResult.get("material").asText()); + assertEquals(12, propsResult.get("warranty").asInt()); - // Should return false due to IOException during parsing - boolean result = flatCollection.bulkUpsert(bulkMap); + // Verify array columns + String[] tags = (String[]) rs.getArray("tags").getArray(); + assertEquals(2, tags.length); + assertEquals("new", tags[0]); - assertFalse(result); + Integer[] numbers = (Integer[]) rs.getArray("numbers").getArray(); + assertEquals(3, numbers.length); + assertEquals(10, numbers[0]); + }); } + } - @Test - @DisplayName("Should return false when batch execution fails (BatchUpdateException)") - void testBulkUpsertBatchUpdateException() throws Exception { - PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; - - String addConstraintSQL = - String.format( - "ALTER TABLE \"%s\" ADD CONSTRAINT price_positive CHECK (\"price\" > 0)", - FLAT_COLLECTION_NAME); - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = conn.prepareStatement(addConstraintSQL)) { - ps.execute(); - LOGGER.info("Added CHECK constraint: price must be positive"); - } - - try { - Map bulkMap = new LinkedHashMap<>(); - - ObjectNode node = OBJECT_MAPPER.createObjectNode(); - node.put("item", "NegativePriceItem"); - node.put("price", -100); // Violates CHECK constraint - bulkMap.put( - new SingleValueKey(DEFAULT_TENANT, "negative-price-doc"), new JSONDocument(node)); - - // Should return false due to BatchUpdateException - boolean result = flatCollection.bulkUpsert(bulkMap); - - assertFalse(result); - - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "negative-price-doc"), - rs -> { - assertFalse(rs.next()); - }); - - } finally { - // Clean 
up: remove the CHECK constraint - String dropConstraintSQL = - String.format( - "ALTER TABLE \"%s\" DROP CONSTRAINT price_positive", FLAT_COLLECTION_NAME); - try (Connection conn = pgDatastore.getPostgresClient(); - PreparedStatement ps = conn.prepareStatement(dropConstraintSQL)) { - ps.execute(); - LOGGER.info("Removed CHECK constraint"); - } - } - } + @Nested + @DisplayName("Bulk Operations") + class BulkUpsertAndReturnOlder { @Test @DisplayName("Should return empty iterator for null document map") @@ -1466,243 +1440,359 @@ void testBulkUpsertAndReturnOlderDocumentsUpsertFailure() throws Exception { } } - @Test - @DisplayName("Should bulk createOrReplace multiple new documents") - void testBulkCreateOrReplaceNewDocuments() throws Exception { - Map bulkMap = new LinkedHashMap<>(); - - ObjectNode node1 = OBJECT_MAPPER.createObjectNode(); - node1.put("item", "BulkReplaceItem1"); - node1.put("price", 201); - node1.put("quantity", 10); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), new JSONDocument(node1)); - - ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); - node2.put("item", "BulkReplaceItem2"); - node2.put("price", 202); - node2.put("quantity", 20); - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), new JSONDocument(node2)); - - boolean result = flatCollection.bulkCreateOrReplace(bulkMap); - - assertTrue(result); + @Nested + class BulkCreateOrReplaceTests { + @Test + @DisplayName("Should bulk createOrReplace multiple new documents") + void testBulkCreateOrReplaceNewDocuments() throws Exception { + Map bulkMap = new LinkedHashMap<>(); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), - rs -> { - assertTrue(rs.next()); - assertEquals("BulkReplaceItem1", rs.getString("item")); - assertEquals(201, rs.getInt("price")); - assertEquals(10, rs.getInt("quantity")); - }); + ObjectNode node1 = OBJECT_MAPPER.createObjectNode(); + node1.put("item", "BulkReplaceItem1"); + node1.put("price", 201); + 
node1.put("quantity", 10); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), new JSONDocument(node1)); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), - rs -> { - assertTrue(rs.next()); - assertEquals("BulkReplaceItem2", rs.getString("item")); - assertEquals(202, rs.getInt("price")); - assertEquals(20, rs.getInt("quantity")); - }); - } + ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); + node2.put("item", "BulkReplaceItem2"); + node2.put("price", 202); + node2.put("quantity", 20); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), new JSONDocument(node2)); - @Test - @DisplayName( - "Should bulk createOrReplace replacing existing documents and reset missing cols to default") - void testBulkCreateOrReplaceResetsUnspecifiedColumnsToDefault() throws Exception { - // First create documents with multiple fields - String docId1 = "bulk-replace-reset-1"; - String docId2 = "bulk-replace-reset-2"; + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); - ObjectNode initial1 = OBJECT_MAPPER.createObjectNode(); - initial1.put("item", "Original1"); - initial1.put("price", 100); - initial1.put("quantity", 50); - initial1.put("in_stock", true); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1)); + assertTrue(result); - ObjectNode initial2 = OBJECT_MAPPER.createObjectNode(); - initial2.put("item", "Original2"); - initial2.put("price", 200); - initial2.put("quantity", 75); - initial2.put("in_stock", false); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2)); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), + rs -> { + assertTrue(rs.next()); + assertEquals("BulkReplaceItem1", rs.getString("item")); + assertEquals(201, rs.getInt("price")); + assertEquals(10, rs.getInt("quantity")); + }); - // Now bulk createOrReplace with only some fields - others should be RESET to default - 
Map bulkMap = new LinkedHashMap<>(); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), + rs -> { + assertTrue(rs.next()); + assertEquals("BulkReplaceItem2", rs.getString("item")); + assertEquals(202, rs.getInt("price")); + assertEquals(20, rs.getInt("quantity")); + }); + } - ObjectNode updated1 = OBJECT_MAPPER.createObjectNode(); - updated1.put("item", "Updated1"); - // price, quantity, in_stock are NOT specified - should be reset to NULL - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1)); + @Test + @DisplayName( + "Should bulk createOrReplace replacing existing documents and reset missing cols to default") + void testBulkCreateOrReplaceResetsUnspecifiedColumnsToDefault() throws Exception { + // First create documents with multiple fields + String docId1 = "bulk-replace-reset-1"; + String docId2 = "bulk-replace-reset-2"; + + ObjectNode initial1 = OBJECT_MAPPER.createObjectNode(); + initial1.put("item", "Original1"); + initial1.put("price", 100); + initial1.put("quantity", 50); + initial1.put("in_stock", true); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1)); + + ObjectNode initial2 = OBJECT_MAPPER.createObjectNode(); + initial2.put("item", "Original2"); + initial2.put("price", 200); + initial2.put("quantity", 75); + initial2.put("in_stock", false); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2)); + + // Now bulk createOrReplace with only some fields - others should be RESET to default + Map bulkMap = new LinkedHashMap<>(); - ObjectNode updated2 = OBJECT_MAPPER.createObjectNode(); - updated2.put("item", "Updated2"); - updated2.put("price", 999); - // quantity, in_stock are NOT specified - should be reset to NULL - bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2)); + ObjectNode updated1 = OBJECT_MAPPER.createObjectNode(); + updated1.put("item", "Updated1"); 
+ // price, quantity, in_stock are NOT specified - should be reset to NULL + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1)); - boolean result = flatCollection.bulkCreateOrReplace(bulkMap); + ObjectNode updated2 = OBJECT_MAPPER.createObjectNode(); + updated2.put("item", "Updated2"); + updated2.put("price", 999); + // quantity, in_stock are NOT specified - should be reset to NULL + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2)); - assertTrue(result); + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); - // Verify doc1: item updated, other fields reset to NULL - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, docId1), - rs -> { - assertTrue(rs.next()); - assertEquals("Updated1", rs.getString("item")); - assertNull(rs.getObject("price")); // RESET to NULL - assertNull(rs.getObject("quantity")); // RESET to NULL - assertNull(rs.getObject("in_stock")); // RESET to NULL - }); + assertTrue(result); - // Verify doc2: item and price updated, other fields reset to NULL - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, docId2), - rs -> { - assertTrue(rs.next()); - assertEquals("Updated2", rs.getString("item")); - assertEquals(999, rs.getInt("price")); - assertNull(rs.getObject("quantity")); // RESET to NULL - assertNull(rs.getObject("in_stock")); // RESET to NULL - }); - } + // Verify doc1: item updated, other fields reset to NULL + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId1), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated1", rs.getString("item")); + assertNull(rs.getObject("price")); // RESET to NULL + assertNull(rs.getObject("quantity")); // RESET to NULL + assertNull(rs.getObject("in_stock")); // RESET to NULL + }); - @Test - @DisplayName("bulkUpsert vs bulkCreateOrReplace: upsert preserves, createOrReplace resets") - void testBulkUpsertVsBulkCreateOrReplaceBehavior() throws Exception { - // Setup: Create two identical documents - String docId1 = 
"bulk-compare-upsert"; - String docId2 = "bulk-compare-replace"; + // Verify doc2: item and price updated, other fields reset to NULL + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId2), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated2", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertNull(rs.getObject("quantity")); // RESET to NULL + assertNull(rs.getObject("in_stock")); // RESET to NULL + }); + } - ObjectNode initial = OBJECT_MAPPER.createObjectNode(); - initial.put("item", "Original Item"); - initial.put("price", 100); - initial.put("quantity", 50); + @Test + @DisplayName("bulkUpsert vs bulkCreateOrReplace: upsert preserves, createOrReplace resets") + void testBulkUpsertVsBulkCreateOrReplaceBehavior() throws Exception { + // Setup: Create two identical documents + String docId1 = "bulk-compare-upsert"; + String docId2 = "bulk-compare-replace"; + + ObjectNode initial = OBJECT_MAPPER.createObjectNode(); + initial.put("item", "Original Item"); + initial.put("price", 100); + initial.put("quantity", 50); + + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial.deepCopy())); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial.deepCopy())); + + // Now update both with partial documents (only item field) + ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode(); + partialUpdate.put("item", "Updated Item"); + + // Use bulkUpsert for doc1 - should PRESERVE price and quantity + Map upsertMap = new LinkedHashMap<>(); + upsertMap.put( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(partialUpdate.deepCopy())); + flatCollection.bulkUpsert(upsertMap); + + // Use bulkCreateOrReplace for doc2 - should RESET price and quantity to NULL + Map replaceMap = new LinkedHashMap<>(); + replaceMap.put( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(partialUpdate.deepCopy())); + 
flatCollection.bulkCreateOrReplace(replaceMap); + + // Verify bulkUpsert preserved original values + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId1), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertEquals(100, rs.getInt("price")); // PRESERVED + assertEquals(50, rs.getInt("quantity")); // PRESERVED + }); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial.deepCopy())); - flatCollection.createOrReplace( - new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial.deepCopy())); + // Verify bulkCreateOrReplace reset to defaults + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId2), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertNull(rs.getObject("price")); // RESET to NULL + assertNull(rs.getObject("quantity")); // RESET to NULL + }); + } - // Now update both with partial documents (only item field) - ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode(); - partialUpdate.put("item", "Updated Item"); + @Test + @DisplayName("Should handle empty document map for bulkCreateOrReplace") + void testBulkCreateOrReplaceEmptyMap() { + Map emptyMap = Collections.emptyMap(); + boolean result = flatCollection.bulkCreateOrReplace(emptyMap); + assertTrue(result); + } - // Use bulkUpsert for doc1 - should PRESERVE price and quantity - Map upsertMap = new LinkedHashMap<>(); - upsertMap.put( - new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(partialUpdate.deepCopy())); - flatCollection.bulkUpsert(upsertMap); + @Test + @DisplayName("Should handle null document map for bulkCreateOrReplace") + void testBulkCreateOrReplaceNullMap() { + boolean result = flatCollection.bulkCreateOrReplace(null); + assertTrue(result); + } - // Use bulkCreateOrReplace for doc2 - should RESET price and quantity to NULL - Map replaceMap = new LinkedHashMap<>(); - replaceMap.put( - new 
SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(partialUpdate.deepCopy())); - flatCollection.bulkCreateOrReplace(replaceMap); + @Test + @DisplayName("Should skip unknown fields in bulkCreateOrReplace") + void testBulkCreateOrReplaceSkipsUnknownFields() throws Exception { + Map bulkMap = new LinkedHashMap<>(); - // Verify bulkUpsert preserved original values - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, docId1), - rs -> { - assertTrue(rs.next()); - assertEquals("Updated Item", rs.getString("item")); - assertEquals(100, rs.getInt("price")); // PRESERVED - assertEquals(50, rs.getInt("quantity")); // PRESERVED - }); + ObjectNode node = OBJECT_MAPPER.createObjectNode(); + node.put("item", "ItemWithUnknown"); + node.put("price", 300); + node.put("unknown_field", "should be skipped"); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), new JSONDocument(node)); - // Verify bulkCreateOrReplace reset to defaults - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, docId2), - rs -> { - assertTrue(rs.next()); - assertEquals("Updated Item", rs.getString("item")); - assertNull(rs.getObject("price")); // RESET to NULL - assertNull(rs.getObject("quantity")); // RESET to NULL - }); - } + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); - @Test - @DisplayName("Should handle empty document map for bulkCreateOrReplace") - void testBulkCreateOrReplaceEmptyMap() { - Map emptyMap = Collections.emptyMap(); - boolean result = flatCollection.bulkCreateOrReplace(emptyMap); - assertTrue(result); - } + assertTrue(result); - @Test - @DisplayName("Should handle null document map for bulkCreateOrReplace") - void testBulkCreateOrReplaceNullMap() { - boolean result = flatCollection.bulkCreateOrReplace(null); - assertTrue(result); - } + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), + rs -> { + assertTrue(rs.next()); + assertEquals("ItemWithUnknown", rs.getString("item")); + assertEquals(300, 
rs.getInt("price")); + }); + } - @Test - @DisplayName("Should skip unknown fields in bulkCreateOrReplace") - void testBulkCreateOrReplaceSkipsUnknownFields() throws Exception { - Map bulkMap = new LinkedHashMap<>(); + @Test + @DisplayName( + "Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy for bulkCreateOrReplace") + void testBulkCreateOrReplaceIgnoreDocumentStrategy() throws Exception { + Collection collectionWithIgnoreStrategy = + getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString()); - ObjectNode node = OBJECT_MAPPER.createObjectNode(); - node.put("item", "ItemWithUnknown"); - node.put("price", 300); - node.put("unknown_field", "should be skipped"); - bulkMap.put( - new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), new JSONDocument(node)); + Map bulkMap = new LinkedHashMap<>(); - boolean result = flatCollection.bulkCreateOrReplace(bulkMap); + // Document with unknown field - should be ignored + ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode(); + nodeWithUnknown.put("item", "ItemWithUnknown"); + nodeWithUnknown.put("unknown_field", "should cause document to be ignored"); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), + new JSONDocument(nodeWithUnknown)); - assertTrue(result); + // Valid document - should be inserted + ObjectNode validNode = OBJECT_MAPPER.createObjectNode(); + validNode.put("item", "ValidItem"); + validNode.put("price", 200); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), new JSONDocument(validNode)); - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), - rs -> { - assertTrue(rs.next()); - assertEquals("ItemWithUnknown", rs.getString("item")); - assertEquals(300, rs.getInt("price")); - }); - } + boolean result = collectionWithIgnoreStrategy.bulkCreateOrReplace(bulkMap); - @Test - @DisplayName( - "Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy for bulkCreateOrReplace") - 
void testBulkCreateOrReplaceIgnoreDocumentStrategy() throws Exception { - Collection collectionWithIgnoreStrategy = - getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString()); + assertTrue(result); - Map bulkMap = new LinkedHashMap<>(); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), rs -> assertFalse(rs.next())); - // Document with unknown field - should be ignored - ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode(); - nodeWithUnknown.put("item", "ItemWithUnknown"); - nodeWithUnknown.put("unknown_field", "should cause document to be ignored"); - bulkMap.put( - new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), - new JSONDocument(nodeWithUnknown)); + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), + rs -> { + assertTrue(rs.next()); + assertEquals("ValidItem", rs.getString("item")); + assertEquals(200, rs.getInt("price")); + }); + } - // Valid document - should be inserted - ObjectNode validNode = OBJECT_MAPPER.createObjectNode(); - validNode.put("item", "ValidItem"); - validNode.put("price", 200); - bulkMap.put( - new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), new JSONDocument(validNode)); + @Test + @DisplayName("Should bulk createOrReplace and return older documents with JSONB and arrays") + void testBulkCreateOrReplaceReturnOlderDocuments() throws Exception { + String docId1 = "bulk-replace-return-1"; + String docId2 = "bulk-replace-return-2"; + + String initial1Json = + readFileFromResource("create/bulk_replace_initial_doc1.json").orElseThrow(); + String initial2Json = + readFileFromResource("create/bulk_replace_initial_doc2.json").orElseThrow(); + String updated1Json = + readFileFromResource("create/bulk_replace_updated_doc1.json").orElseThrow(); + String updated2Json = + readFileFromResource("create/bulk_replace_updated_doc2.json").orElseThrow(); + + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1Json)); 
+ flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2Json)); - boolean result = collectionWithIgnoreStrategy.bulkCreateOrReplace(bulkMap); + Map bulkMap = new LinkedHashMap<>(); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1Json)); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2Json)); + + // Get older documents before replacement + CloseableIterator olderDocs = + flatCollection.bulkCreateOrReplaceReturnOlderDocuments(bulkMap); + + // Collect older documents + Map olderDocsMap = new HashMap<>(); + while (olderDocs.hasNext()) { + Document doc = olderDocs.next(); + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + olderDocsMap.put(json.get("id").asText(), json); + } + olderDocs.close(); + + // Verify we got the older documents with original values including JSONB and arrays + String key1 = new SingleValueKey(DEFAULT_TENANT, docId1).toString(); + String key2 = new SingleValueKey(DEFAULT_TENANT, docId2).toString(); + assertEquals(2, olderDocsMap.size()); + assertTrue(olderDocsMap.containsKey(key1)); + assertTrue(olderDocsMap.containsKey(key2)); + + // Verify doc1 original values + JsonNode expectedDoc1 = OBJECT_MAPPER.readTree(initial1Json); + JsonNode oldDoc1 = olderDocsMap.get(key1); + assertEquals(expectedDoc1, oldDoc1); + + // Verify doc2 original values + JsonNode expectedDoc2 = OBJECT_MAPPER.readTree(initial2Json); + JsonNode oldDoc2 = olderDocsMap.get(key2); + assertEquals(expectedDoc2, oldDoc2); + + // Verify the documents were actually replaced with new values + String expectedResult1Json = + readFileFromResource("expected/bulk_replace_result_doc1.json").orElseThrow(); + String expectedResult2Json = + readFileFromResource("expected/bulk_replace_result_doc2.json").orElseThrow(); + + Query query1 = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + 
ConstantExpression.of( + new SingleValueKey(DEFAULT_TENANT, docId1).toString()))) + .build(); + try (CloseableIterator iter = flatCollection.find(query1)) { + assertTrue(iter.hasNext()); + JsonNode actualDoc1 = OBJECT_MAPPER.readTree(iter.next().toJson()); + JsonNode expectedResultDoc1 = OBJECT_MAPPER.readTree(expectedResult1Json); + assertEquals(expectedResultDoc1, actualDoc1); + } - assertTrue(result); + Query query2 = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of( + new SingleValueKey(DEFAULT_TENANT, docId2).toString()))) + .build(); + try (CloseableIterator iter = flatCollection.find(query2)) { + assertTrue(iter.hasNext()); + JsonNode actualDoc2 = OBJECT_MAPPER.readTree(iter.next().toJson()); + JsonNode expectedResultDoc2 = OBJECT_MAPPER.readTree(expectedResult2Json); + assertEquals(expectedResultDoc2, actualDoc2); + } + } - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), rs -> assertFalse(rs.next())); + @Test + @DisplayName( + "Should return empty iterator for empty map in bulkCreateOrReplaceReturnOlderDocuments") + void testBulkCreateOrReplaceReturnOlderDocumentsEmptyMap() throws Exception { + CloseableIterator result = + flatCollection.bulkCreateOrReplaceReturnOlderDocuments(Collections.emptyMap()); + assertFalse(result.hasNext()); + result.close(); + } - queryAndAssert( - new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), - rs -> { - assertTrue(rs.next()); - assertEquals("ValidItem", rs.getString("item")); - assertEquals(200, rs.getInt("price")); - }); + @Test + @DisplayName( + "Should return empty iterator for null map in bulkCreateOrReplaceReturnOlderDocuments") + void testBulkCreateOrReplaceReturnOlderDocumentsNullMap() throws Exception { + CloseableIterator result = + flatCollection.bulkCreateOrReplaceReturnOlderDocuments(null); + assertFalse(result.hasNext()); + result.close(); + } } } @@ -3313,14 +3403,67 @@ void 
testCreateOrReplaceRefreshesSchemaOnDroppedColumn() throws Exception { } } - @Nested - @DisplayName("Drop Operations") - class DropTests { + private static void executeInsertStatements() { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try { + String jsonContent = readFileFromResource(INSERT_STATEMENTS_FILE).orElseThrow(); + JsonNode rootNode = OBJECT_MAPPER.readTree(jsonContent); + JsonNode statementsNode = rootNode.get("statements"); - @Test - @DisplayName("Should throw UnsupportedOperationException for drop") - void testDrop() { - assertThrows(UnsupportedOperationException.class, () -> flatCollection.drop()); + if (statementsNode == null || !statementsNode.isArray()) { + throw new RuntimeException("Invalid JSON format: 'statements' array not found"); + } + + try (Connection connection = pgDatastore.getPostgresClient()) { + for (JsonNode statementNode : statementsNode) { + String statement = statementNode.asText().trim(); + if (!statement.isEmpty()) { + try (PreparedStatement preparedStatement = connection.prepareStatement(statement)) { + preparedStatement.executeUpdate(); + } catch (Exception e) { + LOGGER.error("Failed to execute INSERT statement: {}", e.getMessage(), e); + throw e; + } + } + } + } + LOGGER.info("Inserted initial data into: {}", FLAT_COLLECTION_NAME); + } catch (Exception e) { + LOGGER.error("Failed to execute INSERT statements: {}", e.getMessage(), e); + } + } + + private static Collection getFlatCollectionWithStrategy(String strategy) { + String postgresConnectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgresContainer.getMappedPort(5432)); + + Map configWithStrategy = new HashMap<>(); + configWithStrategy.put("url", postgresConnectionUrl); + configWithStrategy.put("user", "postgres"); + configWithStrategy.put("password", "postgres"); + configWithStrategy.put("customParams.missingColumnStrategy", strategy); + + Datastore datastoreWithStrategy = + DatastoreProvider.getDatastore("Postgres", 
ConfigFactory.parseMap(configWithStrategy)); + + return datastoreWithStrategy.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); + } + + private void queryAndAssert(Key key, ResultSetConsumer consumer) throws Exception { + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM \"%s\" WHERE \"id\" = '%s'", FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + consumer.accept(rs); } } + + @FunctionalInterface + interface ResultSetConsumer { + + void accept(ResultSet rs) throws Exception; + } } diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/MongoPostgresWriteConsistencyTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/MongoPostgresWriteConsistencyTest.java index cf8c71391..6345d6314 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/MongoPostgresWriteConsistencyTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/MongoPostgresWriteConsistencyTest.java @@ -12,6 +12,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -95,6 +96,181 @@ private void insertTestDocument(String docId) throws IOException { } } + @Nested + class BulkUpsertConsistencyTest { + @ParameterizedTest(name = "{0}: bulkUpsert multiple documents") + @ArgumentsSource(AllStoresProvider.class) + void testBulkUpsert(String storeName) throws Exception { + String docId1 = generateDocId("bulk-1"); + String docId2 = generateDocId("bulk-2"); + + Collection collection = getCollection(storeName); + + Map documents = new HashMap<>(); + documents.put(createKey(docId1), createTestDocument(docId1)); + documents.put(createKey(docId2), 
createTestDocument(docId2)); + + boolean result = collection.bulkUpsert(documents); + assertTrue(result); + + for (String docId : List.of(docId1, docId2)) { + Query query = buildQueryById(docId); + try (CloseableIterator iterator = collection.find(query)) { + assertTrue(iterator.hasNext()); + Document doc = iterator.next(); + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + + assertEquals("TestItem", json.get("item").asText()); + assertEquals(100, json.get("price").asInt()); + assertEquals(50, json.get("quantity").asInt()); + assertTrue(json.get("in_stock").asBoolean()); + + JsonNode tagsNode = json.get("tags"); + assertNotNull(tagsNode); + assertEquals(2, tagsNode.size()); + } + } + } + + @ParameterizedTest(name = "{0}: bulkUpsert merges fields (does not replace entire document)") + @ArgumentsSource(AllStoresProvider.class) + void testBulkUpsertMergesFields(String storeName) throws Exception { + String docId1 = generateDocId("bulk-merge-1"); + String docId2 = generateDocId("bulk-merge-2"); + + Collection collection = getCollection(storeName); + + // Step 1: Insert initial documents with all fields + Map initialDocs = new HashMap<>(); + initialDocs.put(createKey(docId1), createTestDocument(docId1)); + initialDocs.put(createKey(docId2), createTestDocument(docId2)); + + boolean insertResult = collection.bulkUpsert(initialDocs); + assertTrue(insertResult); + + // Step 2: Upsert with partial documents (only some fields) + Map partialDocs = new HashMap<>(); + + // Partial doc for docId1 - only update item and price + ObjectNode partial1 = OBJECT_MAPPER.createObjectNode(); + partial1.put("id", getKeyString(docId1)); + partial1.put("item", "UpdatedItem1"); + partial1.put("price", 999); + partialDocs.put(createKey(docId1), new JSONDocument(partial1)); + + // Partial doc for docId2 - only update quantity and in_stock + ObjectNode partial2 = OBJECT_MAPPER.createObjectNode(); + partial2.put("id", getKeyString(docId2)); + partial2.put("quantity", 999); + 
partial2.put("in_stock", false); + partialDocs.put(createKey(docId2), new JSONDocument(partial2)); + + boolean upsertResult = collection.bulkUpsert(partialDocs); + assertTrue(upsertResult); + + // Step 3: Verify that fields were merged, not replaced + // Doc1: item and price should be updated, other fields should be preserved + Query query1 = buildQueryById(docId1); + try (CloseableIterator iter = collection.find(query1)) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + + // Updated fields + assertEquals("UpdatedItem1", json.get("item").asText()); + assertEquals(999, json.get("price").asInt()); + + // Preserved fields (should still have original values) + assertEquals(50, json.get("quantity").asInt()); + assertTrue(json.get("in_stock").asBoolean()); + assertEquals(1000000000000L, json.get("big_number").asLong()); + + // Arrays and JSONB should be preserved + JsonNode tagsNode = json.get("tags"); + assertNotNull(tagsNode); + assertEquals(2, tagsNode.size()); + + JsonNode propsNode = json.get("props"); + assertNotNull(propsNode); + assertEquals("TestBrand", propsNode.get("brand").asText()); + } + + // Doc2: quantity and in_stock should be updated, other fields should be preserved + Query query2 = buildQueryById(docId2); + try (CloseableIterator iter = collection.find(query2)) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + + // Updated fields + assertEquals(999, json.get("quantity").asInt()); + assertFalse(json.get("in_stock").asBoolean()); + + // Preserved fields (should still have original values) + assertEquals("TestItem", json.get("item").asText()); + assertEquals(100, json.get("price").asInt()); + assertEquals(1000000000000L, json.get("big_number").asLong()); + + // Arrays and JSONB should be preserved + JsonNode tagsNode = json.get("tags"); + assertNotNull(tagsNode); + assertEquals(2, tagsNode.size()); + + JsonNode propsNode = json.get("props"); + 
assertNotNull(propsNode); + assertEquals("TestBrand", propsNode.get("brand").asText()); + } + } + + @ParameterizedTest(name = "{0}: bulkUpsert skips documents with invalid fields gracefully") + @ArgumentsSource(AllStoresProvider.class) + void testBulkUpsertSkipsInvalidFields(String storeName) throws Exception { + String docId1 = generateDocId("bulk-skip-1"); + String docId2 = generateDocId("bulk-skip-2"); + String docId3 = generateDocId("bulk-skip-3"); + + Collection collection = getCollection(storeName); + + Map documents = new LinkedHashMap<>(); + + // First document - valid + documents.put(createKey(docId1), createTestDocument(docId1)); + + ObjectNode invalidFieldDoc = OBJECT_MAPPER.createObjectNode(); + invalidFieldDoc.put("id", getKeyString(docId2)); + invalidFieldDoc.put("item", "PartialItem"); + invalidFieldDoc.put("price", 200); + invalidFieldDoc.put("quantity", 20); + invalidFieldDoc.put("in_stock", false); + invalidFieldDoc.putArray("numbers").add("not-a-number").add("also-not-a-number"); + documents.put(createKey(docId2), new JSONDocument(invalidFieldDoc)); + + // Third document - valid + documents.put(createKey(docId3), createTestDocument(docId3)); + + boolean result = collection.bulkUpsert(documents); + assertTrue(result); + + for (String docId : List.of(docId1, docId2, docId3)) { + Query query = buildQueryById(docId); + try (CloseableIterator iter = collection.find(query)) { + assertTrue(iter.hasNext()); + } + } + + if (storeName.equals(POSTGRES_FLAT_STORE)) { + Query query2 = buildQueryById(docId2); + try (CloseableIterator iter = collection.find(query2)) { + assertTrue(iter.hasNext()); + JsonNode json = OBJECT_MAPPER.readTree(iter.next().toJson()); + // The 'numbers' field should be null/missing since it was skipped + assertTrue(json.get("numbers") == null || json.get("numbers").isNull()); + // But other fields should be present + assertEquals("PartialItem", json.get("item").asText()); + assertEquals(200, json.get("price").asInt()); + } + } + } + } 
+ @Nested @DisplayName("Upsert Consistency Tests") class UpsertConsistencyTests { @@ -212,40 +388,6 @@ void testUpsertExistingDoc(String storeName) throws Exception { } } - @ParameterizedTest(name = "{0}: bulkUpsert multiple documents") - @ArgumentsSource(AllStoresProvider.class) - void testBulkUpsert(String storeName) throws Exception { - String docId1 = generateDocId("bulk-1"); - String docId2 = generateDocId("bulk-2"); - - Collection collection = getCollection(storeName); - - Map documents = new HashMap<>(); - documents.put(createKey(docId1), createTestDocument(docId1)); - documents.put(createKey(docId2), createTestDocument(docId2)); - - boolean result = collection.bulkUpsert(documents); - assertTrue(result); - - for (String docId : List.of(docId1, docId2)) { - Query query = buildQueryById(docId); - try (CloseableIterator iterator = collection.find(query)) { - assertTrue(iterator.hasNext()); - Document doc = iterator.next(); - JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); - - assertEquals("TestItem", json.get("item").asText()); - assertEquals(100, json.get("price").asInt()); - assertEquals(50, json.get("quantity").asInt()); - assertTrue(json.get("in_stock").asBoolean()); - - JsonNode tagsNode = json.get("tags"); - assertNotNull(tagsNode); - assertEquals(2, tagsNode.size()); - } - } - } - @ParameterizedTest(name = "{0}: upsert with non-existing fields (schema mismatch)") @ArgumentsSource(AllStoresProvider.class) void testUpsertNonExistingFields(String storeName) throws Exception { @@ -1087,4 +1229,43 @@ void testMultipleUpdatesOnSameFieldThrowsException(String storeName) throws IOEx } } } + + @Nested + @DisplayName("Delete Consistency Tests") + class DeleteConsistencyTests { + + @ParameterizedTest(name = "{0}: delete existing key returns true") + @ArgumentsSource(AllStoresProvider.class) + void testDeleteExistingKey(String storeName) throws Exception { + String docId = generateDocId("delete-existing"); + Key key = createKey(docId); + Collection collection 
= getCollection(storeName); + + Document document = createTestDocument(docId); + collection.upsert(key, document); + + Query query = buildQueryById(docId); + try (CloseableIterator iterator = collection.find(query)) { + assertTrue(iterator.hasNext()); + } + + boolean result = collection.delete(key); + assertTrue(result); + + try (CloseableIterator iterator = collection.find(query)) { + assertFalse(iterator.hasNext()); + } + } + + @ParameterizedTest(name = "{0}: delete on non-existent key returns false") + @ArgumentsSource(AllStoresProvider.class) + void testDeleteNonExistentKey(String storeName) { + Collection collection = getCollection(storeName); + + Key nonExistentKey = createKey("non-existent-key-" + System.nanoTime()); + + boolean result = collection.delete(nonExistentKey); + assertFalse(result); + } + } } diff --git a/document-store/src/integrationTest/resources/create/bulk_replace_initial_doc1.json b/document-store/src/integrationTest/resources/create/bulk_replace_initial_doc1.json new file mode 100644 index 000000000..dd0819943 --- /dev/null +++ b/document-store/src/integrationTest/resources/create/bulk_replace_initial_doc1.json @@ -0,0 +1,19 @@ +{ + "id": "default:bulk-replace-return-1", + "item": "Original1", + "price": 100, + "quantity": 50, + "tags": ["electronics", "sale"], + "numbers": [10, 20, 30], + "props": { + "brand": "OriginalBrand1", + "color": "red", + "nested": { + "key": "value1" + } + }, + "sales": { + "total": 500, + "region": "US" + } +} diff --git a/document-store/src/integrationTest/resources/create/bulk_replace_initial_doc2.json b/document-store/src/integrationTest/resources/create/bulk_replace_initial_doc2.json new file mode 100644 index 000000000..dfd1ecf55 --- /dev/null +++ b/document-store/src/integrationTest/resources/create/bulk_replace_initial_doc2.json @@ -0,0 +1,12 @@ +{ + "id": "default:bulk-replace-return-2", + "item": "Original2", + "price": 200, + "quantity": 75, + "tags": ["clothing", "premium"], + "scores": [1.5, 2.5, 
3.5], + "props": { + "brand": "OriginalBrand2", + "size": "large" + } +} diff --git a/document-store/src/integrationTest/resources/create/bulk_replace_updated_doc1.json b/document-store/src/integrationTest/resources/create/bulk_replace_updated_doc1.json new file mode 100644 index 000000000..745b5f253 --- /dev/null +++ b/document-store/src/integrationTest/resources/create/bulk_replace_updated_doc1.json @@ -0,0 +1,9 @@ +{ + "id": "default:bulk-replace-return-1", + "item": "Updated1", + "price": 999, + "tags": ["newTag1"], + "props": { + "brand": "NewBrand1" + } +} diff --git a/document-store/src/integrationTest/resources/create/bulk_replace_updated_doc2.json b/document-store/src/integrationTest/resources/create/bulk_replace_updated_doc2.json new file mode 100644 index 000000000..b9ae65a72 --- /dev/null +++ b/document-store/src/integrationTest/resources/create/bulk_replace_updated_doc2.json @@ -0,0 +1,5 @@ +{ + "id": "default:bulk-replace-return-2", + "item": "Updated2", + "price": 888 +} diff --git a/document-store/src/integrationTest/resources/expected/bulk_replace_result_doc1.json b/document-store/src/integrationTest/resources/expected/bulk_replace_result_doc1.json new file mode 100644 index 000000000..745b5f253 --- /dev/null +++ b/document-store/src/integrationTest/resources/expected/bulk_replace_result_doc1.json @@ -0,0 +1,9 @@ +{ + "id": "default:bulk-replace-return-1", + "item": "Updated1", + "price": 999, + "tags": ["newTag1"], + "props": { + "brand": "NewBrand1" + } +} diff --git a/document-store/src/integrationTest/resources/expected/bulk_replace_result_doc2.json b/document-store/src/integrationTest/resources/expected/bulk_replace_result_doc2.json new file mode 100644 index 000000000..b9ae65a72 --- /dev/null +++ b/document-store/src/integrationTest/resources/expected/bulk_replace_result_doc2.json @@ -0,0 +1,5 @@ +{ + "id": "default:bulk-replace-return-2", + "item": "Updated2", + "price": 888 +} diff --git 
a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index 66cd6b6c5..c1d5357a4 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -21,8 +21,8 @@ public interface Collection { * store. * *

Note: This method ensures that all the fields defined in the `Document` are set/created. How - * the existing fields are modified is implementation specific. For example, upserting - * { "foo2": "bar2" } + * the existing fields are modified is implementation specific. For example, upserting { + * "foo2": "bar2" } * if a document * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to @@ -42,8 +42,8 @@ public interface Collection { * store. * *

Note: This method ensures that all the fields defined in the `Document` are set/created. How - * the existing fields are modified is implementation specific. For example, upserting - * { "foo2": "bar2" } + * the existing fields are modified is implementation specific. For example, upserting { + * "foo2": "bar2" } * if a document * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to @@ -285,6 +285,17 @@ default boolean bulkCreateOrReplace(Map documents) { throw new UnsupportedOperationException("bulkCreateOrReplace is not supported"); } + /** + * Method to bulkCreateOrReplace the given documents and return the previous copies of those + * documents. This lets clients see what the documents looked like prior to replacing them, in + * one fewer round trip. + */ + default CloseableIterator bulkCreateOrReplaceReturnOlderDocuments( + Map documents) throws IOException { + throw new UnsupportedOperationException( + "bulkCreateOrReplaceReturnOlderDocuments is not supported!"); + } + /** * Atomically create a new document if the key does not exist in the collection or, replace the * existing document if the key exists in the collection and return the created/replaced document diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 27bb1e255..141b07b92 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -235,8 +235,8 @@ public boolean delete(Key key) { tableIdentifier, PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkForTable)); try (PreparedStatement preparedStatement = client.getConnection().prepareStatement(deleteSQL)) { preparedStatement.setString(1, key.toString()); - 
preparedStatement.executeUpdate(); - return true; + int rowsDeleted = preparedStatement.executeUpdate(); + return rowsDeleted > 0; } catch (SQLException e) { LOGGER.error("SQLException deleting document. key: {}", key, e); } @@ -379,8 +379,9 @@ public boolean bulkUpsert(Map documents) { } // Build the bulk upsert SQL with all columns + // Use COALESCE to preserve existing values when a document doesn't have a column List columnList = new ArrayList<>(allColumns); - String sql = buildMergeUpsertSql(columnList, quotedPkColumn, false); + String sql = buildMergeUpsertSql(columnList, quotedPkColumn, false, true); LOGGER.debug("Bulk upsert SQL: {}", sql); try (Connection conn = client.getPooledConnection(); @@ -524,13 +525,54 @@ public boolean bulkCreateOrReplace(Map documents) { e.getSQLState(), e.getErrorCode(), e); - } catch (IOException e) { + } catch (Exception e) { LOGGER.error("IOException in bulkCreateOrReplace. documents: {}", documents, e); } - return false; } + @Override + public CloseableIterator bulkCreateOrReplaceReturnOlderDocuments( + Map documents) throws IOException { + if (documents == null || documents.isEmpty()) { + return CloseableIterator.emptyIterator(); + } + + String tableName = tableIdentifier.getTableName(); + String pkColumn = getPKForTable(tableName); + String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn); + PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn); + + Connection connection = null; + try { + connection = client.getPooledConnection(); + + PreparedStatement preparedStatement = + getPreparedStatementForQuery(documents, quotedPkColumn, connection, pkType); + + ResultSet resultSet = preparedStatement.executeQuery(); + + boolean replaceResult = bulkCreateOrReplace(documents); + if (!replaceResult) { + closeConnection(connection); + throw new IOException("bulkCreateOrReplace failed"); + } + + // note that connection will be closed after the iterator is used by the client + return new 
PostgresCollection.PostgresResultIteratorWithBasicTypes( + resultSet, connection, DocumentType.FLAT); + + } catch (SQLException e) { + LOGGER.error("SQLException in bulkCreateOrReplaceReturnOlderDocuments", e); + closeConnection(connection); + throw new IOException("Could not bulk createOrReplace the documents.", e); + } catch (Exception e) { + LOGGER.error("Exception in bulkCreateOrReplaceReturnOlderDocuments", e); + closeConnection(connection); + throw new IOException("Could not bulk createOrReplace the documents.", e); + } + } + /** * Builds a PostgreSQL upsert SQL statement with merge semantics. * @@ -544,6 +586,24 @@ public boolean bulkCreateOrReplace(Map documents) { */ private String buildMergeUpsertSql( List columns, String pkColumn, boolean includeReturning) { + return buildMergeUpsertSql(columns, pkColumn, includeReturning, false); + } + + /** + * Builds a PostgreSQL upsert SQL statement with merge semantics. + * + *

Generates: INSERT ... ON CONFLICT DO UPDATE SET col = EXCLUDED.col for each column. Only + * columns in the provided list are updated on conflict (merge behavior). + * + * @param columns List of quoted column names to include + * @param pkColumn The quoted primary key column name + * @param includeReturning If true, adds RETURNING clause to detect insert vs update + * @param useCoalesce If true, uses COALESCE(EXCLUDED.col, table.col) to preserve existing values + * when the new value is NULL + * @return The upsert SQL statement + */ + private String buildMergeUpsertSql( + List columns, String pkColumn, boolean includeReturning, boolean useCoalesce) { String columnList = String.join(", ", columns); String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); @@ -552,13 +612,29 @@ private String buildMergeUpsertSql( ? PostgresUtils.wrapFieldNamesWithDoubleQuotes(createdTsColumn) : null; - // Build SET clause for non-PK columns: col = EXCLUDED.col. Exclude createdTsColumn from updates - // to preserve original creation time + // Build SET clause for non-PK columns. + // If useCoalesce is true, use COALESCE(EXCLUDED.col, table.col) to preserve existing values + // when the new value is NULL (for bulk upsert merge semantics). + // Exclude createdTsColumn from updates to preserve original creation time. String setClause = columns.stream() .filter(col -> !col.equals(pkColumn)) .filter(col -> !col.equals(quotedCreatedTs)) - .map(col -> col + " = EXCLUDED." + col) + .map( + col -> { + if (useCoalesce) { + return col + + " = COALESCE(EXCLUDED." + + col + + ", " + + tableIdentifier + + "." + + col + + ")"; + } else { + return col + " = EXCLUDED." + col; + } + }) .collect(Collectors.joining(", ")); String sql = @@ -635,6 +711,7 @@ private PreparedStatement getPreparedStatementForQuery( @Override public void drop() { + // Table management (create/alter/drop) should be handled outside the collection. 
throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index 622996c94..af9e724a9 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -468,6 +468,183 @@ void testBulkUpdateClosesIteratorOnException() throws Exception { } } + @Nested + @DisplayName("delete(Key) Exception Handling Tests") + class DeleteKeyExceptionTests { + + @Test + @DisplayName("Should return false when prepareStatement throws SQLException") + void testDeleteReturnsFalseWhenPrepareStatementFails() throws Exception { + Key key = Key.from("test-key"); + + when(mockSchemaRegistry.getPrimaryKeyColumn(COLLECTION_NAME)).thenReturn(Optional.of("id")); + when(mockClient.getConnection()).thenReturn(mockConnection); + when(mockConnection.prepareStatement(anyString())) + .thenThrow(new SQLException("Statement preparation failed")); + + boolean result = flatPostgresCollection.delete(key); + + assertFalse(result); + } + + @Test + @DisplayName("Should return false when executeUpdate throws SQLException") + void testDeleteReturnsFalseWhenExecuteUpdateFails() throws Exception { + Key key = Key.from("test-key"); + + when(mockSchemaRegistry.getPrimaryKeyColumn(COLLECTION_NAME)).thenReturn(Optional.of("id")); + when(mockClient.getConnection()).thenReturn(mockConnection); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + when(mockPreparedStatement.executeUpdate()).thenThrow(new SQLException("Execute failed")); + + boolean result = flatPostgresCollection.delete(key); + + assertFalse(result); + } + } + + @Nested + @DisplayName("bulkCreateOrReplace Exception Handling Tests") + 
class BulkCreateOrReplaceExceptionTests { + + @Test + @DisplayName("Should return false on BatchUpdateException") + void testBulkCreateOrReplaceReturnsFalseOnBatchUpdateException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + Map documents = Map.of(key, document); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + java.sql.BatchUpdateException batchException = + new java.sql.BatchUpdateException("Batch failed", new int[] {}); + when(mockPreparedStatement.executeBatch()).thenThrow(batchException); + + boolean result = flatPostgresCollection.bulkCreateOrReplace(documents); + + assertFalse(result); + } + + @Test + @DisplayName("Should return false on SQLException") + void testBulkCreateOrReplaceReturnsFalseOnSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + Map documents = Map.of(key, document); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + SQLException sqlException = new SQLException("Connection failed", "08001", 1001); + when(mockPreparedStatement.executeBatch()).thenThrow(sqlException); + + boolean result = flatPostgresCollection.bulkCreateOrReplace(documents); + + assertFalse(result); + } + + @Test + @DisplayName("Should return false on generic Exception") + void testBulkCreateOrReplaceReturnsFalseOnGenericException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + Map documents = Map.of(key, document); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + RuntimeException runtimeException = new RuntimeException("Unexpected error"); + when(mockPreparedStatement.executeBatch()).thenThrow(runtimeException); + + boolean result = flatPostgresCollection.bulkCreateOrReplace(documents); + + assertFalse(result); + } 
+ } + + @Nested + @DisplayName("bulkCreateOrReplaceReturnOlderDocuments Exception Handling Tests") + class BulkCreateOrReplaceReturnOlderDocumentsExceptionTests { + + @Mock private java.sql.Array mockSqlArray; + + private void setupBulkCreateOrReplaceReturnOlderDocsMocks( + Map schema) throws SQLException { + when(mockSchemaRegistry.getPrimaryKeyColumn(COLLECTION_NAME)).thenReturn(Optional.of("id")); + when(mockClient.getPooledConnection()).thenReturn(mockConnection); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + when(mockConnection.createArrayOf(anyString(), any())).thenReturn(mockSqlArray); + } + + @Test + @DisplayName("Should throw IOException on SQLException from getPooledConnection") + void testBulkCreateOrReplaceReturnOlderDocumentsThrowsOnSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + Map documents = Map.of(key, document); + + when(mockSchemaRegistry.getPrimaryKeyColumn(COLLECTION_NAME)).thenReturn(Optional.of("id")); + SQLException sqlException = new SQLException("Connection failed", "08001", 1001); + when(mockClient.getPooledConnection()).thenThrow(sqlException); + + IOException thrown = + assertThrows( + IOException.class, + () -> flatPostgresCollection.bulkCreateOrReplaceReturnOlderDocuments(documents)); + + assertTrue(thrown.getMessage().contains("Could not bulk createOrReplace")); + assertEquals(sqlException, thrown.getCause()); + } + + @Test + @DisplayName("Should throw IOException on SQLException from executeQuery") + void testBulkCreateOrReplaceReturnOlderDocumentsThrowsOnExecuteQuerySQLException() + throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + Map documents = Map.of(key, document); + + Map schema = createBasicSchema(); + setupBulkCreateOrReplaceReturnOlderDocsMocks(schema); + + SQLException sqlException = new 
SQLException("Query failed", "42000", 1002); + when(mockPreparedStatement.executeQuery()).thenThrow(sqlException); + + IOException thrown = + assertThrows( + IOException.class, + () -> flatPostgresCollection.bulkCreateOrReplaceReturnOlderDocuments(documents)); + + assertTrue(thrown.getMessage().contains("Could not bulk createOrReplace")); + assertEquals(sqlException, thrown.getCause()); + verify(mockConnection).close(); + } + + @Test + @DisplayName("Should throw IOException on generic Exception") + void testBulkCreateOrReplaceReturnOlderDocumentsThrowsOnGenericException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + Map documents = Map.of(key, document); + + Map schema = createBasicSchema(); + setupBulkCreateOrReplaceReturnOlderDocsMocks(schema); + + RuntimeException runtimeException = new RuntimeException("Unexpected error"); + when(mockPreparedStatement.executeQuery()).thenThrow(runtimeException); + + IOException thrown = + assertThrows( + IOException.class, + () -> flatPostgresCollection.bulkCreateOrReplaceReturnOlderDocuments(documents)); + + assertTrue(thrown.getMessage().contains("Could not bulk createOrReplace")); + assertEquals(runtimeException, thrown.getCause()); + verify(mockConnection).close(); + } + } + @Nested @DisplayName("convertTimestampForType Tests") class TimestampCoversionTests {