Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jspecify = "1.0.0"
junit = "6.0.2"
nmcp = "1.4.4"
picocli = "4.7.7"
postgresql = "42.7.9"
protobuf-plugin = "0.9.6"
protobuf = "3.25.8"
scala-library = "2.12.21"
Expand All @@ -24,6 +25,7 @@ shadow = "9.3.1"
slf4j = "2.0.17"
spark = "3.4.4"
spotless = "8.2.1"
testcontainers = "2.0.3"
validator = "3.0.0"

[libraries]
Expand Down Expand Up @@ -56,6 +58,7 @@ junit-platform-launcher = { module = "org.junit.platform:junit-platform-launcher
junit-platform-engine = { module = "org.junit.platform:junit-platform-engine" }
picocli = { module = "info.picocli:picocli", version.ref = "picocli" }
picocli-codegen = { module = "info.picocli:picocli-codegen", version.ref = "picocli" }
postgresql-jdbc = { module = "org.postgresql:postgresql", version.ref = "postgresql" }
protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "protobuf" }
protobuf-java-util = { module = "com.google.protobuf:protobuf-java-util", version.ref = "protobuf" }
protoc = { module = "com.google.protobuf:protoc", version.ref = "protobuf" }
Expand All @@ -68,9 +71,13 @@ spark-catalyst = { module = "org.apache.spark:spark-catalyst_2.12", version.ref
spark-core = { module = "org.apache.spark:spark-core_2.12", version.ref = "spark" }
spark-hive = { module = "org.apache.spark:spark-hive_2.12", version.ref = "spark" }
spark-sql = { module = "org.apache.spark:spark-sql_2.12", version.ref = "spark" }
testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }
testcontainers-junit5 = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" }
testcontainers-postgres = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" }

[bundles]
jackson = [ "jackson-databind", "jackson-annotations", "jackson-datatype-jdk8", "jackson-dataformat-yaml" ]
testcontainers = [ "testcontainers", "testcontainers-junit5", "testcontainers-postgres" ]

[plugins]
graal = { id = "org.graalvm.buildtools.native", version.ref = "graal-plugin" }
Expand Down
6 changes: 6 additions & 0 deletions isthmus/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ dependencies {
}
testImplementation(libs.protobuf.java)
api(libs.jspecify)

testImplementation(libs.bundles.testcontainers)

testImplementation(libs.postgresql.jdbc)

testImplementation(libs.slf4j.jdk14)
}

tasks {
Expand Down
26 changes: 26 additions & 0 deletions isthmus/src/main/java/io/substrait/isthmus/SubstraitToSql.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,48 @@
package io.substrait.isthmus;

import io.substrait.extension.SimpleExtension;
import io.substrait.plan.Plan;
import io.substrait.plan.Plan.Root;
import io.substrait.relation.Rel;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.sql.SqlDialect;

/**
 * Converts Substrait relations and plans into Calcite relational trees and into SQL text for a
 * given {@link SqlDialect}.
 */
public class SubstraitToSql extends SqlConverterBase {

  /** Converter used by {@link #convert(Plan, SqlDialect)} to turn plan roots into Calcite. */
  protected SubstraitToCalcite substraitToCalcite;

  /** Creates a converter using the default feature set and the base class' extension setup. */
  public SubstraitToSql() {
    super(FEATURES_DEFAULT);

    // Initialize here as well: this field used to stay null on this code path, which made
    // convert(Plan, SqlDialect) fail with a NullPointerException.
    substraitToCalcite = new SubstraitToCalcite(extensionCollection, factory);
  }

  /**
   * Creates a converter using the default feature set and the given extensions.
   *
   * @param extensions the extension collection used to resolve functions and types
   */
  public SubstraitToSql(SimpleExtension.ExtensionCollection extensions) {
    super(FEATURES_DEFAULT, extensions);

    substraitToCalcite = new SubstraitToCalcite(extensions, factory);
  }

  /**
   * Converts a Substrait relation into a Calcite {@link RelNode}.
   *
   * @param relRoot the Substrait relation to convert
   * @param catalog the catalog reader handed to the relation converter
   * @return the Calcite relational tree produced by {@code SubstraitRelNodeConverter}
   */
  public RelNode substraitRelToCalciteRel(Rel relRoot, Prepare.CatalogReader catalog) {
    return SubstraitRelNodeConverter.convert(
        relRoot, relOptCluster, catalog, parserConfig, extensionCollection);
  }

  /**
   * Renders every root of the given plan as a SQL statement in the given dialect.
   *
   * @param plan the Substrait plan whose roots are converted
   * @param dialect the SQL dialect used both for conversion and for rendering
   * @return one SQL string per plan root, in the order returned by {@code plan.getRoots()}
   */
  public List<String> convert(Plan plan, SqlDialect dialect) {
    List<String> result = new ArrayList<>();
    // a single converter instance is shared by all roots of the plan
    RelToSqlConverter relToSql = new RelToSqlConverter(dialect);

    for (Root root : plan.getRoots()) {
      RelNode calciteRel = substraitToCalcite.convert(root).project(true);
      result.add(relToSql.visitRoot(calciteRel).asStatement().toSqlString(dialect).getSql());
    }

    return result;
  }
}
4 changes: 2 additions & 2 deletions isthmus/src/test/java/io/substrait/isthmus/PlanTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.google.common.annotations.Beta;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import io.substrait.dsl.SubstraitBuilder;
import io.substrait.extension.DefaultExtensionCatalog;
Expand All @@ -24,6 +23,7 @@
import io.substrait.type.Type;
import io.substrait.type.TypeCreator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.adapter.tpcds.TpcdsSchema;
Expand Down Expand Up @@ -79,7 +79,7 @@ protected PlanTestBase(SimpleExtension.ExtensionCollection extensions) {
}

/**
 * Loads the named resource via Guava's {@code Resources} helper and returns its contents decoded
 * as UTF-8.
 *
 * @param resource name of the resource to load, as accepted by {@code Resources.getResource}
 * @return the full contents of the resource as a string
 * @throws IOException propagated from {@code Resources.toString} when reading fails
 */
public static String asString(String resource) throws IOException {
return Resources.toString(Resources.getResource(resource), Charsets.UTF_8);
return Resources.toString(Resources.getResource(resource), StandardCharsets.UTF_8);
}

protected Plan assertProtoPlanRoundrip(String query) throws SqlParseException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.substrait.isthmus.integration;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.substrait.isthmus.PlanTestBase;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.isthmus.SubstraitToSql;
import io.substrait.plan.Plan;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.stream.IntStream;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
import org.apache.calcite.sql.parser.SqlParseException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.JdbcDatabaseContainer.NoDriverFoundException;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.postgresql.PostgreSQLContainer;

/**
 * Integration test that converts the TPC-H queries from SQL to Substrait and back to PostgreSQL
 * SQL, then checks on a real PostgreSQL instance that the regenerated SQL returns the same rows
 * as a reference query.
 */
@Testcontainers
class PostgreSqlIntegrationTest extends PlanTestBase {
  private static final Logger LOG = LoggerFactory.getLogger(PostgreSqlIntegrationTest.class);

  /**
   * PostgreSQL container with the TPC-H data directory and init script mounted read-only.
   *
   * <p>Declared {@code static} so that Testcontainers starts it once and shares it between all
   * parameterized invocations; as an instance field it would be started and stopped (including
   * re-running the init script) for every single query. The tests only read from the database, so
   * sharing is safe.
   */
  @Container
  static final PostgreSQLContainer postgres =
      new PostgreSQLContainer("postgres:18")
          .withClasspathResourceMapping("tpch/data", "/tmp/tpc-h", BindMode.READ_ONLY)
          .withClasspathResourceMapping(
              "tpch/postgresql/tpch_init.sql",
              "/docker-entrypoint-initdb.d/tpch_init.sql",
              BindMode.READ_ONLY);

  /**
   * Query template counting the rows that occur in only one of the two result sets. The first
   * placeholder takes the reference SQL, the second the generated SQL.
   *
   * <p>NOTE(review): {@code EXCEPT} compares distinct rows, so differences in duplicate counts or
   * in row order are not detected by this check.
   */
  private static final String COMPARE_RESULTS_SQL_TEMPLATE =
      """
      WITH expected AS (%s),
      actual AS (%s)
      SELECT count(*) FROM (
      SELECT * FROM
      (SELECT * FROM expected EXCEPT SELECT * FROM actual)
      UNION (SELECT * FROM actual EXCEPT SELECT * FROM expected)
      )
      """;

  /** Supplies the TPC-H query numbers under test: 1 through 22, without 21. */
  static IntStream tpcHTestCases() {
    // TODO: query 21 currently does not produce the same result when run through Substrait
    return IntStream.rangeClosed(1, 22).filter(i -> i != 21);
  }

  /**
   * Round-trips one TPC-H query through Substrait and asserts that the generated SQL and the
   * reference SQL yield no differing rows on PostgreSQL.
   *
   * @param queryNo number of the TPC-H query, used to locate the input and reference SQL files
   */
  @ParameterizedTest
  @MethodSource("tpcHTestCases")
  void testTpcH(int queryNo)
      throws NoDriverFoundException, SQLException, IOException, SqlParseException {

    // SQL -> Substrait
    String inputSql = asString(String.format("tpch/queries/%02d.sql", queryNo));
    SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
    Plan plan = sqlToSubstrait.convert(inputSql, TPCH_CATALOG);

    // Substrait -> PostgreSQL SQL; only the first generated statement is compared
    SubstraitToSql substraitToSql = new SubstraitToSql(extensions);
    String generatedSql = substraitToSql.convert(plan, PostgresqlSqlDialect.DEFAULT).get(0);

    String referenceSql = asString(String.format("tpch/postgresql/%02d.sql", queryNo));

    String compareSql = String.format(COMPARE_RESULTS_SQL_TEMPLATE, referenceSql, generatedSql);

    LOG.atInfo().log(compareSql);

    try (Connection conn = postgres.createConnection("");
        Statement stmt = conn.createStatement();
        ResultSet result = stmt.executeQuery(compareSql)) {
      // we expect exactly one row
      assertTrue(result.next());

      // the count should be zero if both the reference and generated SQL produce the same results
      assertEquals(0, result.getInt(1));

      // there must be no further rows after the single count row
      assertFalse(result.next());
    }
  }
}
Loading