diff --git a/ccflow/base.py b/ccflow/base.py index 60289cd..6570a98 100644 --- a/ccflow/base.py +++ b/ccflow/base.py @@ -7,9 +7,28 @@ import pathlib import platform import sys +import types +import typing import warnings +from functools import singledispatch from types import GenericAlias, MappingProxyType -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union, get_args, get_origin +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ClassVar, + Dict, + Generic, + List, + NamedTuple, + Optional, + Tuple, + Type, + TypeVar, + Union, + get_args, + get_origin, +) from packaging import version @@ -165,6 +184,155 @@ def __new__(self, name: str, bases: Tuple[type], namespaces: Dict[str, Any], **k return super().__new__(self, name, bases, namespaces, **kwargs) +class _PydanticGenericTypeSpec(NamedTuple): + # Pickle must not rely on process-local names like ``GenericResult[int]``. + # Store generic args as data so the receiver can rebuild ``origin[args]``. + origin: Type[PydanticBaseModel] + args: Tuple[Any, ...] + + +class _GenericAliasTypeSpec(NamedTuple): + # Builtin/typing aliases such as ``list[GenericResult[int]]`` are normally + # pickleable, but their nested Pydantic generic args may not be. Only wrap + # aliases when at least one nested arg had to become portable data. + origin: Any + args: Tuple[Any, ...] + typing_name: Optional[str] + + +def _is_pydantic_generic_specialization(value: Any) -> bool: + if not isinstance(value, type): + return False + metadata = getattr(value, "__pydantic_generic_metadata__", None) + return bool(metadata and metadata.get("origin") is not None) + + +@singledispatch +def _portable_generic_type_arg(value: Any) -> Any: + """Convert fragile generic type arguments into pickleable rebuild specs. + + The top-level reducer stores ``origin`` and ``args`` for a model instance, + for example ``GenericResult`` and ``(int,)`` for ``GenericResult[int]``. + Some args are themselves runtime Pydantic generic classes, such as the + ``ListResult[int]`` in ``GenericResult[ListResult[int]]``. Those nested + classes have the same cross-process problem as the top-level class: pickle + may look for a process-local global named ``ListResult[int]``. + + This helper walks only type arguments, not model field values. Plain + importable args like ``int`` are left alone. Pydantic generic + specializations become explicit ``(origin, args)`` specs. Generic aliases + like ``list[ListResult[int]]`` are wrapped only if one of their nested args + needed that conversion. Some type args contain list/tuple containers, such + as the argument list in ``typing.Callable[[ListResult[int]], int]``; those + containers are also walked because they are part of the type expression. + """ + if _is_pydantic_generic_specialization(value): + # Example: ``ListResult[int]`` becomes + # ``_PydanticGenericTypeSpec(ListResult, (int,))``. This avoids storing + # the generated class object that may not be globally registered in the + # receiving process. + metadata = value.__pydantic_generic_metadata__ + return _PydanticGenericTypeSpec( + metadata["origin"], + tuple(_portable_generic_type_arg(arg) for arg in metadata.get("args", ())), + ) + + origin = get_origin(value) + args = get_args(value) + if origin is not None and args: + # Example: ``list[ListResult[int]]`` is a normal generic alias, but it + # contains a fragile Pydantic specialization. Rebuild the alias from a + # portable version of its args only when recursion changed something. + portable_args = tuple(_portable_generic_type_arg(arg) for arg in args) + if portable_args != args: + typing_name = getattr(value, "_name", None) if getattr(value, "__module__", None) == "typing" else None + return _GenericAliasTypeSpec(origin, portable_args, typing_name) + return value + + +@_portable_generic_type_arg.register(list) +def _(value: list) -> Any: + portable_items = [_portable_generic_type_arg(item) for item in value] + return portable_items if portable_items != value else value + + +@_portable_generic_type_arg.register(tuple) +def _(value: tuple) -> Any: + portable_items = tuple(_portable_generic_type_arg(item) for item in value) + return portable_items if portable_items != value else value + + +@singledispatch +def _restore_generic_type_arg(value: Any) -> Any: + return value + + +@_restore_generic_type_arg.register(_PydanticGenericTypeSpec) +def _(value: _PydanticGenericTypeSpec) -> Any: + origin, args = value + return origin[tuple(_restore_generic_type_arg(arg) for arg in args)] + + +@_restore_generic_type_arg.register(_GenericAliasTypeSpec) +def _(value: _GenericAliasTypeSpec) -> Any: + origin, args, typing_name = value + restored_args = tuple(_restore_generic_type_arg(arg) for arg in args) + if typing_name is not None: + alias = getattr(typing, typing_name) + if typing_name == "Optional": + non_none_args = tuple(arg for arg in restored_args if arg is not type(None)) + if len(non_none_args) == 1: + return alias[non_none_args[0]] + return alias[restored_args] + try: + return origin[restored_args] + except TypeError as exc: + if len(restored_args) == 1: + try: + return origin[restored_args[0]] + except TypeError: + pass + # ``types.UnionType`` is not itself subscriptable; rebuild PEP 604 + # unions from their members if one of those members needed + # portable restoration. + union_type = getattr(types, "UnionType", None) + if union_type is not None and origin is union_type: + result = restored_args[0] + for arg in restored_args[1:]: + result = result | arg + return result + raise exc + + +@_restore_generic_type_arg.register(list) +def _(value: list) -> list: + return [_restore_generic_type_arg(item) for item in value] + + +@_restore_generic_type_arg.register(tuple) +def _(value: tuple) -> tuple: + return tuple(_restore_generic_type_arg(item) for item in value) + + +def _new_ccflow_generic_model(origin: Type[PydanticBaseModel], args: Tuple[Any, ...]) -> PydanticBaseModel: + """Restore a Pydantic generic specialization without a process-local global. + + Pydantic registers runtime generic specializations like ``GenericResult[int]`` + in the origin module only in the process that first materializes them. Pickle + can then serialize instances by that global name, but fresh workers may not + have created the same specialization yet. Restore from the stable origin + class plus generic args instead of relying on that process-local module + mutation. + """ + + # Materialize the specialized class in this process. Pickle will restore the + # raw Pydantic state afterwards through BaseModel.__setstate__; keeping that + # state in the outer pickle stream preserves memo semantics for shared + # references, cycles, and protocol-5 buffers. + cls = origin[tuple(_restore_generic_type_arg(arg) for arg in args)] + return cls.__new__(cls) + + class BaseModel(PydanticBaseModel, _RegistryMixin, metaclass=_SerializeAsAnyMeta): """BaseModel is a base class for all pydantic models within the ccflow framework. @@ -334,6 +502,21 @@ def __setstate__(self, state): state["__pydantic_fields_set__"] = set(state["__pydantic_fields_set__"]) super().__setstate__(state) + def __reduce_ex__(self, protocol): + if _is_pydantic_generic_specialization(type(self)): + # Pydantic's default reducer may serialize runtime generic classes + # by names like ``GenericResult[int]``. Those names exist only + # after a process has materialized that exact specialization, so + # Ray/fresh workers can fail to import them while unpickling. + # Serialize a stable constructor instead: generic origin plus + # portable generic args. Leave Pydantic instance state to the outer + # pickle stream so shared references, cycles, and protocol-5 buffers + # keep normal pickle semantics. + metadata = type(self).__pydantic_generic_metadata__ + args = tuple(_portable_generic_type_arg(arg) for arg in metadata.get("args", ())) + return (_new_ccflow_generic_model, (metadata["origin"], args), self.__getstate__()) + return super().__reduce_ex__(protocol) + class _ModelRegistryData(PydanticBaseModel): """A data structure representation of the model registry, without the associated functionality""" diff --git a/ccflow/tests/test_base_cloudpickle.py b/ccflow/tests/test_base_cloudpickle.py new file mode 100644 index 0000000..493fdb7 --- /dev/null +++ b/ccflow/tests/test_base_cloudpickle.py @@ -0,0 +1,321 @@ +import json +import subprocess +import sys +import textwrap +from pathlib import Path + + +def _run_python(script: str) -> subprocess.CompletedProcess[str]: + return subprocess.run([sys.executable, "-c", script], capture_output=True, text=True, timeout=30) + + +def _create_payloads_in_fresh_process(creator_body: str) -> dict[str, dict[str, str]]: + # The bug only appears when the receiver has not already materialized the + # same Pydantic generic specialization. Create payloads in a subprocess so + # the pytest process cannot accidentally warm the receiver's generic cache. + creator = _run_python( + "import base64\n" + "import cloudpickle\n" + "import json\n" + "import pickle\n" + f"{textwrap.dedent(creator_body)}\n" + "encoded = {}\n" + "for name, (serializer, value) in payloads.items():\n" + " module = pickle if serializer == 'pickle' else cloudpickle\n" + " encoded[name] = {\n" + " 'serializer': serializer,\n" + " 'payload': base64.b64encode(module.dumps(value, protocol=5)).decode(),\n" + " }\n" + "print(json.dumps(encoded, sort_keys=True))\n" + ) + assert creator.returncode == 0, creator.stderr + return json.loads(creator.stdout) + + +def _assert_payloads_load_in_fresh_process(creator_body: str, assertions: str, *, extra_loader_setup: str = "") -> None: + encoded_payloads = _create_payloads_in_fresh_process(creator_body) + # First load each payload in its own fresh process. The original bug was + # sensitive to whether a worker had already materialized the same generic + # specialization, so a shared loader process can accidentally warm the + # receiver and create false positives. + for name, spec in encoded_payloads.items(): + cold_loader = _run_python( + "import base64\n" + "import cloudpickle\n" + "import json\n" + "import pickle\n" + f"{textwrap.dedent(extra_loader_setup)}\n" + f"spec = json.loads({json.dumps(spec)!r})\n" + "module = pickle if spec['serializer'] == 'pickle' else cloudpickle\n" + "module.loads(base64.b64decode(spec['payload']))\n" + ) + assert cold_loader.returncode == 0, f"{name}: {cold_loader.stderr}" + + # Load in a second fresh process, then run all assertions there. Keeping many + # payloads in one process pair keeps the detailed assertions cheap. The + # per-payload loop above already verifies cold-receiver loading. + loader = _run_python( + "import base64\n" + "import cloudpickle\n" + "import json\n" + "import pickle\n" + f"{textwrap.dedent(extra_loader_setup)}\n" + f"encoded = json.loads({json.dumps(encoded_payloads)!r})\n" + "values = {}\n" + "for name, spec in encoded.items():\n" + " module = pickle if spec['serializer'] == 'pickle' else cloudpickle\n" + " values[name] = module.loads(base64.b64decode(spec['payload']))\n" + f"{textwrap.dedent(assertions)}\n" + ) + assert loader.returncode == 0, loader.stderr + + +def test_ccflow_generic_specializations_pickle_across_fresh_processes(): + # One matrix test covers the ccflow-provided generic families and the hard + # type-argument shapes: nested Pydantic generics, builtin aliases containing + # Pydantic generics, and PEP 604 unions containing Pydantic generics. + _assert_payloads_load_in_fresh_process( + """ + import numpy as np + from typing import Callable, ClassVar, Final, List, Optional + + from ccflow import GenericContext, GenericResult + from ccflow.result import DictResult, ListResult + from ccflow.result.numpy import NumpyResult + + payloads = { + "standard_pickle_result": ("pickle", GenericResult[int](value=5)), + "generic_result": ("cloudpickle", GenericResult[int](value=6)), + "generic_context": ("cloudpickle", GenericContext[str](value="abc")), + "list_result": ("cloudpickle", ListResult[int](value=[1, 2])), + "dict_result": ("cloudpickle", DictResult[str, float](value={"a": 1.5})), + "numpy_result": ( + "cloudpickle", + NumpyResult[np.float64](array=np.array([1.0, 2.0], dtype=np.float64)), + ), + "nested_result": ( + "cloudpickle", + GenericResult[ListResult[int]](value=ListResult[int](value=[1, 2])), + ), + "list_alias_result": ( + "cloudpickle", + GenericResult[list[ListResult[int]]](value=[ListResult[int](value=[1])]), + ), + "typing_list_alias_result": ( + "cloudpickle", + GenericResult[List[ListResult[int]]](value=[ListResult[int](value=[2])]), + ), + "callable_alias_result": ( + "cloudpickle", + GenericResult[Callable[[ListResult[int]], int]](value=lambda result: len(result.value)), + ), + "optional_alias_result": ( + "cloudpickle", + GenericResult[Optional[ListResult[int]]](value=ListResult[int](value=[4])), + ), + "classvar_alias_result": ( + "cloudpickle", + GenericResult[ClassVar[ListResult[int]]](value=ListResult[int](value=[5])), + ), + "final_alias_result": ( + "cloudpickle", + GenericResult[Final[ListResult[int]]](value=ListResult[int](value=[6])), + ), + "dict_alias_result": ( + "cloudpickle", + GenericResult[dict[str, GenericContext[int]]](value={"a": GenericContext[int](value=1)}), + ), + "union_result": ( + "cloudpickle", + GenericResult[GenericContext[int] | None](value=GenericContext[int](value=1)), + ), + } + """, + """ + import numpy as np + from typing import Callable, ClassVar, Final, List, Optional + + from ccflow import GenericContext, GenericResult + from ccflow.result import DictResult, ListResult + from ccflow.result.numpy import NumpyResult + + assert values["standard_pickle_result"] == GenericResult[int](value=5) + assert type(values["standard_pickle_result"]).__pydantic_generic_metadata__["args"] == (int,) + + assert values["generic_result"] == GenericResult[int](value=6) + assert values["generic_context"] == GenericContext[str](value="abc") + assert values["list_result"] == ListResult[int](value=[1, 2]) + assert values["dict_result"] == DictResult[str, float](value={"a": 1.5}) + + assert type(values["numpy_result"]) is NumpyResult[np.float64] + np.testing.assert_array_equal(values["numpy_result"].array, np.array([1.0, 2.0], dtype=np.float64)) + + assert values["nested_result"] == GenericResult[ListResult[int]](value=ListResult[int](value=[1, 2])) + assert type(values["nested_result"]).__pydantic_generic_metadata__["args"] == (ListResult[int],) + assert type(values["nested_result"].value).__pydantic_generic_metadata__["args"] == (int,) + + assert values["list_alias_result"] == GenericResult[list[ListResult[int]]](value=[ListResult[int](value=[1])]) + assert type(values["list_alias_result"]).__pydantic_generic_metadata__["args"] == (list[ListResult[int]],) + assert type(values["list_alias_result"].value[0]).__pydantic_generic_metadata__["args"] == (int,) + + assert values["typing_list_alias_result"] == GenericResult[List[ListResult[int]]]( + value=[ListResult[int](value=[2])] + ) + assert type(values["typing_list_alias_result"]).__pydantic_generic_metadata__["args"] == (List[ListResult[int]],) + + assert values["callable_alias_result"].value(ListResult[int](value=[1, 2, 3])) == 3 + assert type(values["callable_alias_result"]).__pydantic_generic_metadata__["args"] == ( + Callable[[ListResult[int]], int], + ) + + assert values["optional_alias_result"] == GenericResult[Optional[ListResult[int]]]( + value=ListResult[int](value=[4]) + ) + assert type(values["optional_alias_result"]).__pydantic_generic_metadata__["args"] == ( + Optional[ListResult[int]], + ) + + assert values["classvar_alias_result"] == GenericResult[ClassVar[ListResult[int]]]( + value=ListResult[int](value=[5]) + ) + assert type(values["classvar_alias_result"]).__pydantic_generic_metadata__["args"] == ( + ClassVar[ListResult[int]], + ) + + assert values["final_alias_result"] == GenericResult[Final[ListResult[int]]]( + value=ListResult[int](value=[6]) + ) + assert type(values["final_alias_result"]).__pydantic_generic_metadata__["args"] == ( + Final[ListResult[int]], + ) + + assert values["dict_alias_result"] == GenericResult[dict[str, GenericContext[int]]]( + value={"a": GenericContext[int](value=1)} + ) + assert type(values["dict_alias_result"]).__pydantic_generic_metadata__["args"] == ( + dict[str, GenericContext[int]], + ) + assert type(values["dict_alias_result"].value["a"]).__pydantic_generic_metadata__["args"] == (int,) + + assert values["union_result"] == GenericResult[GenericContext[int] | None](value=GenericContext[int](value=1)) + assert type(values["union_result"].value).__pydantic_generic_metadata__["args"] == (int,) + """, + ) + + +def test_user_generic_and_non_generic_ccflow_models_pickle_across_fresh_processes(tmp_path: Path): + # User subclasses exercise the same BaseModel reducer without depending on + # ccflow's result/context classes. The non-generic case proves the generic + # override has not captured ordinary BaseModel pickling. + module_path = tmp_path / "generic_user_model.py" + module_path.write_text( + textwrap.dedent( + """ + from typing import Generic, TypeVar + + from pydantic import PrivateAttr + + from ccflow import BaseModel + + T = TypeVar("T") + + class UserBox(BaseModel, Generic[T]): + value: T + _bonus: int = PrivateAttr(default=1) + """ + ) + ) + + _assert_payloads_load_in_fresh_process( + f""" + import sys + from typing import Generic, TypeVar + + from pydantic import PrivateAttr + + from ccflow import BaseModel + + sys.path.insert(0, {str(tmp_path)!r}) + from generic_user_model import UserBox + + T = TypeVar("T") + + class LocalBox(BaseModel, Generic[T]): + value: T + _bonus: int = PrivateAttr(default=1) + + class LocalPayload(BaseModel): + value: int + _bonus: int = PrivateAttr(default=1) + + importable = UserBox[int](value=2) + importable._bonus = 40 + local_generic = LocalBox[int](value=3) + local_generic._bonus = 41 + local_payload = LocalPayload(value=4) + local_payload._bonus = 42 + + payloads = {{ + "importable_generic": ("cloudpickle", importable), + "local_generic": ("cloudpickle", local_generic), + "local_payload": ("cloudpickle", local_payload), + }} + """, + """ + from generic_user_model import UserBox + + assert type(values["importable_generic"]) is UserBox[int] + assert values["importable_generic"].value == 2 + assert values["importable_generic"]._bonus == 40 + + assert values["local_generic"].value == 3 + assert values["local_generic"]._bonus == 41 + assert type(values["local_generic"]).__name__ == "LocalBox[int]" + assert type(values["local_generic"]).__pydantic_generic_metadata__["args"] == (int,) + + assert values["local_payload"].value == 4 + assert values["local_payload"]._bonus == 42 + assert type(values["local_payload"]).__name__ == "LocalPayload" + """, + extra_loader_setup=f""" + import sys + sys.path.insert(0, {str(tmp_path)!r}) + """, + ) + + +def test_ccflow_generic_result_cloudpickle_in_ray_worker_without_receiver_materialization(): + import ray + + # Ray is the production-shaped failure mode: workers are fresh processes and + # may deserialize before ``GenericResult[int]`` has ever been created there. + payload = _create_payloads_in_fresh_process( + """ + from ccflow import GenericResult + + payloads = {"generic_result": ("cloudpickle", GenericResult[int](value=5))} + """ + )["generic_result"]["payload"] + + @ray.remote + def load_payload(encoded_payload: str): + import base64 + + import cloudpickle + + import ccflow.result.generic as generic_module + from ccflow import GenericResult # noqa: F401 + + # Importing the generic origin must not be enough to make the test pass. + # The worker should not know about ``GenericResult[int]`` until the + # reducer intentionally rebuilds it during cloudpickle.loads. + had_specialization_before_load = hasattr(generic_module, "GenericResult[int]") + value = cloudpickle.loads(base64.b64decode(encoded_payload)) + return ( + had_specialization_before_load, + value.value, + tuple(arg.__name__ for arg in type(value).__pydantic_generic_metadata__["args"]), + ) + + with ray.init(num_cpus=1): + assert ray.get(load_payload.remote(payload), timeout=30) == (False, 5, ("int",)) diff --git a/ccflow/tests/test_base_serialize.py b/ccflow/tests/test_base_serialize.py index cb41730..ab8d415 100644 --- a/ccflow/tests/test_base_serialize.py +++ b/ccflow/tests/test_base_serialize.py @@ -1,15 +1,39 @@ import pickle import platform import unittest -from typing import Annotated, ClassVar, Dict, List, Optional, Type, Union +from typing import ( + Annotated, + Any, + Callable, + ClassVar, + Dict, + Final, + Generic, + List, + NotRequired, + Optional, + Required, + Type, + TypeVar, + Union, +) import numpy as np from packaging import version from pydantic import BaseModel as PydanticBaseModel, ConfigDict, Field, ValidationError -from ccflow import BaseModel, NDArray +from ccflow import BaseModel, CallableModelGenericType, GenericContext, GenericResult, NDArray, NullContext +from ccflow.base import ( + _GenericAliasTypeSpec, + _is_pydantic_generic_specialization, + _new_ccflow_generic_model, + _portable_generic_type_arg, + _PydanticGenericTypeSpec, + _restore_generic_type_arg, +) from ccflow.enums import Enum from ccflow.exttypes.pydantic_numpy.ndtypes import bool_, complex64, float32, float64, int8, uint32 +from ccflow.result import ListResult from ccflow.serialization import make_ndarray_orjson_valid @@ -98,6 +122,13 @@ class MultiAttributeModel(BaseModel): w: Annotated[bool, None] +T = TypeVar("T") + + +class GenericBox(BaseModel, Generic[T]): + value: T + + class TestBaseModelSerialization(unittest.TestCase): def _numpy_equality(self, val: BaseModel, other: BaseModel) -> bool: if val.__class__ == other.__class__ and len(val.__dict__) == len(other.__dict__): @@ -270,3 +301,113 @@ def test_pickle_consistency(self): self.assertEqual(serialized, target) deserialized = pickle.loads(serialized) self.assertEqual(model, deserialized) + + def test_generic_pickle_override_guard_is_narrow(self): + # Blast radius check: the custom reducer should apply only to concrete + # Pydantic generic specializations. Ordinary BaseModel classes and + # unspecialized generic origins must keep Pydantic's default pickle + # behavior. + self.assertFalse(_is_pydantic_generic_specialization(ParentModel)) + self.assertFalse(_is_pydantic_generic_specialization(GenericBox)) + self.assertFalse(_is_pydantic_generic_specialization(GenericResult[int](value=5))) + self.assertTrue(_is_pydantic_generic_specialization(GenericBox[int])) + self.assertTrue(_is_pydantic_generic_specialization(GenericResult[int])) + + def test_reduce_ex_only_takes_over_generic_specializations(self): + # This is the core blast-radius assertion for normal users: a non-generic + # model should not route through the ccflow generic restore helper at + # all. If this fails, the BaseModel pickle override became too broad. + non_generic_reduce = ParentModel(field1=1).__reduce_ex__(pickle.HIGHEST_PROTOCOL) + self.assertIsNot(non_generic_reduce[0], _new_ccflow_generic_model) + + generic_model = GenericResult[int](value=5) + reduce_func, reduce_args, reduce_state = generic_model.__reduce_ex__(pickle.HIGHEST_PROTOCOL) + + self.assertIs(reduce_func, _new_ccflow_generic_model) + origin, args = reduce_args + self.assertIs(origin, GenericResult) + self.assertEqual(args, (int,)) + self.assertEqual(reduce_state, generic_model.__getstate__()) + + restored = reduce_func(*reduce_args) + restored.__setstate__(reduce_state) + self.assertEqual(restored, generic_model) + self.assertIs(type(restored), GenericResult[int]) + + def test_portable_generic_type_arg_only_wraps_fragile_type_arguments(self): + # The helper walks type arguments, not instance values. Plain importable + # args and generic aliases with only plain args stay unchanged, so the + # extra serialization shape is limited to aliases that actually contain + # runtime Pydantic generic specializations. This includes + # CallableModelGenericType annotations, which are themselves Pydantic + # generic specializations and can contain nested result specializations. + self.assertIs(_portable_generic_type_arg(int), int) + self.assertEqual(_portable_generic_type_arg(list[int]), list[int]) + + portable_model_arg = _portable_generic_type_arg(ListResult[int]) + self.assertIsInstance(portable_model_arg, _PydanticGenericTypeSpec) + self.assertIs(_restore_generic_type_arg(portable_model_arg), ListResult[int]) + + portable_alias_arg = _portable_generic_type_arg(list[ListResult[int]]) + self.assertIsInstance(portable_alias_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_alias_arg), list[ListResult[int]]) + + typing_list_alias = List[ListResult[int]] + portable_typing_alias_arg = _portable_generic_type_arg(typing_list_alias) + self.assertIsInstance(portable_typing_alias_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_typing_alias_arg), typing_list_alias) + + callable_alias = Callable[[ListResult[int]], int] + portable_callable_alias_arg = _portable_generic_type_arg(callable_alias) + self.assertIsInstance(portable_callable_alias_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_callable_alias_arg), callable_alias) + + portable_union_arg = _portable_generic_type_arg(GenericContext[int] | None) + self.assertIsInstance(portable_union_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_union_arg), GenericContext[int] | None) + + optional_alias = Optional[ListResult[int]] + portable_optional_arg = _portable_generic_type_arg(optional_alias) + self.assertIsInstance(portable_optional_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_optional_arg), optional_alias) + + classvar_alias = ClassVar[ListResult[int]] + portable_classvar_arg = _portable_generic_type_arg(classvar_alias) + self.assertIsInstance(portable_classvar_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_classvar_arg), classvar_alias) + + final_alias = Final[ListResult[int]] + portable_final_arg = _portable_generic_type_arg(final_alias) + self.assertIsInstance(portable_final_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_final_arg), final_alias) + + required_alias = Required[ListResult[int]] + portable_required_arg = _portable_generic_type_arg(required_alias) + self.assertIsInstance(portable_required_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_required_arg), required_alias) + + not_required_alias = NotRequired[ListResult[int]] + portable_not_required_arg = _portable_generic_type_arg(not_required_alias) + self.assertIsInstance(portable_not_required_arg, _GenericAliasTypeSpec) + self.assertEqual(_restore_generic_type_arg(portable_not_required_arg), not_required_alias) + + callable_annotation = CallableModelGenericType[NullContext, GenericResult[int]] + portable_callable_arg = _portable_generic_type_arg(callable_annotation) + self.assertIsInstance(portable_callable_arg, _PydanticGenericTypeSpec) + self.assertIs(_restore_generic_type_arg(portable_callable_arg), callable_annotation) + + def test_generic_pickle_handles_all_pickle_protocols(self): + for protocol in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=protocol): + restored = pickle.loads(pickle.dumps(GenericResult[int](value=5), protocol=protocol)) + self.assertEqual(restored, GenericResult[int](value=5)) + + def test_generic_pickle_preserves_outer_graph_identity_and_cycles(self): + shared = [] + restored_shared = pickle.loads(pickle.dumps([GenericResult[Any](value=shared), shared], protocol=pickle.HIGHEST_PROTOCOL)) + self.assertIs(restored_shared[0].value, restored_shared[1]) + + model = GenericResult[Any](value=None) + model.value = model + restored_cycle = pickle.loads(pickle.dumps(model, protocol=pickle.HIGHEST_PROTOCOL)) + self.assertIs(restored_cycle.value, restored_cycle)