Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ def _(
expression.value, expression.dtype
)

@compile_expression.register
def _(
    self,
    expression: ex.ParameterExpression,
    bindings: typing.Dict[str, ibis_types.Value],
) -> ibis_types.Value:
    """Compile a named query parameter into an Ibis parameter value.

    Args:
        expression: The parameter expression. Its ``dtype`` is converted to
            an Ibis dtype and its ``name`` is the user's parameter name.
        bindings: Not read by this overload.

    Returns:
        The Ibis parameter created for ``expression``.
    """
    # NOTE(review): imports are function-scoped, presumably to avoid an
    # import cycle -- confirm before hoisting them to module level.
    import bigframes.core.compile.ibis_types as ibis_types_mod

    param_value = bigframes_vendored.ibis.param(
        ibis_types_mod.bigframes_dtype_to_ibis_dtype(expression.dtype)
    )

    from bigframes.session.productionize import _state as prod_state

    # Nothing to record unless productionize mode is on and has a pipeline.
    if not prod_state.active or prod_state.pipeline is None:
        return param_value

    # Record the mapping from the Ibis auto-generated parameter name to the
    # user's parameter name.
    generated_name = f"param_{param_value.op().counter}"
    prod_state.pipeline.recorded_params[generated_name] = expression.name
    return param_value

@compile_expression.register
def _(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def _(self, expr: ex.DerefOp) -> sge.Expression:
def _(self, expr: ex.ScalarConstantExpression) -> sge.Expression:
return sql.literal(expr.value, expr.dtype)

@compile_expression.register
def _(self, expr: ex.ParameterExpression) -> sge.Expression:
    """Compile a named query parameter into a SQLGlot ``Parameter`` node.

    While productionize mode is active and a pipeline is attached, the
    parameter is also stored in the pipeline's ``recorded_params`` so that
    parameters compiled through SQLGlot are recorded as well.

    Args:
        expr: The parameter expression; only its ``name`` is used.

    Returns:
        A ``sge.Parameter`` whose identifier is ``expr.name``.
    """
    # NOTE(review): function-scoped import, presumably to avoid an import
    # cycle with the session package -- confirm before hoisting.
    from bigframes.session.productionize import _state as prod_state

    if prod_state.active and prod_state.pipeline is not None:
        # The generated SQL refers to the parameter by its own name, so the
        # recorded name maps to itself.
        prod_state.pipeline.recorded_params[expr.name] = expr.name

    return sge.Parameter(this=sge.to_identifier(expr.name))
Comment on lines +82 to +84

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the SQLGlot compiler, ParameterExpression is compiled directly without recording the parameter name in recorded_params. This means export_dataform won't know about the parameters when SQLGlot is used as the compiler, leading to unreplaced parameters in the exported Dataform files. We should record the parameter name when productionize mode is active.

Suggested change
@compile_expression.register
def _(self, expr: ex.ParameterExpression) -> sge.Expression:
return sge.Parameter(this=sge.to_identifier(expr.name))
@compile_expression.register
def _(self, expr: ex.ParameterExpression) -> sge.Expression:
from bigframes.session.productionize import _state as prod_state
if prod_state.active and prod_state.pipeline is not None:
prod_state.pipeline.recorded_params[expr.name] = expr.name
return sge.Parameter(this=sge.to_identifier(expr.name))


@compile_expression.register
def _(self, expr: agg_exprs.WindowExpression) -> sge.Expression:
import bigframes.core.compile.sqlglot.aggregate_compiler as agg_compile
Expand Down
68 changes: 65 additions & 3 deletions packages/bigframes/bigframes/core/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import bigframes.operations


def const(
value: typing.Hashable, dtype: dtypes.ExpressionType = None
) -> ScalarConstantExpression:
def const(value: typing.Hashable, dtype: dtypes.ExpressionType = None) -> Expression:
    """Build a constant-like expression from ``value``.

    Args:
        value: Either a literal value or a ``Parameter``.
        dtype: Type for a literal value. When falsy, the type is inferred
            with ``dtypes.infer_literal_type``. Not used when ``value`` is a
            ``Parameter``, which carries its own dtype.

    Returns:
        A ``ParameterExpression`` for a ``Parameter``; otherwise a
        ``ScalarConstantExpression``.
    """
    if not isinstance(value, Parameter):
        literal_type = dtype or dtypes.infer_literal_type(value)
        return ScalarConstantExpression(value, literal_type)
    return ParameterExpression(value.name, value.dtype)


Expand Down Expand Up @@ -531,3 +531,65 @@ def bind_schema_fields(


RefOrConstant = Union[DerefOp, ScalarConstantExpression]


@dataclasses.dataclass(frozen=True)
class Parameter:
    """Represents a named parameter in a productionized pipeline.

    Instances are immutable (frozen dataclass). Passing one to ``const``
    yields a ``ParameterExpression`` with the same name and dtype.
    """

    # Name identifying the parameter.
    name: str
    # Type of the parameter's value.
    dtype: dtypes.ExpressionType


@dataclasses.dataclass(frozen=True)
class ParameterExpression(Expression):
    """An expression representing a named query parameter.

    The expression references no columns, and every binding or child
    transformation returns the expression itself.
    """

    name: str
    dtype: dtypes.ExpressionType

    def __eq__(self, other):
        """Return True only for a ``ParameterExpression`` with equal name and dtype."""
        return (
            isinstance(other, ParameterExpression)
            and self.name == other.name
            and self.dtype == other.dtype
        )

    # --- Type information -------------------------------------------------

    @property
    def output_type(self) -> dtypes.ExpressionType:
        """The declared dtype of the parameter."""
        return self.dtype

    @property
    def nullable(self) -> bool:
        """Always True."""
        return True

    # --- Static properties ------------------------------------------------

    @property
    def is_const(self) -> bool:
        """Always True."""
        return True

    @property
    def is_resolved(self) -> bool:
        """Always True."""
        return True

    @property
    def is_bijective(self) -> bool:
        """Always True."""
        return True

    @property
    def column_references(self) -> typing.Tuple[ids.ColumnId, ...]:
        """Always empty: a parameter references no columns."""
        return ()

    # --- Rewrites: each returns this expression unchanged ------------------

    def bind_refs(
        self,
        bindings: Mapping[ids.ColumnId, Expression],
        allow_partial_bindings: bool = False,
    ) -> ParameterExpression:
        """Return this expression unchanged; the arguments are not read."""
        return self

    def bind_variables(
        self,
        bindings: Mapping[Hashable, Expression],
        allow_partial_bindings: bool = False,
    ) -> Expression:
        """Return this expression unchanged; the arguments are not read."""
        return self

    def transform_children(self, t: Callable[[Expression], Expression]) -> Expression:
        """Return this expression unchanged; ``t`` is never called."""
        return self
3 changes: 3 additions & 0 deletions packages/bigframes/bigframes/core/rewrite/timedeltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ def _rewrite_expressions(expr: ex.Expression, schema: schema.ArraySchema) -> _Ty
if isinstance(expr, ex.ScalarConstantExpression):
return _rewrite_scalar_constant_expr(expr)

if isinstance(expr, ex.ParameterExpression):
return _TypedExpr(expr, expr.dtype)

if isinstance(expr, ex.OpExpression):
updated_inputs = tuple(
map(lambda x: _rewrite_expressions(x, schema), expr.inputs)
Expand Down
23 changes: 12 additions & 11 deletions packages/bigframes/bigframes/functions/_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,12 @@ def create_bq_remote_function(
)
self._create_bq_function(create_function_ddl)

def provision_bq_managed_function(
def generate_bq_managed_function_ddl(
self,
routine_ref: bigquery.RoutineReference,
config: udf_def.ManagedFunctionConfig,
):
"""Create a BigQuery managed function."""

# TODO(b/406283812): Expose the capability to pass down
# capture_references=True in the public udf API.
) -> str:
"""Generate the CREATE FUNCTION DDL for a BigQuery managed function."""
if (
config.capture_references
and (python_version := _utils.get_python_version())
Expand All @@ -232,8 +229,7 @@ def provision_bq_managed_function(
if config.container_memory:
managed_function_options["container_memory"] = config.container_memory

# Augment user package requirements with any internal package
# requirements.
# Augment user package requirements with any internal package requirements.
packages = _utils.get_updated_package_requirements(
config.code.package_requirements or [],
config.signature.is_row_processor,
Expand All @@ -258,9 +254,6 @@ def provision_bq_managed_function(
else ""
)

# Generate the complete Python code block for the managed Python UDF,
# including the user's function, necessary imports, and the BigQuery
# handler wrapper.
python_code_block = bff_template.generate_managed_function_code(
config.code, config.signature, config.capture_references
)
Expand All @@ -281,7 +274,15 @@ def provision_bq_managed_function(
.strip()
.replace("__UDF_PLACE_HOLDER__", python_code_block)
)
return create_function_ddl

def provision_bq_managed_function(
self,
routine_ref: bigquery.RoutineReference,
config: udf_def.ManagedFunctionConfig,
):
"""Create a BigQuery managed function."""
create_function_ddl = self.generate_bq_managed_function_ddl(routine_ref, config)
self._ensure_dataset_exists(
bigquery.DatasetReference(routine_ref.project, routine_ref.dataset_id)
)
Expand Down
47 changes: 47 additions & 0 deletions packages/bigframes/bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ def _deploy_managed_function(
temp: bool,
dataset: Optional[bigquery.DatasetReference] = None,
) -> udf_def.BigqueryUdf:
from bigframes.session.productionize import _state as prod_state

if prod_state.active:
routine_ref = self._resolve_routine_reference(name, dataset=dataset)
ddl = self._function_client.generate_bq_managed_function_ddl(
routine_ref, config
)
prod_state.pipeline.recorded_udfs[str(routine_ref)] = ddl
return udf_def.BigqueryUdf(
routine_ref=routine_ref,
signature=config.signature,
)

routine_ref = self._resolve_routine_reference(name, dataset=dataset)
if temp:
self._add_temp_remote_function(routine_ref)
Expand All @@ -183,6 +196,28 @@ def _deploy_udf(
self,
bq_udf: udf_def.PythonUdf,
) -> udf_def.BigqueryUdf:
"""Deploys a UDF to BigQuery if not already deployed."""
from bigframes.session.productionize import _state as prod_state

if prod_state.active:
config = bq_udf.to_managed_function_config()
bq_function_name = get_managed_function_name(config, self.session_id)
routine_ref = self._resolve_routine_reference(bq_function_name)

ddl = self._function_client.generate_bq_managed_function_ddl(
routine_ref, config
)
prod_state.pipeline.recorded_udfs[str(routine_ref)] = ddl

udf_hash = bq_udf.stable_hash()
with self._artifacts_lock:
self._deployed_routines.add(udf_hash)

return udf_def.BigqueryUdf(
routine_ref=routine_ref,
signature=bq_udf.signature,
)

"""Deploys a UDF to BigQuery if not already deployed."""
udf_hash = bq_udf.stable_hash()

Expand Down Expand Up @@ -261,6 +296,18 @@ def remote_function(
] = "internal-only",
cloud_build_service_account: Optional[str] = None,
):
from bigframes.session.productionize import (
ProductionizeBlockedError,
)
from bigframes.session.productionize import (
_state as prod_state,
)

if prod_state.active:
raise ProductionizeBlockedError(
"Creating remote functions is not supported in productionize mode."
)

"""Decorator to turn a user defined function into a BigQuery remote function.

.. deprecated:: 0.0.1
Expand Down
3 changes: 3 additions & 0 deletions packages/bigframes/bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
read_parquet,
read_pickle,
)
from bigframes.session.productionize import parameter, productionize

try:
import resource
Expand Down Expand Up @@ -525,6 +526,8 @@ def reset_session():
"close_session",
"reset_session",
"udf",
"parameter",
"productionize",
]

_module = sys.modules[__name__]
Expand Down
Loading
Loading