[BUG] GenericResult[T] Cloudpickle Fails In Fresh Processes When Serialized By Global Reference #210

@NeejWeej

Description

GenericResult[int] is a runtime-created Pydantic generic specialization. When
that specialization is created at module top level, Pydantic registers it in
ccflow.result.generic under the generated name GenericResult[int] so pickle
can serialize it by global reference.

That global registration is process-local. If one process cloudpickles a
GenericResult[int] instance by reference, and a fresh process or Ray worker
tries to unpickle it before that worker has created the same specialization,
unpickle fails with:

AttributeError: Can't get attribute 'GenericResult[int]' on
<module 'ccflow.result.generic' ...>

This is a base serialization issue for runtime-created Pydantic generic result
classes; it is not specific to @Flow.model or Ray. @Flow.model (from #206) and
Ray make the bug much easier to hit because generated model state or
distributed worker payloads can contain GenericResult[T] instances without
importing or re-evaluating the source module in a way that materializes the
specialization first.

The behavior is sensitive to where GenericResult[int] is first evaluated:

  • If GenericResult[int] is evaluated at module top level, Pydantic installs a
    module global and cloudpickle may serialize by global reference. This fails in
    a fresh process that lacks that process-local global.
  • If GenericResult[int] is created only inside a function, cloudpickle may
    serialize the dynamic class by value instead. That can pass, which masks the
    issue and makes the failure look import-order dependent.
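The by-reference failure mode can be sketched with the standard library alone. This is a minimal illustration, not ccflow or Pydantic code: demo_generic_module, make_specialization, and DemoResult[int] are hypothetical stand-ins, and deleting the module global simulates a fresh process that never created the specialization.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a module such as ccflow.result.generic.
demo_module = types.ModuleType("demo_generic_module")
sys.modules[demo_module.__name__] = demo_module


def make_specialization(name: str) -> type:
    """Create a class at runtime and register it as a module global."""
    cls = type(name, (), {"__module__": demo_module.__name__})
    setattr(demo_module, name, cls)
    return cls


# "Sending" process: the class is reachable as a module global, so pickle
# stores only the (module, name) pair rather than the class itself.
specialized = make_specialization("DemoResult[int]")
payload = pickle.dumps(specialized())

# "Fresh" process: the module is importable, but nothing has re-created the
# specialization, so the global that the payload points at is missing.
delattr(demo_module, "DemoResult[int]")
try:
    pickle.loads(payload)
    load_error = None
except AttributeError as exc:
    load_error = exc
print("load failed:", type(load_error).__name__)

# Once the receiver re-creates the specialization, the same payload loads.
recreated = make_specialization("DemoResult[int]")
restored = pickle.loads(payload)
print("loads after re-creation:", type(restored) is recreated)
```

The payload itself never changes between the two load attempts; only the receiver's module globals do, which is why the failure looks import-order dependent.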

Steps to Reproduce

Repro 1: plain cloudpickle, no Ray, no @Flow.model

  1. Save or run this script:
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "ccflow",
#   "cloudpickle",
# ]
# ///

import base64
import subprocess
import sys


def run_case(label, dumps_kwargs):
    creator_script = f"""
import base64
import cloudpickle
from ccflow import GenericResult

print(base64.b64encode(cloudpickle.dumps(GenericResult[int](value=5), **{dumps_kwargs!r})).decode())
"""
    creator = subprocess.run([sys.executable, "-c", creator_script], capture_output=True, text=True)
    blob = creator.stdout.strip()

    loader_script = f"""
import base64
import cloudpickle
import ccflow.result.generic as generic_module
from ccflow import GenericResult

print("has GenericResult[int] before unpickle:", hasattr(generic_module, "GenericResult[int]"))
obj = cloudpickle.loads(base64.b64decode({blob!r}))
print("unpickled:", obj)
"""
    result = subprocess.run([sys.executable, "-c", loader_script], capture_output=True, text=True)
    print(f"\n=== {label} ===")
    print("returncode:", result.returncode)
    print("stdout:", result.stdout.strip() or "<empty>")
    print("stderr:", result.stderr.strip() or "<empty>")


run_case("cloudpickle default protocol", {})
run_case("cloudpickle protocol=5", {"protocol": 5})
  2. Run it with uv:
uv run python repro_generic_result_cloudpickle.py
  3. Observe that both the default cloudpickle protocol and protocol=5 fail in
    the fresh loader process.

Repro 2: Ray worker, no @Flow.model

  1. Save or run this script:
# /// script
# requires-python = ">=3.12"
# dependencies = [
#   "ccflow",
#   "cloudpickle",
#   "ray",
# ]
# ///

import base64
from typing import Any

import cloudpickle
import ray

from ccflow import GenericResult


# Toggle this flag:
# - True: materializes GenericResult[int] at module top level and reproduces the failure.
# - False: avoids the top-level global-reference path and can pass, which shows why
#   this bug is sensitive to where the specialization is first evaluated.
top_level_eval = True

if top_level_eval:
    SPECIALIZED_RESULT = GenericResult[int]


@ray.remote
def load_payload(label: str, encoded_payload: str) -> tuple[str, bool, str]:
    import base64
    import traceback

    import ccflow.result.generic as generic_module
    import cloudpickle
    from ccflow import GenericResult  # noqa: F401

    has_specialization = hasattr(generic_module, "GenericResult[int]")
    try:
        obj = cloudpickle.loads(base64.b64decode(encoded_payload))
    except Exception:
        return label, has_specialization, traceback.format_exc()
    return label, has_specialization, f"unpickled: {obj!r}"


def make_payload(*, dumps_kwargs: dict[str, Any]) -> str:
    if top_level_eval:
        payload = cloudpickle.dumps(SPECIALIZED_RESULT(value=5), **dumps_kwargs)
    else:
        payload = cloudpickle.dumps(GenericResult[int](value=5), **dumps_kwargs)
    return base64.b64encode(payload).decode()


ray.init(num_cpus=1)
try:
    cases = {
        "cloudpickle default protocol": make_payload(dumps_kwargs={}),
        "cloudpickle protocol=5": make_payload(dumps_kwargs={"protocol": 5}),
    }
    refs = [load_payload.remote(label, payload) for label, payload in cases.items()]
    for label, has_specialization, result in ray.get(refs):
        print(f"\n=== {label} ===")
        print("worker has GenericResult[int] before unpickle:", has_specialization)
        print(result.strip())
finally:
    ray.shutdown()
  2. In this repo, run Ray repros with uv targeting the active venv so that Ray
    workers inherit ray correctly:
source .venv/bin/activate
uv run --active python repro_generic_result_ray_cloudpickle.py
  3. With top_level_eval = True, observe the worker-side unpickle failure.

  4. Set top_level_eval = False and run again. The script can pass because the
    specialization is no longer serialized through the same top-level
    global-reference path. This confirms the bug is tied to Pydantic's
    process-local global registration side effect, not to @Flow.model.
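One way to check which path a given payload took is to record the globals that unpickling tries to resolve. The helper below is a hypothetical diagnostic sketch (RecordingUnpickler and global_lookups are not part of ccflow or cloudpickle). It relies on cloudpickle payloads being loadable with the standard pickle module and is shown here on a stdlib object so that it runs without ccflow installed.

```python
import collections
import io
import pickle


class RecordingUnpickler(pickle.Unpickler):
    """Unpickler that records every global it is asked to resolve."""

    def __init__(self, file):
        super().__init__(file)
        self.lookups: list[tuple[str, str]] = []

    def find_class(self, module: str, name: str):
        self.lookups.append((module, name))
        return super().find_class(module, name)


def global_lookups(payload: bytes) -> list[tuple[str, str]]:
    """Return the (module, name) pairs resolved while loading the payload."""
    unpickler = RecordingUnpickler(io.BytesIO(payload))
    try:
        unpickler.load()
    except Exception:
        # A failed lookup is still recorded, so the last entry names the culprit.
        pass
    return unpickler.lookups


print(global_lookups(pickle.dumps(collections.OrderedDict(a=1))))
```

Applied to the failing payload from this issue, the recorded pairs would be expected to include ('ccflow.result.generic', 'GenericResult[int]'). A by-value payload should not contain that pair, since the class definition travels inside the payload instead.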

Expected Behavior

A GenericResult[T] instance should be portable through cloudpickle across a
fresh process or Ray worker.

In particular, this should work regardless of whether GenericResult[int] has
already been materialized in the receiving process:

payload = cloudpickle.dumps(GenericResult[int](value=5))
restored = cloudpickle.loads(payload)
assert restored == GenericResult[int](value=5)

The same should hold when the payload is sent to a Ray worker.

Actual Behavior

The receiving process imports GenericResult, but GenericResult[int] is not
present in ccflow.result.generic before unpickle:

has GenericResult[int] before unpickle: False

Plain cloudpickle/fresh-process output:

=== cloudpickle default protocol ===
returncode: 1
stdout: has GenericResult[int] before unpickle: False
stderr: Traceback (most recent call last):
  File "<string>", line 11, in <module>
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>

=== cloudpickle protocol=5 ===
returncode: 1
stdout: has GenericResult[int] before unpickle: False
stderr: Traceback (most recent call last):
  File "<string>", line 11, in <module>
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>

Ray worker output with top_level_eval = True:

=== cloudpickle default protocol ===
worker has GenericResult[int] before unpickle: False
Traceback (most recent call last):
  File "/Users/neej/dev/ccflow/repro_generic_result_ray_cloudpickle.py", line 55, in load_payload
    obj = cloudpickle.loads(base64.b64decode(encoded_payload))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>

=== cloudpickle protocol=5 ===
worker has GenericResult[int] before unpickle: False
Traceback (most recent call last):
  File "/Users/neej/dev/ccflow/repro_generic_result_ray_cloudpickle.py", line 55, in load_payload
    obj = cloudpickle.loads(base64.b64decode(encoded_payload))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'GenericResult[int]' on <module 'ccflow.result.generic' from '/Users/neej/dev/ccflow/ccflow/result/generic.py'>

This reproduces with both cloudpickle's default protocol and protocol=5, so
the failure is not caused by protocol 5.

Environment

  • OS: macOS 26.2, build 25C56
  • Python version: Python 3.12.12 (uv run python --version)
  • Package version: ccflow 0.8.3
  • cloudpickle: 3.1.2
  • ray: 2.55.1
  • pydantic: 2.13.4
  • ccflow: main (commit: 17905fb)

Package versions were collected with:

uv run python -c "import importlib.metadata as m; print('ccflow', m.version('ccflow')); print('cloudpickle', m.version('cloudpickle')); print('ray', m.version('ray')); print('pydantic', m.version('pydantic'))"

Additional Context

Pydantic's generic specialization creation mutates module globals only in some
call contexts, such as evaluation at module top level. The side effect can be
observed directly by running the following at top level:

import ccflow.result.generic as generic_module
from ccflow import GenericResult

assert not hasattr(generic_module, "GenericResult[int]")

cls = GenericResult[int]

assert hasattr(generic_module, "GenericResult[int]")
assert generic_module.__dict__["GenericResult[int]"] is cls

That side effect is process-local. A fresh Ray worker may import
ccflow.result.generic and GenericResult without ever evaluating
GenericResult[int], so the module global expected by pickle does not exist.

The robust fix should avoid relying on Pydantic's process-local global
registration for ccflow generic result/context instances. Instead, generic
instances should be serialized by contract:

  1. origin class, e.g. GenericResult;
  2. generic args, e.g. (int,);
  3. Pydantic instance state.

Conceptually:

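def restore_generic_pydantic_instance(origin, args, state):
    # Re-create (or look up) the specialization in the receiving process.
    cls = origin[args] if args else origin
    # Skip __init__ and restore the pickled Pydantic state directly.
    obj = cls.__new__(cls)
    obj.__setstate__(state)
    return obj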

This should probably be fixed at the ResultBase/ContextBase serialization
layer.
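The contract above can be sketched end to end with a plain-Python stand-in. This is an illustration of the idea under stated assumptions, not a proposed ccflow patch: GenericBase, DemoResult, the _cache dictionary, and restore_generic_instance are hypothetical, and a simple __class_getitem__ stands in for Pydantic's generic machinery. The point is that __reduce__ emits only the origin class, the generic args, and the instance state, so the payload never refers to a global named DemoResult[int].

```python
import pickle


class GenericBase:
    """Hypothetical stand-in for a Pydantic generic model (not ccflow's API)."""

    _origin = None  # unspecialized class, set on each specialization
    _args = ()      # generic args, e.g. (int,)
    _cache = {}     # process-local cache of specializations

    def __class_getitem__(cls, item):
        args = item if isinstance(item, tuple) else (item,)
        key = (cls, args)
        if key not in GenericBase._cache:
            name = f"{cls.__name__}[{', '.join(a.__name__ for a in args)}]"
            attrs = {"_origin": cls, "_args": args}
            GenericBase._cache[key] = type(name, (cls,), attrs)
        return GenericBase._cache[key]

    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return type(self) is type(other) and self.__dict__ == other.__dict__

    def __reduce__(self):
        # Serialize by contract: origin class, generic args, instance state.
        cls = type(self)
        origin = cls._origin or cls
        return restore_generic_instance, (origin, cls._args, dict(self.__dict__))


def restore_generic_instance(origin, args, state):
    # Re-create the specialization on the receiving side, then restore state.
    cls = origin[args] if args else origin
    obj = cls.__new__(cls)
    obj.__dict__.update(state)
    return obj


class DemoResult(GenericBase):
    pass


payload = pickle.dumps(DemoResult[int](value=5))
GenericBase._cache.clear()  # simulate a receiver with no specialization yet
restored = pickle.loads(payload)
print(restored == DemoResult[int](value=5))
```

In a real fix the state would come from Pydantic's own pickling support rather than __dict__, and the hook would live on ResultBase/ContextBase as suggested above.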

One uv/Ray caveat from this repo: plain
uv run python repro_generic_result_ray_cloudpickle.py may cause Ray workers to
bootstrap a temporary uv environment that does not include ray, producing
ModuleNotFoundError: No module named 'ray' before the intended repro executes.
Using uv run --active from the repo venv reaches the actual cloudpickle
failure shown above.
