Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 85 additions & 19 deletions packages/bigframes/bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,26 @@
import bigframes.core.bytecode as bytecode
import bigframes.core.expression as ex
import bigframes.core.ordering as ordering
import bigframes.core.window_spec as windows
import bigframes.core.window_spec as window_specs

windows = window_specs
import bigframes.dtypes as dtypes
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
from bigframes.core import agg_expressions, py_expressions


def apply_to_block_rows(
func: Callable, block: blocks.Block, *args, **kwargs
) -> blocks.Block:
"""
Apply the given function to each row of the block.

The function is applied to each row of the block, and the result is returned
as a new block with the same index.
"""
def compile_udf(
block: blocks.Block,
func: Callable,
args: tuple = (),
kwargs: dict | None = None,
col_series_args: typing.Mapping[str, str] | None = None,
window_spec: Optional[window_specs.WindowSpec] = None,
) -> ex.Expression:
"""Compile a python function to a BigFrames expression in the context of a block."""
if kwargs is None:
kwargs = {}
expr = bytecode._compile_bytecode_to_py_expr(func)
sig = inspect.signature(func)

Expand All @@ -54,17 +58,79 @@ def apply_to_block_rows(
for name, value in bound_params.items():
bindings[name] = ex.const(value)

expr = py_expressions.resolve_py_exprs(
expr,
series_arg=next(iter(sig.parameters.keys())),
series_attrs={
label: col_id
for label in block.column_labels
if (col_id := block.resolve_label_exact(label)) is not None
},
)
series_arg = next(iter(sig.parameters.keys()))

if col_series_args is not None:
expr = py_expressions.resolve_py_exprs(
expr,
series_arg=series_arg,
col_series_args=col_series_args,
window_spec=window_spec,
)
else:
series_attrs: dict = {}
for i, (col_id, label) in enumerate(
zip(block.value_columns, block.column_labels)
):
series_attrs[i] = col_id
if label is not None:
series_attrs[label] = col_id

expr = py_expressions.resolve_py_exprs(
expr,
series_arg=series_arg,
series_attrs=series_attrs,
window_spec=window_spec,
)

expr = expr.bind_variables(bindings)
return expr


def is_transpiler_eligible(func: typing.Any) -> bool:
"""Return True if func is eligible for Python transpilation."""
from bigframes._config import options

return (
options.experiments.enable_python_transpiler
and callable(func)
and not isinstance(func, bigframes.functions.Udf)
)
Comment on lines +90 to +98

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The function is_transpiler_eligible references bigframes.functions.Udf [97], but bigframes is not imported in this module. This will result in a NameError at runtime. Please import bigframes or import Udf directly from bigframes.functions.

Suggested change
def is_transpiler_eligible(func: typing.Any) -> bool:
"""Return True if func is eligible for Python transpilation."""
from bigframes._config import options
return (
options.experiments.enable_python_transpiler
and callable(func)
and not isinstance(func, bigframes.functions.Udf)
)
def is_transpiler_eligible(func: typing.Any) -> bool:
"""Return True if func is eligible for Python transpilation."""
from bigframes._config import options
import bigframes.functions
return (
options.experiments.enable_python_transpiler
and callable(func)
and not isinstance(func, bigframes.functions.Udf)
)



def compile_column_udf(
block: blocks.Block,
func: Callable,
column_id: str,
args: tuple = (),
kwargs: dict | None = None,
window_spec: Optional[window_specs.WindowSpec] = None,
) -> tuple[ex.Expression, str]:
"""Compile a column-wise python UDF in block context and return (expr, name)."""
sig = inspect.signature(func)
series_arg = next(iter(sig.parameters.keys()))
expr = compile_udf(
block,
func,
args=args,
kwargs=kwargs,
col_series_args={series_arg: column_id},
window_spec=window_spec,
)
name = getattr(func, "__name__", "<lambda>")
return expr, name


def apply_to_block_rows(
func: Callable, block: blocks.Block, *args, **kwargs
) -> blocks.Block:
"""
Apply the given function to each row of the block.

The function is applied to each row of the block, and the result is returned
as a new block with the same index.
"""
expr = compile_udf(block, func, args, kwargs)
return block.project_exprs([expr], labels=[None], drop=True)


Expand Down
46 changes: 35 additions & 11 deletions packages/bigframes/bigframes/core/bytecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"//": operator.floordiv,
"%": operator.mod,
"**": operator.pow,
"[]": operator.getitem,
}

_COMPARE_OP_MAP = {
Expand Down Expand Up @@ -508,6 +509,18 @@ def _compile_bytecode_to_py_expr(func: Callable) -> expression.Expression:
)
)

case "BINARY_SUBSCR":
if len(stack) < 2:
raise ValueError("Stack has < 2 elements")
key = stack.pop()
container = stack.pop()
stack.append(
py_exprs.Call(
py_exprs.PyObject(operator.getitem),
(container, key),
)
)

case name if name in _OLD_BINARY_OP_MAP:
if len(stack) < 2:
raise ValueError("Stack has < 2 elements")
Expand Down Expand Up @@ -575,17 +588,28 @@ def _compile_bytecode_to_py_expr(func: Callable) -> expression.Expression:
if len(stack) < num_args:
raise ValueError(f"Stack has fewer than {num_args} elements")
args = [stack.pop() for _ in range(num_args)][::-1]
if len(stack) >= 2 and stack[-2] == _NULL:
stack[-1], stack[-2] = stack[-2], stack[-1]
if stack and stack[-1] == _NULL:
stack.pop()
elif (
stack
and stack[-1] != _NULL
and isinstance(stack[-1], expression.Expression)
):
self_arg = stack.pop()
args = [self_arg] + args

is_method_call = False
if opname == "CALL" or opname == "CALL_METHOD":
if len(stack) >= 2 and stack[-2] == _NULL:
stack[-1], stack[-2] = stack[-2], stack[-1]
if stack and stack[-1] == _NULL:
stack.pop()
is_method_call = False
else:
is_method_call = True
elif opname == "CALL_FUNCTION":
is_method_call = False

if is_method_call:
if (
stack
and stack[-1] != _NULL
and isinstance(stack[-1], expression.Expression)
):
self_arg = stack.pop()
args = [self_arg] + args

if not stack:
raise ValueError("Stack is empty")
callable_expr = stack.pop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,6 @@ def contains_regex_op_impl(x: ibis_types.Value, op: ops.StrContainsRegexOp):
return typing.cast(ibis_types.StringValue, x).re_search(op.pat)


@scalar_op_compiler.register_unary_op(ops.StrGetOp, pass_op=True)
def strget_op_impl(x: ibis_types.Value, op: ops.StrGetOp):
substr = typing.cast(
ibis_types.StringValue, typing.cast(ibis_types.StringValue, x)[op.i]
)
return substr.nullif(ibis_types.literal(""))


@scalar_op_compiler.register_unary_op(ops.StrPadOp, pass_op=True)
def strpad_op_impl(x: ibis_types.Value, op: ops.StrPadOp):
str_val = typing.cast(ibis_types.StringValue, x)
Expand Down Expand Up @@ -1059,13 +1051,39 @@ def array_to_string_op_impl(x: ibis_types.Value, op: ops.ArrayToStringOp):
return typing.cast(ibis_types.ArrayValue, x).join(op.delimiter)


@scalar_op_compiler.register_unary_op(ops.ArrayIndexOp, pass_op=True)
def array_index_op_impl(x: ibis_types.Value, op: ops.ArrayIndexOp):
res = typing.cast(ibis_types.ArrayValue, x)[op.index]
if x.type().is_string():
@scalar_op_compiler.register_unary_op(ops.GetItemOp, pass_op=True)
def getitem_op_impl(x: ibis_types.Value, op: ops.GetItemOp):
if x.type().is_struct():
struct_value = typing.cast(ibis_types.StructValue, x)
if isinstance(op.key, str):
name = op.key
else:
name = struct_value.names[op.key]
result = struct_value[name]
return result.cast(result.type()(nullable=True)).name(name)
elif x.type().is_array():
key = typing.cast(int, op.key)
res = typing.cast(ibis_types.ArrayValue, x)[key]
return res
elif x.type().is_string():
key = typing.cast(int, op.key)
res = typing.cast(ibis_types.StringValue, x)[key]
return _null_or_value(res, res != ibis_types.literal(""))
else:
return res
raise TypeError(f"Cannot subscript input of type {x.type()}")


@scalar_op_compiler.register_binary_op(ops.DynamicGetItemOp)
def dynamic_getitem_op_impl(left: ibis_types.Value, right: ibis_types.Value):
if left.type().is_array():
int_right = typing.cast(ibis_types.IntegerValue, right)
return typing.cast(ibis_types.ArrayValue, left)[int_right]
elif left.type().is_string():
scalar_right = typing.cast(ibis_types.IntegerScalar, right)
res = typing.cast(ibis_types.StringValue, left)[scalar_right]
return _null_or_value(res, res != ibis_types.literal(""))
else:
raise TypeError(f"Cannot dynamically subscript input of type {left.type()}")


@scalar_op_compiler.register_unary_op(ops.ArraySliceOp, pass_op=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# The ops imports appear first so that the implementations can be registered.
# polars shouldn't be needed at import time, as register is a no-op if polars
# isn't installed.
import bigframes.core.compile.polars.operations.array_ops # noqa: F401
import bigframes.core.compile.polars.operations.generic_ops # noqa: F401
import bigframes.core.compile.polars.operations.numeric_ops # noqa: F401
import bigframes.core.compile.polars.operations.struct_ops # noqa: F401
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
BigFrames -> Polars compilation for the operations in bigframes.operations.array_ops.
"""

from __future__ import annotations

from typing import TYPE_CHECKING

import bigframes.core.compile.polars.compiler as polars_compiler
import bigframes.dtypes as dtypes
from bigframes.operations import generic_ops

if TYPE_CHECKING:
import polars as pl


@polars_compiler.register_op(generic_ops.GetItemOp)
def getitem_op_impl(
compiler: polars_compiler.PolarsExpressionCompiler,
op: generic_ops.GetItemOp, # type: ignore
input: pl.Expr,
) -> pl.Expr:
input_type = compiler._expr_types.get(id(input))
if input_type is not None and dtypes.is_struct_like(input_type):
if isinstance(op.key, str):
return input.struct.field(op.key)
else:
raise NotImplementedError(
"Referencing a struct field by number not implemented in polars compiler."
)
else:
return input.list.get(op.key)
Comment on lines +31 to +46

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In getitem_op_impl, if the input type is string-like, calling input.list.get(op.key) will fail at runtime in Polars because strings are not list-like. We should handle string-like inputs by slicing them, similar to the logic in dynamic_getitem_op_impl.

Suggested change
@polars_compiler.register_op(generic_ops.GetItemOp)
def getitem_op_impl(
compiler: polars_compiler.PolarsExpressionCompiler,
op: generic_ops.GetItemOp, # type: ignore
input: pl.Expr,
) -> pl.Expr:
input_type = compiler._expr_types.get(id(input))
if input_type is not None and dtypes.is_struct_like(input_type):
if isinstance(op.key, str):
return input.struct.field(op.key)
else:
raise NotImplementedError(
"Referencing a struct field by number not implemented in polars compiler."
)
else:
return input.list.get(op.key)
@polars_compiler.register_op(generic_ops.GetItemOp)
def getitem_op_impl(
compiler: polars_compiler.PolarsExpressionCompiler,
op: generic_ops.GetItemOp, # type: ignore
input: pl.Expr,
) -> pl.Expr:
input_type = compiler._expr_types.get(id(input))
if input_type is not None and dtypes.is_struct_like(input_type):
if isinstance(op.key, str):
return input.struct.field(op.key)
else:
raise NotImplementedError(
"Referencing a struct field by number not implemented in polars compiler."
)
elif input_type is not None and dtypes.is_string_like(input_type):
return input.str.slice(op.key, 1)
else:
return input.list.get(op.key)



@polars_compiler.register_op(generic_ops.DynamicGetItemOp)
def dynamic_getitem_op_impl(
compiler: polars_compiler.PolarsExpressionCompiler,
op: generic_ops.DynamicGetItemOp, # type: ignore
left: pl.Expr,
right: pl.Expr,
) -> pl.Expr:
left_type = compiler._expr_types.get(id(left))
if left_type is not None and dtypes.is_string_like(left_type):
return left.str.slice(right, 1)
else:
return left.list.get(right)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import bigframes_vendored.sqlglot as sg
import bigframes_vendored.sqlglot.expressions as sge
import pandas as pd
import pyarrow as pa

import bigframes.core.compile.sqlglot.expression_compiler as expression_compiler
import bigframes.dtypes as dtypes
Expand All @@ -32,17 +34,56 @@
register_nary_op = expression_compiler.expression_compiler.register_nary_op


@register_unary_op(ops.ArrayIndexOp, pass_op=True)
def _(expr: TypedExpr, op: ops.ArrayIndexOp) -> sge.Expression:
if expr.dtype == dtypes.STRING_DTYPE:
return string_index(expr, op.index)
@register_unary_op(ops.GetItemOp, pass_op=True)
def _(expr: TypedExpr, op: ops.GetItemOp) -> sge.Expression:
if dtypes.is_struct_like(expr.dtype):
if isinstance(op.key, str):
name = op.key
else:
pa_type = typing.cast(pd.ArrowDtype, expr.dtype)
pa_struct_type = typing.cast(pa.StructType, pa_type.pyarrow_dtype)
name = pa_struct_type.field(op.key).name

return sge.Bracket(
this=expr.expr,
expressions=[sge.convert(op.index)],
safe=True,
offset=False,
)
return sge.Column(
this=sge.to_identifier(name, quoted=True),
catalog=expr.expr,
)
elif dtypes.is_array_like(expr.dtype):
return sge.Bracket(
this=expr.expr,
expressions=[sge.convert(op.key)],
safe=True,
offset=False,
)
elif expr.dtype == dtypes.STRING_DTYPE:
return string_index(expr, typing.cast(int, op.key))
else:
raise TypeError(f"Cannot subscript input of type {expr.dtype}")


@register_nary_op(ops.DynamicGetItemOp) # type: ignore[arg-type]
def _(left: TypedExpr, right: TypedExpr) -> sge.Expression:
if dtypes.is_array_like(left.dtype):
return sge.Bracket(
this=left.expr,
expressions=[right.expr],
safe=True,
offset=False,
)
elif left.dtype == dtypes.STRING_DTYPE:
start_expr = sge.Add(this=right.expr, expression=sge.convert(1))
sub_str = sge.Substring(
this=left.expr,
start=start_expr,
length=sge.convert(1),
)
return sge.If(
this=sge.NEQ(this=sub_str, expression=sge.convert("")),
true=sub_str,
false=sge.Null(),
)
else:
raise TypeError(f"Cannot dynamically subscript input of type {left.dtype}")


@register_unary_op(ops.ArrayReduceOp, pass_op=True)
Expand Down
Loading
Loading