Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9aa0901
Add WASM compatibility for componentize-py builds
kesmit13 Mar 19, 2026
73dcaad
Add WIT interface definition for WASM UDF components
kesmit13 Mar 19, 2026
49521d1
Add type annotations to WASM numpy stub and UDF handler
kesmit13 Mar 20, 2026
5d1e9e6
Add code generation for UDF handler function registration
kesmit13 Mar 23, 2026
aed95df
Replace top-level optional imports with lazy import helpers
kesmit13 Mar 23, 2026
24df239
Fix Python 3.10+ union syntax in udf_handler type annotation
kesmit13 Mar 23, 2026
5eef5fd
feat: add call_function_accel C function to accel.c
kesmit13 Mar 23, 2026
0e4bb40
Add WASM build script for wasm32-wasip2 target
kesmit13 Mar 25, 2026
a18eb52
Remove WASM numpy_stub, now unnecessary with lazy imports
kesmit13 Mar 25, 2026
248f897
Add collocated Python UDF server with pre-fork process mode
kesmit13 Mar 27, 2026
e4d8627
Fix @register propagation in collocated server process mode
kesmit13 Mar 31, 2026
1b90ab6
Fix broken pipe in collocated UDF server under concurrent load
kesmit13 Apr 1, 2026
c732dff
Guard np.dtype check in normalize_dtype for environments without numpy
kesmit13 Apr 1, 2026
1b62a6c
Guard WASI-incompatible POSIX APIs in accel.c with #ifndef __wasi__
kesmit13 Apr 1, 2026
c65793a
Call setup_logging() in FunctionHandler.initialize() for WASM handler
kesmit13 Apr 1, 2026
b66653a
Add WASI stubs for mmap_read/mmap_write/recv_exact and fix accel logging
kesmit13 Apr 1, 2026
34e2452
Address PR #121 review comments: memory safety, correctness, hardening
kesmit13 Apr 2, 2026
fe1d8ad
Fix recv_exact protocol desync and unchecked PyObject_Length returns
kesmit13 Apr 2, 2026
548edcc
Fix _iquery DataFrame conversion for non-tuple results_type
kesmit13 Apr 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
891 changes: 874 additions & 17 deletions accel.c

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ dev = [
"singlestoredb[test,docs,build]",
]

[project.scripts]
python-udf-server = "singlestoredb.functions.ext.collocated.__main__:main"

[project.entry-points.pytest11]
singlestoredb = "singlestoredb.pytest"

Expand Down
41 changes: 41 additions & 0 deletions resources/build_wasm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/bin/bash
#
# Build this project's wheel for the wasm32-wasip2 target using a
# cross-compiled CPython and the WASI SDK toolchain.
#
# Required environment:
#   CPYTHON_ROOT   - CPython checkout containing a cross-build/wasm32-wasip2
#                    build tree (script fails fast if unset, via `set -u`)
#   WASI_SDK_PATH  - WASI SDK install location (default: /opt/wasi-sdk)

set -euo pipefail

TARGET="wasm32-wasip2"
CROSS_BUILD="${CPYTHON_ROOT}/cross-build/${TARGET}"
WASI_SDK_PATH="${WASI_SDK_PATH:-/opt/wasi-sdk}"

# Derive the Python version (e.g. "3.13") from the cross-build Makefile.
PYTHON_VERSION=$(grep '^VERSION=' "${CROSS_BUILD}/Makefile" | sed 's/VERSION=[[:space:]]*//')

# Create (or reuse) a host venv matching the cross-build's Python version.
if [ ! -e wasm_venv ]; then
    uv venv --python "${PYTHON_VERSION}" wasm_venv
fi

. wasm_venv/bin/activate

uv pip install build wheel cython setuptools

# Cross-compilation toolchain.
export CC="${WASI_SDK_PATH}/bin/clang"
export CXX="${WASI_SDK_PATH}/bin/clang++"

# Make the cross-built stdlib importable while the build runs.
export PYTHONPATH="${CROSS_BUILD}/build/lib.wasi-wasm32-${PYTHON_VERSION}"

# NOTE(review): -D__EMSCRIPTEN__=1 defines the Emscripten macro even though
# the target is WASI -- presumably to trigger WASM code paths in the C
# sources/setup script; confirm this is intentional.
export CFLAGS="--target=${TARGET} -fPIC -I${CROSS_BUILD}/install/include/python${PYTHON_VERSION} -D__EMSCRIPTEN__=1"
export CXXFLAGS="--target=${TARGET} -fPIC -I${CROSS_BUILD}/install/include/python${PYTHON_VERSION}"
export LDSHARED="${CC}"
export AR="${WASI_SDK_PATH}/bin/ar"
export RANLIB=true
export LDFLAGS="--target=${TARGET} -shared -Wl,--allow-undefined"

# Point sysconfig at the WASI cross-build data so extension tags are correct.
export _PYTHON_SYSCONFIGDATA_NAME=_sysconfigdata__wasi_wasm32-wasi
export _PYTHON_HOST_PLATFORM=wasm32-wasi

# Build the wheel (no isolation, wheel only) and unpack it for inspection.
python3 -m build -n -w
wheel unpack --dest build dist/*.whl

rm -rf ./wasm_venv
3 changes: 1 addition & 2 deletions singlestoredb/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from typing import Optional
from typing import Union

import jwt


# Credential types
PASSWORD = 'password'
Expand Down Expand Up @@ -42,6 +40,7 @@ def __init__(
@classmethod
def from_token(cls, token: bytes, verify_signature: bool = False) -> 'JSONWebToken':
"""Validate the contents of the JWT."""
import jwt
info = jwt.decode(token, options={'verify_signature': verify_signature})

if not info.get('sub', None) and not info.get('username', None):
Expand Down
64 changes: 50 additions & 14 deletions singlestoredb/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@
from urllib.parse import urlparse

import sqlparams
try:
from pandas import DataFrame
except ImportError:
class DataFrame(object): # type: ignore
def itertuples(self, *args: Any, **kwargs: Any) -> None:
pass

from . import auth
from . import exceptions
Expand Down Expand Up @@ -1172,17 +1166,59 @@ def _iquery(
cur.execute(oper, params)
if not re.match(r'^\s*(select|show|call|echo)\s+', oper, flags=re.I):
return []
out = list(cur.fetchall())
if not out:
raw = cur.fetchall()
if raw is None:
return []
if isinstance(out, DataFrame):
out = out.to_dict(orient='records')
elif isinstance(out[0], (tuple, list)):
# pandas DataFrame
if hasattr(raw, 'to_dict') and hasattr(raw, 'columns'):
out = raw.to_dict(orient='records')
# polars DataFrame
elif hasattr(raw, 'to_dicts') and callable(raw.to_dicts):
out = raw.to_dicts()
# arrow Table
elif hasattr(raw, 'to_pydict') and callable(raw.to_pydict):
d = raw.to_pydict()
cols = list(d.keys())
n = len(next(iter(d.values()))) if d else 0
out = [{c: d[c][i] for c in cols} for i in range(n)]
# numpy ndarray
elif hasattr(raw, 'tolist') and hasattr(raw, 'ndim'):
rows = raw.tolist()
if cur.description:
names = [x[0] for x in cur.description]
if fix_names:
names = [under2camel(str(x).replace(' ', '')) for x in names]
out = [{k: v for k, v in zip(names, row)} for row in out]
out = [
{k: v for k, v in zip(names, row)}
for row in rows
]
else:
return []
# list of tuples/namedtuples/dicts
else:
out = list(raw)
if not out:
return []
if isinstance(out[0], dict):
pass # already dicts
elif isinstance(out[0], (tuple, list)):
if cur.description:
names = [x[0] for x in cur.description]
out = [
{k: v for k, v in zip(names, row)}
for row in out
]
else:
return []
if not out:
return []
# Apply camelCase name conversion if requested
if fix_names:
out = [
{
under2camel(str(k).replace(' ', '')): v
for k, v in row.items()
}
for row in out
]
return out

@abc.abstractmethod
Expand Down
76 changes: 43 additions & 33 deletions singlestoredb/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@
except (AttributeError, ImportError):
has_pygeos = False

try:
import numpy
has_numpy = True
except ImportError:
has_numpy = False
from .utils._lazy_import import get_numpy

try:
import bson
Expand Down Expand Up @@ -563,8 +559,9 @@ def float32_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.float32)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.float32)

return map(float, json_loads(x))

Expand All @@ -591,8 +588,9 @@ def float32_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.frombuffer(x, dtype=numpy.float32)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.float32)

return struct.unpack(f'<{len(x)//4}f', x)

Expand All @@ -619,8 +617,9 @@ def float16_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.float16)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.float16)

return map(float, json_loads(x))

Expand All @@ -647,8 +646,9 @@ def float16_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.frombuffer(x, dtype=numpy.float16)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.float16)

return struct.unpack(f'<{len(x)//2}e', x)

Expand All @@ -675,8 +675,9 @@ def float64_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.float64)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.float64)

return map(float, json_loads(x))

Expand All @@ -703,8 +704,9 @@ def float64_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.frombuffer(x, dtype=numpy.float64)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.float64)

return struct.unpack(f'<{len(x)//8}d', x)

Expand All @@ -731,8 +733,9 @@ def int8_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.int8)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.int8)

return map(int, json_loads(x))

Expand All @@ -759,8 +762,9 @@ def int8_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.frombuffer(x, dtype=numpy.int8)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.int8)

return struct.unpack(f'<{len(x)}b', x)

Expand All @@ -787,8 +791,9 @@ def int16_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.int16)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.int16)

return map(int, json_loads(x))

Expand All @@ -815,8 +820,9 @@ def int16_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.frombuffer(x, dtype=numpy.int16)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.int16)

return struct.unpack(f'<{len(x)//2}h', x)

Expand All @@ -843,8 +849,9 @@ def int32_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.int32)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.int32)

return map(int, json_loads(x))

Expand All @@ -871,8 +878,9 @@ def int32_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.frombuffer(x, dtype=numpy.int32)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.int32)

return struct.unpack(f'<{len(x)//4}l', x)

Expand All @@ -899,8 +907,9 @@ def int64_vector_json_or_none(x: Optional[str]) -> Optional[Any]:
if x is None:
return None

if has_numpy:
return numpy.array(json_loads(x), dtype=numpy.int64)
np = get_numpy()
if np is not None:
return np.array(json_loads(x), dtype=np.int64)

return map(int, json_loads(x))

Expand Down Expand Up @@ -928,8 +937,9 @@ def int64_vector_or_none(x: Optional[bytes]) -> Optional[Any]:
return None

# Bytes
if has_numpy:
return numpy.frombuffer(x, dtype=numpy.int64)
np = get_numpy()
if np is not None:
return np.frombuffer(x, dtype=np.int64)

return struct.unpack(f'<{len(x)//8}l', x)

Expand Down
7 changes: 3 additions & 4 deletions singlestoredb/functions/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
from ..converters import converters
from ..mysql.converters import escape_item # type: ignore
from ..utils.dtypes import DEFAULT_VALUES # noqa
from ..utils.dtypes import NUMPY_TYPE_MAP # noqa
from ..utils.dtypes import PANDAS_TYPE_MAP # noqa
from ..utils.dtypes import POLARS_TYPE_MAP # noqa
from ..utils.dtypes import PYARROW_TYPE_MAP # noqa
from ..utils.dtypes import get_numpy_type_map # noqa
from ..utils.dtypes import get_polars_type_map # noqa
from ..utils.dtypes import get_pyarrow_type_map # noqa


DataType = Union[str, Callable[..., Any]]
Expand Down
1 change: 1 addition & 0 deletions singlestoredb/functions/ext/collocated/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""High-performance collocated Python UDF server for SingleStoreDB."""
Loading
Loading