From 133b75392d8db4084805015caeb0430da67d651a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 30 Jun 2026 00:45:34 +0000 Subject: [PATCH] feat(bigframes): Add productionize mode to bigframes --- .../ibis_compiler/scalar_op_compiler.py | 20 + .../compile/sqlglot/expression_compiler.py | 4 + .../bigframes/bigframes/core/expression.py | 68 +++- .../bigframes/core/rewrite/timedeltas.py | 3 + .../bigframes/functions/_function_client.py | 23 +- .../bigframes/functions/_function_session.py | 47 +++ .../bigframes/bigframes/pandas/__init__.py | 3 + .../bigframes/session/productionize.py | 378 ++++++++++++++++++ .../bigframes/session/proxy_executor.py | 10 + .../tests/system/small/test_productionize.py | 187 +++++++++ .../tests/unit/session/test_productionize.py | 205 ++++++++++ .../ibis/backends/sql/compilers/base.py | 3 + .../bigframes_vendored/ibis/expr/rewrites.py | 4 +- 13 files changed, 940 insertions(+), 15 deletions(-) create mode 100644 packages/bigframes/bigframes/session/productionize.py create mode 100644 packages/bigframes/tests/system/small/test_productionize.py create mode 100644 packages/bigframes/tests/unit/session/test_productionize.py diff --git a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py index 31a8459923c4..05f5687a83b6 100644 --- a/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py +++ b/packages/bigframes/bigframes/core/compile/ibis_compiler/scalar_op_compiler.py @@ -61,6 +61,26 @@ def _( expression.value, expression.dtype ) + @compile_expression.register + def _( + self, + expression: ex.ParameterExpression, + bindings: typing.Dict[str, ibis_types.Value], + ) -> ibis_types.Value: + import bigframes.core.compile.ibis_types as ibis_types_mod + + ibis_type = ibis_types_mod.bigframes_dtype_to_ibis_dtype(expression.dtype) + p = bigframes_vendored.ibis.param(ibis_type) + + # Record the mapping from Ibis auto-generated parameter name to user's parameter name + from bigframes.session.productionize import _state as prod_state + + if prod_state.active and prod_state.pipeline is not None: + counter = p.op().counter + prod_state.pipeline.recorded_params[f"param_{counter}"] = expression.name + + return p + @compile_expression.register def _( self, diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/expression_compiler.py b/packages/bigframes/bigframes/core/compile/sqlglot/expression_compiler.py index d07cb2186fbc..2dac8b361629 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/expression_compiler.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/expression_compiler.py @@ -79,6 +79,10 @@ def _(self, expr: ex.DerefOp) -> sge.Expression: def _(self, expr: ex.ScalarConstantExpression) -> sge.Expression: return sql.literal(expr.value, expr.dtype) + @compile_expression.register + def _(self, expr: ex.ParameterExpression) -> sge.Expression: + return sge.Parameter(this=sge.to_identifier(expr.name)) + @compile_expression.register def _(self, expr: agg_exprs.WindowExpression) -> sge.Expression: import bigframes.core.compile.sqlglot.aggregate_compiler as agg_compile diff --git a/packages/bigframes/bigframes/core/expression.py b/packages/bigframes/bigframes/core/expression.py index 6c27dfc120b6..8ce5a227165f 100644 --- a/packages/bigframes/bigframes/core/expression.py +++ b/packages/bigframes/bigframes/core/expression.py @@ -32,9 +32,9 @@ import bigframes.operations -def const( - value: typing.Hashable, dtype: dtypes.ExpressionType = None -) -> ScalarConstantExpression: +def const(value: typing.Hashable, dtype: dtypes.ExpressionType = None) -> Expression: + if isinstance(value, Parameter): + return ParameterExpression(value.name, value.dtype) return ScalarConstantExpression(value, dtype or dtypes.infer_literal_type(value)) @@ -531,3 +531,65 @@ def bind_schema_fields( RefOrConstant = Union[DerefOp, ScalarConstantExpression] + + +@dataclasses.dataclass(frozen=True) +class Parameter: + """Represents a named parameter in a productionized pipeline.""" + + name: str + dtype: dtypes.ExpressionType + + +@dataclasses.dataclass(frozen=True) +class ParameterExpression(Expression): + """An expression representing a query parameter.""" + + name: str + dtype: dtypes.ExpressionType + + @property + def is_const(self) -> bool: + return True + + @property + def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: + return () + + @property + def nullable(self) -> bool: + return True + + @property + def is_resolved(self) -> bool: + return True + + @property + def output_type(self) -> dtypes.ExpressionType: + return self.dtype + + def bind_variables( + self, + bindings: Mapping[Hashable, Expression], + allow_partial_bindings: bool = False, + ) -> Expression: + return self + + def bind_refs( + self, + bindings: Mapping[ids.ColumnId, Expression], + allow_partial_bindings: bool = False, + ) -> ParameterExpression: + return self + + @property + def is_bijective(self) -> bool: + return True + + def __eq__(self, other): + if not isinstance(other, ParameterExpression): + return False + return self.name == other.name and self.dtype == other.dtype + + def transform_children(self, t: Callable[[Expression], Expression]) -> Expression: + return self diff --git a/packages/bigframes/bigframes/core/rewrite/timedeltas.py b/packages/bigframes/bigframes/core/rewrite/timedeltas.py index 7544963732e0..31eb51ad6326 100644 --- a/packages/bigframes/bigframes/core/rewrite/timedeltas.py +++ b/packages/bigframes/bigframes/core/rewrite/timedeltas.py @@ -104,6 +104,9 @@ def _rewrite_expressions(expr: ex.Expression, schema: schema.ArraySchema) -> _Ty if isinstance(expr, ex.ScalarConstantExpression): return _rewrite_scalar_constant_expr(expr) + if isinstance(expr, ex.ParameterExpression): + return _TypedExpr(expr, expr.dtype) + if isinstance(expr, ex.OpExpression): updated_inputs = tuple( map(lambda x: _rewrite_expressions(x, schema), expr.inputs) diff --git a/packages/bigframes/bigframes/functions/_function_client.py b/packages/bigframes/bigframes/functions/_function_client.py index 69f99b50276f..b2df98e82ebd 100644 --- a/packages/bigframes/bigframes/functions/_function_client.py +++ b/packages/bigframes/bigframes/functions/_function_client.py @@ -198,15 +198,12 @@ def create_bq_remote_function( ) self._create_bq_function(create_function_ddl) - def provision_bq_managed_function( + def generate_bq_managed_function_ddl( self, routine_ref: bigquery.RoutineReference, config: udf_def.ManagedFunctionConfig, - ): - """Create a BigQuery managed function.""" - - # TODO(b/406283812): Expose the capability to pass down - # capture_references=True in the public udf API. + ) -> str: + """Generate the CREATE FUNCTION DDL for a BigQuery managed function.""" if ( config.capture_references and (python_version := _utils.get_python_version()) @@ -232,8 +229,7 @@ def provision_bq_managed_function( if config.container_memory: managed_function_options["container_memory"] = config.container_memory - # Augment user package requirements with any internal package - # requirements. + # Augment user package requirements with any internal package requirements. packages = _utils.get_updated_package_requirements( config.code.package_requirements or [], config.signature.is_row_processor, @@ -258,9 +254,6 @@ def provision_bq_managed_function( else "" ) - # Generate the complete Python code block for the managed Python UDF, - # including the user's function, necessary imports, and the BigQuery - # handler wrapper. python_code_block = bff_template.generate_managed_function_code( config.code, config.signature, config.capture_references ) @@ -281,7 +274,15 @@ def provision_bq_managed_function( .strip() .replace("__UDF_PLACE_HOLDER__", python_code_block) ) + return create_function_ddl + def provision_bq_managed_function( + self, + routine_ref: bigquery.RoutineReference, + config: udf_def.ManagedFunctionConfig, + ): + """Create a BigQuery managed function.""" + create_function_ddl = self.generate_bq_managed_function_ddl(routine_ref, config) self._ensure_dataset_exists( bigquery.DatasetReference(routine_ref.project, routine_ref.dataset_id) ) diff --git a/packages/bigframes/bigframes/functions/_function_session.py b/packages/bigframes/bigframes/functions/_function_session.py index 2bc2b597372b..9fd620f9c39e 100644 --- a/packages/bigframes/bigframes/functions/_function_session.py +++ b/packages/bigframes/bigframes/functions/_function_session.py @@ -168,6 +168,19 @@ def _deploy_managed_function( temp: bool, dataset: Optional[bigquery.DatasetReference] = None, ) -> udf_def.BigqueryUdf: + from bigframes.session.productionize import _state as prod_state + + if prod_state.active: + routine_ref = self._resolve_routine_reference(name, dataset=dataset) + ddl = self._function_client.generate_bq_managed_function_ddl( + routine_ref, config + ) + prod_state.pipeline.recorded_udfs[str(routine_ref)] = ddl + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=config.signature, + ) + routine_ref = self._resolve_routine_reference(name, dataset=dataset) if temp: self._add_temp_remote_function(routine_ref) @@ -183,6 +196,28 @@ def _deploy_udf( self, bq_udf: udf_def.PythonUdf, ) -> udf_def.BigqueryUdf: + """Deploys a UDF to BigQuery if not already deployed.""" + from bigframes.session.productionize import _state as prod_state + + if prod_state.active: + config = bq_udf.to_managed_function_config() + bq_function_name = get_managed_function_name(config, self.session_id) + routine_ref = self._resolve_routine_reference(bq_function_name) + + ddl = self._function_client.generate_bq_managed_function_ddl( + routine_ref, config + ) + prod_state.pipeline.recorded_udfs[str(routine_ref)] = ddl + + udf_hash = bq_udf.stable_hash() + with self._artifacts_lock: + self._deployed_routines.add(udf_hash) + + return udf_def.BigqueryUdf( + routine_ref=routine_ref, + signature=bq_udf.signature, + ) + """Deploys a UDF to BigQuery if not already deployed.""" udf_hash = bq_udf.stable_hash() @@ -261,6 +296,18 @@ def remote_function( ] = "internal-only", cloud_build_service_account: Optional[str] = None, ): + from bigframes.session.productionize import ( + ProductionizeBlockedError, + ) + from bigframes.session.productionize import ( + _state as prod_state, + ) + + if prod_state.active: + raise ProductionizeBlockedError( + "Creating remote functions is not supported in productionize mode." + ) + """Decorator to turn a user defined function into a BigQuery remote function. .. deprecated:: 0.0.1 diff --git a/packages/bigframes/bigframes/pandas/__init__.py b/packages/bigframes/bigframes/pandas/__init__.py index b88816ab5ab2..81250138e841 100644 --- a/packages/bigframes/bigframes/pandas/__init__.py +++ b/packages/bigframes/bigframes/pandas/__init__.py @@ -114,6 +114,7 @@ read_parquet, read_pickle, ) +from bigframes.session.productionize import parameter, productionize try: import resource @@ -525,6 +526,8 @@ def reset_session(): "close_session", "reset_session", "udf", + "parameter", + "productionize", ] _module = sys.modules[__name__] diff --git a/packages/bigframes/bigframes/session/productionize.py b/packages/bigframes/bigframes/session/productionize.py new file mode 100644 index 000000000000..266f1c5c1ac5 --- /dev/null +++ b/packages/bigframes/bigframes/session/productionize.py @@ -0,0 +1,378 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import os +import re +import threading +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple + +import google.cloud.bigquery as bigquery + +import bigframes.dtypes +import bigframes.session.executor as executor +from bigframes.core.expression import Parameter + +if TYPE_CHECKING: + import bigframes.core + import bigframes.core.schema + import bigframes.session.execution_spec as ex_spec + + +class ProductionizeBlockedError(RuntimeError): + """Exception raised when an operation attempts immediate execution or data download in productionize mode.""" + + pass + + +class ProductionizeState(threading.local): + def __init__(self): + super().__init__() + self.active: bool = False + self.pipeline: Optional[PipelineDefinition] = None + + +# Thread-local state singleton +_state = ProductionizeState() + + +class MockQueryJob: + """A mock BigQuery QueryJob that only exposes the destination table.""" + + def __init__(self, destination: bigquery.TableReference): + self.destination = destination + + +class MockExecuteResult(executor.ExecuteResult): + """A mock ExecuteResult returned during productionize interception.""" + + def __init__( + self, + destination_table: Optional[bigquery.TableReference], + schema: bigframes.core.schema.ArraySchema, + ): + self._schema = schema + self._query_job = MockQueryJob(destination_table) if destination_table else None + self._metadata = executor.ExecutionMetadata( + query_job=self._query_job, + bytes_processed=0, + ) + + @property + def execution_metadata(self) -> executor.ExecutionMetadata: + return self._metadata + + @property + def schema(self) -> bigframes.core.schema.ArraySchema: + return self._schema + + def batches(self, sample_rate: Optional[float] = None) -> executor.ResultsIterator: + return executor.ResultsIterator(iter([]), self._schema, 0, 0) + + +class PipelineDefinition: + """Context manager and recorder for a productionized BigQuery DataFrames pipeline.""" + + def __init__(self, session: bigframes.session.Session): + self.session = session + # Maps target table name (str) -> Tuple[ArrayValue, TableOutputSpec] + self.recorded_writes: Dict[ + str, Tuple[bigframes.core.ArrayValue, ex_spec.TableOutputSpec] + ] = {} + # List of GCS export specs and their corresponding ArrayValues + self.recorded_exports: List[ + Tuple[ex_spec.GcsOutputSpec, bigframes.core.ArrayValue] + ] = [] + # Maps UDF routine name (str) -> UDF DDL (str) + self.recorded_udfs: Dict[str, str] = {} + # Maps Ibis parameter name (str) -> User parameter name (str) + self.recorded_params: Dict[str, str] = {} + + def __enter__(self) -> PipelineDefinition: + self._prev_active = _state.active + self._prev_pipeline = _state.pipeline + + _state.active = True + _state.pipeline = self + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + _state.active = self._prev_active + _state.pipeline = self._prev_pipeline + + def _build_dependency_graph(self) -> Tuple[List[str], Dict[str, Set[str]]]: + """Builds the dependency graph and returns the topologically sorted tables.""" + dependencies = {table: set() for table in self.recorded_writes} + + for table, (array_value, _) in self.recorded_writes.items(): + read_tables = _find_read_tables(array_value.node) + for read_table in read_tables: + if read_table in self.recorded_writes: + # table depends on read_table + dependencies[table].add(read_table) + + # Topological sort (DFS) + visited = {} + order = [] + + def visit(node): + if visited.get(node) == "visiting": + raise ValueError( + f"Cyclic dependency detected in pipeline involving: {node}" + ) + if visited.get(node) == "visited": + return + + visited[node] = "visiting" + for neighbor in dependencies.get(node, []): + visit(neighbor) + visited[node] = "visited" + order.append(node) + + for node in dependencies: + visit(node) + + return order, dependencies + + def to_sql(self) -> str: + """Compiles the recorded pipeline into a single sequential SQL script.""" + if not self.recorded_writes and not self.recorded_udfs: + return "" + + order, _ = self._build_dependency_graph() + statements = [] + + # 1. Add UDF definitions + for udf_ddl in self.recorded_udfs.values(): + statements.append(udf_ddl + ";") + + # 2. Add table creation statements + for table_name in order: + array_value, _ = self.recorded_writes[table_name] + # Compile to SQL using the session's executor + raw_sql = self.session._executor.to_sql(array_value, ordered=False) + statements.append(f"CREATE OR REPLACE TABLE `{table_name}` AS\n{raw_sql};") + + sql = "\n\n".join(statements) + + # Rewrite Ibis-generated parameter names to user-defined names + for ibis_name, user_name in self.recorded_params.items(): + sql = re.sub(rf"(? udf_id + for routine_name, udf_ddl in self.recorded_udfs.items(): + parts = routine_name.split(".") + udf_id = parts[-1] + udf_mapping[routine_name] = udf_id + + config_block = f'config {{\n type: "operations",\n name: "{udf_id}"\n}}\n' + file_path = os.path.join(defs_dir, f"{udf_id}.sqlx") + with open(file_path, "w", encoding="utf-8") as f: + f.write(config_block) + f.write("\n") + f.write(udf_ddl) + f.write("\n") + + # 2. Export Tables + for table_name in order: + array_value, dest_spec = self.recorded_writes[table_name] + + # Parse the table name + parts = table_name.split(".") + if len(parts) == 3: + project_id, dataset_id, table_id = parts[0], parts[1], parts[2] + elif len(parts) == 2: + project_id = self.session.bqclient.project + dataset_id, table_id = parts[0], parts[1] + else: + raise ValueError(f"Invalid recorded table name: {table_name}") + + # Compile to SQL + sql = self.session._executor.to_sql(array_value, ordered=False) + + # Rewrite Ibis-generated parameter names to Dataform variables + for ibis_name, user_name in self.recorded_params.items(): + sql = re.sub( + rf"(? Set[str]: + """Traverses the AST and returns a set of all fully qualified table names read by the query.""" + from bigframes.core.nodes import ReadTableNode + + def reduction(n, children_results): + results = set() + for child_res in children_results: + results.update(child_res) + if isinstance(n, ReadTableNode): + table_ref = n.source.table + table_name = ( + f"{table_ref.project_id}.{table_ref.dataset_id}.{table_ref.table_id}" + ) + results.add(table_name) + return results + + return node.reduce_up(reduction) + + +def _replace_table_ref( + sql: str, project_id: str, dataset_id: str, table_id: str +) -> str: + """Replaces raw BigQuery table references in SQL with Dataform ref() calls.""" + p = re.escape(project_id) + d = re.escape(dataset_id) + t = re.escape(table_id) + + patterns = [ + rf"`{p}\.{d}\.{t}`", + rf"`{d}\.{t}`", + rf"{p}\.{d}\.{t}", + rf"{d}\.{t}", + ] + + combined_pattern = "|".join(patterns) + return re.sub(combined_pattern, f'${{ref("{table_id}")}}', sql, flags=re.IGNORECASE) + + +def intercept_execution( + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + pipeline: PipelineDefinition, +) -> executor.ExecuteResult: + """Intercepts execution requests and records them or blocks them accordingly.""" + import bigframes.session.execution_spec as xs + + dest_spec = execution_spec.destination_spec + if isinstance(dest_spec, xs.TableOutputSpec): + # Record the write operation (store both array_value and dest_spec) + table_ref = dest_spec.table + table_name = f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}" + pipeline.recorded_writes[table_name] = (array_value, dest_spec) + + # Return a mock result so the calling code (like to_gbq) believes it succeeded + return MockExecuteResult(table_ref, array_value.schema) + + elif isinstance(dest_spec, xs.GcsOutputSpec): + # Record the GCS export operation + pipeline.recorded_exports.append((dest_spec, array_value)) + return MockExecuteResult(None, array_value.schema) + + elif isinstance(dest_spec, xs.EphemeralTableSpec): + # Ephemeral tables are used for caching/staging. We block them to keep the pipeline static. + raise ProductionizeBlockedError( + "Caching or temporary table materialization (e.g., df.cache()) is not supported " + "in productionize mode. Please define a pure, lazy data pipeline." + ) + + else: + # No destination spec means we are trying to download data to Python (to_pandas, head, etc.) + raise ProductionizeBlockedError( + "Immediate execution/data downloading is disabled in productionize mode. " + "You cannot call to_pandas(), head(), shape, or other data-fetching operations " + "inside this context." + ) + + +def productionize() -> PipelineDefinition: + """Enters a 'productionize' context where execution is deferred and compiled into a pipeline.""" + import bigframes.core.global_session as global_session + + return PipelineDefinition(global_session.get_global_session()) + + +def parameter(name: str, dtype: Any) -> Parameter: + """Define a parameter for the productionized pipeline. + + Args: + name: The name of the parameter (will be used as @name in SQL). + dtype: The type of the parameter (e.g. int, str, datetime.date). + """ + bf_dtype = bigframes.dtypes.bigframes_type(dtype) + return Parameter(name, bf_dtype) + + +def _replace_parameter_refs(sql: str) -> str: + """Replaces BigQuery query parameters (@param) with Dataform project variables (${dataform.projectConfig.vars.param}).""" + return re.sub( + r"(? executor.ExecuteResult: + from bigframes.session.productionize import ( + _state as prod_state, + ) + from bigframes.session.productionize import ( + intercept_execution, + ) + + if prod_state.active: + return intercept_execution(array_value, execution_spec, prod_state.pipeline) + compiler_option = bigframes.options.experiments.sql_compiler if compiler_option == "legacy": return self._ibis_executor.execute(array_value, execution_spec) diff --git a/packages/bigframes/tests/system/small/test_productionize.py b/packages/bigframes/tests/system/small/test_productionize.py new file mode 100644 index 000000000000..77fb42c87c78 --- /dev/null +++ b/packages/bigframes/tests/system/small/test_productionize.py @@ -0,0 +1,187 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +import pytest +from google.cloud import bigquery, dataform_v1beta1 + +import bigframes.core.global_session as global_session +import bigframes.pandas as bpd + + +def test_productionize_e2e(session, project_id, dataset_id, tmp_path): + # 1. Define unique table names + unique_suffix = uuid.uuid4().hex[:8] + source_table = f"{dataset_id}.test_prod_src_{unique_suffix}" + dest_table = f"{dataset_id}.test_prod_dest_{unique_suffix}" + udf_name = f"my_e2e_udf_{unique_suffix}" + + # Seed the source table with some data + df_source = bpd.DataFrame({"col": [1, 2, 3, 4, 5]}, session=session) + df_source.to_gbq(source_table, if_exists="replace") + + try: + # 2. Run the pipeline in productionize mode + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + # Define a parameter + limit_val = bpd.parameter("limit_val", dtype=int) + + # Define a managed UDF + @bpd.udf(input_types=[int], output_type=int, name=udf_name) + def add_one(x): + return x + 1 + + # Read, filter, map, and write + df = bpd.read_gbq(source_table) + df_filtered = df[df["col"] > limit_val] + df_mapped = df_filtered.copy() + df_mapped["col"] = add_one(df_filtered["col"]) + + df_mapped.to_gbq(dest_table, if_exists="replace") + + # 3. Verify SQL compilation by running it on BigQuery + sql = pipeline.to_sql() + assert "@limit_val" in sql + assert udf_name in sql + + # Run the SQL script on BigQuery with a parameter value + job_config = bigquery.QueryJobConfig( + query_parameters=[bigquery.ScalarQueryParameter("limit_val", "INT64", 2)] + ) + query_job = session.bqclient.query(sql, job_config=job_config) + query_job.result() # Wait for the script to complete + + # Verify the destination table was created and contains the expected data + # Source: [1, 2, 3, 4, 5] -> Filter > 2: [3, 4, 5] -> Add 1: [4, 5, 6] + df_result = session.read_gbq(dest_table) + pd_df = df_result.to_pandas() + assert list(pd_df["col"].sort_values()) == [4, 5, 6] + + # 4. Verify Dataform export by uploading and compiling in the real repo + target_dir = str(tmp_path) + pipeline.export_dataform(target_dir) + + # Connect to Dataform + dataform_client = dataform_v1beta1.DataformClient() + repo_name = "projects/bigframes-testing/locations/us-central1/repositories/bigframes-testing-dataform" + workspace_id = f"test-ws-{unique_suffix}" + workspace_name = f"{repo_name}/workspaces/{workspace_id}" + + # Create a temporary workspace + print(f"Creating Dataform workspace: {workspace_id}") + dataform_client.create_workspace( + request=dataform_v1beta1.CreateWorkspaceRequest( + parent=repo_name, + workspace_id=workspace_id, + ) + ) + + try: + # Initialize the workspace with package.json and workflow_settings.yaml + # (since the repository might be empty) + package_json_content = """{ + "dependencies": { + "@dataform/core": "3.0.2" + } +}""" + dataform_client.write_file( + request=dataform_v1beta1.WriteFileRequest( + workspace=workspace_name, + path="package.json", + contents=package_json_content.encode("utf-8"), + ) + ) + + # Extract the dataset name from the fully qualified dataset_id + dataset_name = dataset_id.split(".")[-1] + workflow_settings_content = f"""defaultProject: {project_id} +defaultDataset: {dataset_name} +defaultLocation: us-central1 +""" + dataform_client.write_file( + request=dataform_v1beta1.WriteFileRequest( + workspace=workspace_name, + path="workflow_settings.yaml", + contents=workflow_settings_content.encode("utf-8"), + ) + ) + + # Recursively upload all exported files to the workspace + for root, _, files in os.walk(target_dir): + for file in files: + local_path = os.path.join(root, file) + relative_path = os.path.relpath(local_path, target_dir) + + with open(local_path, "r", encoding="utf-8") as f: + contents = f.read() + + # Write the file to the workspace + dataform_client.write_file( + request=dataform_v1beta1.WriteFileRequest( + workspace=workspace_name, + path=relative_path, + contents=contents.encode("utf-8"), + ) + ) + + # Trigger compilation of the workspace + print("Compiling Dataform workspace...") + compilation_result = dataform_client.create_compilation_result( + request=dataform_v1beta1.CreateCompilationResultRequest( + parent=repo_name, + compilation_result=dataform_v1beta1.CompilationResult( + workspace=workspace_name + ), + ) + ) + + # Verify there are no compilation errors + errors = compilation_result.compilation_errors + if errors: + error_messages = "\n".join(f"{e.path}: {e.message}" for e in errors) + pytest.fail( + f"Dataform compilation failed with errors:\n{error_messages}" + ) + + print("Dataform compilation succeeded!") + + finally: + # Clean up the workspace + print(f"Deleting Dataform workspace: {workspace_id}") + try: + dataform_client.delete_workspace(name=workspace_name) + except Exception as e: + print(f"Failed to delete workspace {workspace_id}: {e}") + + finally: + # Clean up BigQuery tables and UDFs + print("Cleaning up BigQuery resources...") + try: + session.bqclient.delete_table(source_table, not_found_ok=True) + except Exception: + pass + try: + session.bqclient.delete_table(dest_table, not_found_ok=True) + except Exception: + pass + try: + # Delete the UDF + session.bqclient.delete_routine( + f"{dataset_id}.{udf_name}", not_found_ok=True + ) + except Exception: + pass diff --git a/packages/bigframes/tests/unit/session/test_productionize.py b/packages/bigframes/tests/unit/session/test_productionize.py new file mode 100644 index 000000000000..f693ab8f4e37 --- /dev/null +++ b/packages/bigframes/tests/unit/session/test_productionize.py @@ -0,0 +1,205 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import google.cloud.bigquery +import pytest + +import bigframes +import bigframes.core.global_session as global_session +import bigframes.pandas as bpd +from bigframes.session.productionize import ( + ProductionizeBlockedError, +) +from bigframes.testing import mocks + + +@pytest.fixture(autouse=True) +def setup_session_started(): + """Fixture to pretend a session has already started, skipping location auto-detection.""" + original = bigframes.options.bigquery._session_started + bigframes.options.bigquery._session_started = True + yield + bigframes.options.bigquery._session_started = original + + +def _setup_mock_table(session, project, dataset, table_id): + """Helper to register a mock table in the session's bigquery client.""" + table_ref = google.cloud.bigquery.TableReference( + google.cloud.bigquery.DatasetReference(project, dataset), + table_id, + ) + table = google.cloud.bigquery.Table( + table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),) + ) + table._properties["location"] = session._location + table._properties["numRows"] = "10" + table._properties["type"] = "TABLE" + + # Mock get_table to return this table + session.bqclient.get_table.return_value = table + return table + + +def test_productionize_blocks_downloads_and_execution(): + session = mocks.create_bigquery_session() + _setup_mock_table(session, "project", "dataset", "table") + + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + # Create a mock dataframe (backed by a mock table) + df = bpd.read_gbq("project.dataset.table") + + # 1. to_pandas() should be blocked + with pytest.raises( + ProductionizeBlockedError, + match="Immediate execution/data downloading is disabled", + ): + df.to_pandas() + + # 2. shape should be blocked (since it triggers execution to count rows) + with pytest.raises( + ProductionizeBlockedError, + match="Immediate execution/data downloading is disabled", + ): + _ = df.shape + + # 3. Lazy operations should succeed + df_lazy = df.sort_values("col").head() + + # 4. Downloading the lazy result should be blocked + with pytest.raises( + ProductionizeBlockedError, + match="Immediate execution/data downloading is disabled", + ): + df_lazy.to_pandas() + + +def test_productionize_intercepts_to_gbq(): + session = mocks.create_bigquery_session() + _setup_mock_table(session, "project", "dataset", "table") + + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + df = bpd.read_gbq("project.dataset.table") + + # to_gbq should NOT raise an error, but instead return successfully + # and record the write! + df.to_gbq("project.dataset.output_table", if_exists="replace") + + assert "project.dataset.output_table" in pipeline.recorded_writes + + # Check the recorded write + array_value, dest_spec = pipeline.recorded_writes[ + "project.dataset.output_table" + ] + assert dest_spec.if_exists == "replace" + + # to_sql() should generate the SQL + sql = pipeline.to_sql() + assert "CREATE OR REPLACE TABLE `project.dataset.output_table` AS" in sql + + +def test_productionize_intercepts_udf(): + session = mocks.create_bigquery_session() + + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + # Define a UDF + @bpd.udf(input_types=[int], output_type=int, name="my_udf") + def add_one(x): + return x + 1 + + # The UDF should be recorded under the resolved routine name + expected_routine_name = "test-project.test_dataset.my_udf" + assert expected_routine_name in pipeline.recorded_udfs + + # to_sql should include the UDF DDL at the top + sql = pipeline.to_sql() + assert ( + "CREATE OR REPLACE FUNCTION `test-project.test_dataset.my_udf`" in sql + ) + assert sql.strip().startswith("CREATE OR REPLACE FUNCTION") + + +def test_productionize_blocks_remote_function(): + session = mocks.create_bigquery_session() + + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + # remote_function should raise an error immediately + with pytest.raises( + ProductionizeBlockedError, + match="Creating remote functions is not supported", + ): + + @bpd.remote_function( + input_types=[int], + output_type=int, + cloud_function_service_account="test-sa@project.iam.gserviceaccount.com", + ) + def remote_add_one(x): + return x + 1 + + +def test_productionize_parameter(): + session = mocks.create_bigquery_session() + _setup_mock_table(session, "project", "dataset", "table") + + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + # Define a parameter + limit_val = bpd.parameter("limit_val", dtype=int) + + df = bpd.read_gbq("project.dataset.table") + # Use the parameter in a filter + df_filtered = df[df["col"] > limit_val] + + # Write the result + df_filtered.to_gbq("project.dataset.output_table", if_exists="replace") + + # to_sql() should compile the SQL containing @limit_val + sql = pipeline.to_sql() + assert "@limit_val" in sql + # Verify it compiles to a valid comparison + assert "col` > @limit_val" in sql or "col > @limit_val" in sql + + +def test_productionize_parameter_dataform(tmp_path): + import os + + session = mocks.create_bigquery_session() + _setup_mock_table(session, "project", "dataset", "table") + + with global_session._GlobalSessionContext(session): + with bpd.productionize() as pipeline: + limit_val = bpd.parameter("limit_val", dtype=int) + df = bpd.read_gbq("project.dataset.table") + df_filtered = df[df["col"] > limit_val] + df_filtered.to_gbq("project.dataset.output_table", if_exists="replace") + + # Export to Dataform + target_dir = str(tmp_path) + pipeline.export_dataform(target_dir) + + # Check the generated .sqlx file + file_path = os.path.join(target_dir, "definitions", "output_table.sqlx") + assert os.path.exists(file_path) + + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + # The parameter @limit_val should be rewritten to ${dataform.projectConfig.vars.limit_val}! + assert "${dataform.projectConfig.vars.limit_val}" in content + assert "@limit_val" not in content diff --git a/packages/bigframes/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py b/packages/bigframes/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py index e6ab427be5e4..691915535c0b 100644 --- a/packages/bigframes/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py +++ b/packages/bigframes/third_party/bigframes_vendored/ibis/backends/sql/compilers/base.py @@ -688,6 +688,9 @@ def visit_ScalarSubquery(self, op, *, rel): def visit_Alias(self, op, *, arg, name): return arg + def visit_ScalarParameter(self, op, *, dtype, counter): + return sge.Parameter(this=sge.to_identifier(f"param_{counter}")) + def visit_Literal(self, op, *, value, dtype): """Compile a literal value. diff --git a/packages/bigframes/third_party/bigframes_vendored/ibis/expr/rewrites.py b/packages/bigframes/third_party/bigframes_vendored/ibis/expr/rewrites.py index 3ec5ea127141..4c816dce751c 100644 --- a/packages/bigframes/third_party/bigframes_vendored/ibis/expr/rewrites.py +++ b/packages/bigframes/third_party/bigframes_vendored/ibis/expr/rewrites.py @@ -200,7 +200,9 @@ def peel_join_field(_): @replace(p.ScalarParameter) def replace_parameter(_, params, **kwargs): """Replace scalar parameters with their values.""" - return ops.Literal(value=params[_], dtype=_.dtype) + if params and _ in params: + return ops.Literal(value=params[_], dtype=_.dtype) + return _ @replace(p.StringSlice)