diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index b5c3178210d9..893b4cb58f4f 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2456,10 +2456,10 @@ def expand(self, pcoll): tagged_type_hints = dict(self._fn.get_type_hints().tagged_output_types()) # Dead letter format: Tuple[element, Tuple[exception_type, repr, traceback]] - dead_letter_type = typehints.Tuple[pcoll.element_type, - typehints.Tuple[type, - str, - typehints.List[str]]] + dead_letter_type = typehints.Tuple[ + pcoll.element_type, + typehints.Tuple[type[typing.Any], str, typehints.Sequence[str]], + ] tagged_type_hints[self._dead_letter_tag] = dead_letter_type pardo = pardo.with_output_types(main_output_type, **tagged_type_hints) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index d80a03bdf53b..357971b9c0b0 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -1,33 +1,13 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - """Unit tests for the core python file.""" -# pytype: skip-file - import logging import os import tempfile import unittest +import typing from typing import Iterable from typing import Literal from typing import TypeVar - import pytest - import apache_beam as beam from apache_beam.coders import coders from apache_beam.options.pipeline_options import PipelineOptions @@ -42,67 +22,47 @@ from apache_beam.typehints import TypeCheckError from apache_beam.typehints import row_type from apache_beam.typehints import typehints - RETURN_NONE_PARTIAL_WARNING = "Process method returned None" - - class TestDoFn0(beam.DoFn): """Returning without a value is allowed""" def process(self, element): if not element: return yield element - - class TestDoFn1(beam.DoFn): def process(self, element): yield element - - class TestDoFn2(beam.DoFn): def process(self, element): def inner_func(x): yield x - return inner_func(element) - - class TestDoFn3(beam.DoFn): """mixing return and yield is not allowed""" def process(self, element): if not element: return -1 yield element - - class TestDoFn4(beam.DoFn): """test the variable name containing return""" def process(self, element): my_return = element yield my_return - - class TestDoFn5(beam.DoFn): """test the variable name containing yield""" def process(self, element): my_yield = element return my_yield - - class TestDoFn6(beam.DoFn): """test the variable name containing return""" def process(self, element): return_test = element yield return_test - - class TestDoFn7(beam.DoFn): """test the variable name containing yield""" def process(self, element): yield_test = element return yield_test - - class TestDoFn8(beam.DoFn): """test the code containing yield and yield from""" def process(self, element): @@ -110,35 +70,25 @@ def process(self, element): yield from [1, 2, 3] else: yield element - - class TestDoFn9(beam.DoFn): def process(self, element): if len(element) > 3: raise ValueError('Not allowed to have long elements') yield element - - class TestDoFn10(beam.DoFn): """test process returning None explicitly""" def process(self, element): return None - - class TestDoFn11(beam.DoFn): """test process returning None (no return and no yield)""" def process(self, element): pass - - class TestDoFn12(beam.DoFn): """test process returning None in a filter pattern""" def process(self, element): if element == 0: return return element - - class TestDoFnStateful(beam.DoFn): STATE_SPEC = ReadModifyWriteStateSpec('num_elements', coders.VarIntCoder()) """test process with a stateful dofn""" @@ -148,8 +98,6 @@ def process(self, element, state=beam.DoFn.StateParam(STATE_SPEC)): current_value = state.read() or 1 state.write(current_value + 1) yield current_value - - class TestDoFnWithTimer(beam.DoFn): ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder()) TIMER = TimerSpec('timer', beam.TimeDomain.WATERMARK) @@ -164,25 +112,18 @@ def process( raise ValueError('Not allowed to have large numbers') state.add(element[1]) timer.set(t) - return [] - @on_timer(TIMER) def expiry_callback(self, state=beam.DoFn.StateParam(ALL_ELEMENTS)): unique_elements = list(state.read()) state.clear() - return unique_elements - - class CreateTest(unittest.TestCase): @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): self._caplog = caplog - def test_dofn_with_yield_and_return(self): warning_text = 'Using yield and return' - with self._caplog.at_level(logging.WARNING): assert beam.ParDo(sum) assert beam.ParDo(TestDoFn0()) @@ -194,28 +135,22 @@ def test_dofn_with_yield_and_return(self): assert beam.ParDo(TestDoFn7()) assert beam.ParDo(TestDoFn8()) assert warning_text not in self._caplog.text - with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn3()) assert warning_text in self._caplog.text - def test_dofn_with_explicit_return_none(self): with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn10()) assert RETURN_NONE_PARTIAL_WARNING in self._caplog.text assert str(TestDoFn10) in self._caplog.text - def test_dofn_with_implicit_return_none_missing_return_and_yield(self): with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn11()) assert RETURN_NONE_PARTIAL_WARNING not in self._caplog.text - def test_dofn_with_implicit_return_none_and_value(self): with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn12()) assert RETURN_NONE_PARTIAL_WARNING not in self._caplog.text - - class PartitionTest(unittest.TestCase): def test_partition_with_bools(self): with pytest.raises( @@ -230,89 +165,69 @@ def test_partition_with_bools(self): _ = ( p | beam.Create([input_value]) | beam.Partition(lambda x, _: x, 2)) - def test_partition_with_numpy_integers(self): # Test that numpy integer types are correctly accepted by the # ApplyPartitionFnFn class import numpy as np - # Create an instance of the ApplyPartitionFnFn class apply_partition_fn = beam.Partition.ApplyPartitionFnFn() - # Define a simple partition function class SimplePartitionFn(beam.PartitionFn): def partition_for(self, element, num_partitions): return element % num_partitions - partition_fn = SimplePartitionFn() - # Test with numpy.int32 # This should not raise an exception outputs = list(apply_partition_fn.process(np.int32(1), partition_fn, 3)) self.assertEqual(len(outputs), 1) self.assertEqual(outputs[0].tag, '1') # 1 % 3 = 1 - # Test with numpy.int64 # This should not raise an exception outputs = list(apply_partition_fn.process(np.int64(2), partition_fn, 3)) self.assertEqual(len(outputs), 1) self.assertEqual(outputs[0].tag, '2') # 2 % 3 = 2 - def test_partition_fn_returning_numpy_integers(self): # Test that partition functions can return numpy integer types import numpy as np - # Create an instance of the ApplyPartitionFnFn class apply_partition_fn = beam.Partition.ApplyPartitionFnFn() - # Define partition functions that return numpy integer types class Int32PartitionFn(beam.PartitionFn): def partition_for(self, element, num_partitions): return np.int32(element % num_partitions) - class Int64PartitionFn(beam.PartitionFn): def partition_for(self, element, num_partitions): return np.int64(element % num_partitions) - # Test with partition function returning numpy.int32 # This should not raise an exception outputs = list(apply_partition_fn.process(1, Int32PartitionFn(), 3)) self.assertEqual(len(outputs), 1) self.assertEqual(outputs[0].tag, '1') # 1 % 3 = 1 - # Test with partition function returning numpy.int64 # This should not raise an exception outputs = list(apply_partition_fn.process(2, Int64PartitionFn(), 3)) self.assertEqual(len(outputs), 1) self.assertEqual(outputs[0].tag, '2') # 2 % 3 = 2 - def test_partition_boundedness(self): def partition_fn(val, num_partitions): return val % num_partitions - class UnboundedDoFn(beam.DoFn): @beam.DoFn.unbounded_per_element() def process(self, element): yield element - with beam.testing.test_pipeline.TestPipeline() as p: source = p | beam.Create([1, 2, 3, 4, 5]) p1, p2, p3 = source | "bounded" >> beam.Partition(partition_fn, 3) - self.assertEqual(source.is_bounded, True) self.assertEqual(p1.is_bounded, True) self.assertEqual(p2.is_bounded, True) self.assertEqual(p3.is_bounded, True) - unbounded = source | beam.ParDo(UnboundedDoFn()) p4, p5, p6 = unbounded | "unbounded" >> beam.Partition(partition_fn, 3) - self.assertEqual(unbounded.is_bounded, False) self.assertEqual(p4.is_bounded, False) self.assertEqual(p5.is_bounded, False) self.assertEqual(p6.is_bounded, False) - - class FlattenTest(unittest.TestCase): def test_flatten_identical_windows(self): with beam.testing.test_pipeline.TestPipeline() as p: @@ -324,7 +239,6 @@ def test_flatten_identical_windows(self): FixedWindows(100)) out = (source1, source2, source3) | "flatten" >> beam.Flatten() assert_that(out, equal_to([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) - def test_flatten_no_windows(self): with beam.testing.test_pipeline.TestPipeline() as p: source1 = p | "c1" >> beam.Create([1, 2, 3, 4, 5]) @@ -332,7 +246,6 @@ def test_flatten_no_windows(self): source3 = p | "c3" >> beam.Create([9, 10]) out = (source1, source2, source3) | "flatten" >> beam.Flatten() assert_that(out, equal_to([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) - def test_flatten_mismatched_windows(self): with beam.testing.test_pipeline.TestPipeline() as p: source1 = p | "c1" >> beam.Create( @@ -342,8 +255,6 @@ def test_flatten_mismatched_windows(self): source3 = p | "c3" >> beam.Create([9, 10]) | "w3" >> beam.WindowInto( FixedWindows(100)) _ = (source1, source2, source3) | "flatten" >> beam.Flatten() - - class ExceptionHandlingTest(unittest.TestCase): def test_routes_failures(self): with beam.Pipeline() as pipeline: @@ -354,12 +265,10 @@ def test_routes_failures(self): bad_elements = bad | beam.Keys() assert_that(good, equal_to(['abc', 'foo', 'bar']), 'good') assert_that(bad_elements, equal_to(['long_word', 'foobar']), 'bad') - def test_handles_callbacks(self): with tempfile.TemporaryDirectory() as tmp_dirname: tmp_path = os.path.join(tmp_dirname, 'tmp_filename') file_contents = 'random content' - def failure_callback(e, el): if type(e) is not ValueError: raise Exception(f'Failed to pass in correct exception, received {e}') @@ -369,7 +278,6 @@ def failure_callback(e, el): logging.warning(tmp_path) f.write(file_contents) f.close() - with beam.Pipeline() as pipeline: good, bad = ( pipeline | beam.Create(['abc', 'bcd', 'foo', 'bar', 'foobar']) @@ -382,18 +290,15 @@ def failure_callback(e, el): with open(tmp_path) as f: s = f.read() self.assertEqual(s, file_contents) - def test_handles_no_callback_triggered(self): with tempfile.TemporaryDirectory() as tmp_dirname: tmp_path = os.path.join(tmp_dirname, 'tmp_filename') file_contents = 'random content' - def failure_callback(e, el): f = open(tmp_path, "a") logging.warning(tmp_path) f.write(file_contents) f.close() - with beam.Pipeline() as pipeline: good, bad = ( pipeline | beam.Create(['abc', 'bcd', 'foo', 'bar']) @@ -404,7 +309,6 @@ def failure_callback(e, el): assert_that(good, equal_to(['abc', 'bcd', 'foo', 'bar']), 'good') assert_that(bad_elements, equal_to([]), 'bad') self.assertFalse(os.path.isfile(tmp_path)) - def test_stateful_exception_handling(self): with beam.Pipeline() as pipeline: good, bad = ( @@ -417,7 +321,6 @@ def test_stateful_exception_handling(self): assert_that(good, equal_to([1, 2, 3]), 'good') assert_that( bad_elements, equal_to([(1, 'long_word'), (1, 'foobar')]), 'bad') - def test_timer_exception_handling(self): with beam.Pipeline() as pipeline: good, bad = ( @@ -428,11 +331,9 @@ def test_timer_exception_handling(self): bad_elements = bad | beam.Keys() assert_that(good, equal_to([0, 1, 2]), 'good') assert_that(bad_elements, equal_to([(1, 5), (1, 10)]), 'bad') - def test_tags_with_exception_handling_then_resource_hint(self): class TagHint(ResourceHint): urn = 'beam:resources:tags:v1' - ResourceHint.register_resource_hint('tags', TagHint) with beam.Pipeline() as pipeline: ok, unused_errors = ( @@ -450,11 +351,9 @@ class TagHint(ResourceHint): pd.get_resource_hints(), {'beam:resources:tags:v1': b'test_tag'}, ) - def test_tags_with_exception_handling_timeout_then_resource_hint(self): class TagHint(ResourceHint): urn = 'beam:resources:tags:v1' - ResourceHint.register_resource_hint('tags', TagHint) with beam.Pipeline() as pipeline: ok, unused_errors = ( @@ -472,11 +371,9 @@ class TagHint(ResourceHint): pd.get_resource_hints(), {'beam:resources:tags:v1': b'test_tag'}, ) - def test_tags_with_resource_hint_then_exception_handling(self): class TagHint(ResourceHint): urn = 'beam:resources:tags:v1' - ResourceHint.register_resource_hint('tags', TagHint) with beam.Pipeline() as pipeline: ok, unused_errors = ( @@ -494,11 +391,9 @@ class TagHint(ResourceHint): pd.get_resource_hints(), {'beam:resources:tags:v1': b'test_tag'}, ) - def test_tags_with_resource_hint_then_exception_handling_timeout(self): class TagHint(ResourceHint): urn = 'beam:resources:tags:v1' - ResourceHint.register_resource_hint('tags', TagHint) with beam.Pipeline() as pipeline: ok, unused_errors = ( @@ -516,8 +411,6 @@ class TagHint(ResourceHint): pd.get_resource_hints(), {'beam:resources:tags:v1': b'test_tag'}, ) - - class ExceptionHandlingWithOutputsTest(unittest.TestCase): """Tests for combining with_exception_handling() and with_outputs().""" def _create_dofn_with_tagged_outputs(self): @@ -536,12 +429,9 @@ def process( yield beam.pvalue.TaggedOutput('fives', str(element)) # type: ignore[misc] else: yield element - return DoWithFailures() - def test_with_exception_handling_then_with_outputs(self): """Direction 1: .with_exception_handling().with_outputs()""" - with beam.Pipeline() as p: results = ( p @@ -549,7 +439,6 @@ def test_with_exception_handling_then_with_outputs(self): | beam.ParDo(self._create_dofn_with_tagged_outputs()). with_exception_handling().with_outputs( 'threes', 'fives', main='main')) - assert_that(results.main, equal_to([1, 7]), 'main') assert_that(results.threes, equal_to([3]), 'threes') assert_that(results.fives, equal_to(['5']), 'fives') @@ -561,18 +450,15 @@ def test_with_exception_handling_then_with_outputs(self): self.assertEqual(results.fives.element_type, str) self.assertEqual( results.bad.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) - + typehints.Tuple[int, typehints.Tuple[type[typing.Any], str, typehints.Sequence[str]]]) def test_with_outputs_then_with_exception_handling(self): """Direction 2: .with_outputs().with_exception_handling()""" - with beam.Pipeline() as p: results = ( p | beam.Create([1, 2, 3, 4, 5, 6, 7]) | beam.ParDo(self._create_dofn_with_tagged_outputs()).with_outputs( 'threes', 'fives', main='main').with_exception_handling()) - assert_that(results.main, equal_to([1, 7]), 'main') assert_that(results.threes, equal_to([3]), 'threes') assert_that(results.fives, equal_to(['5']), 'fives') @@ -584,12 +470,10 @@ def test_with_outputs_then_with_exception_handling(self): self.assertEqual(results.fives.element_type, str) self.assertEqual( results.bad.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) - + typehints.Tuple[int, typehints.Tuple[type[typing.Any], str, typehints.Sequence[str]]]) def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag( self): """Direction 2 with custom dead_letter_tag.""" - with beam.Pipeline() as p: results = ( p @@ -597,7 +481,6 @@ def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag( | beam.ParDo(self._create_dofn_with_tagged_outputs()).with_outputs( 'threes', main='main').with_exception_handling(dead_letter_tag='errors')) - assert_that(results.main, equal_to([1]), 'main') assert_that(results.threes, equal_to([3]), 'threes') bad_elements = results.errors | beam.Keys() @@ -605,12 +488,10 @@ def test_with_outputs_then_with_exception_handling_custom_dead_letter_tag( self.assertEqual(results.threes.element_type, int) self.assertEqual( results.errors.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) - + typehints.Tuple[int, typehints.Tuple[type[typing.Any], str, typehints.Sequence[str]]]) def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag( self): """Direction 1 with custom dead_letter_tag.""" - with beam.Pipeline() as p: results = ( p @@ -618,7 +499,6 @@ def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag( | beam.ParDo( self._create_dofn_with_tagged_outputs()).with_exception_handling( dead_letter_tag='errors').with_outputs('threes', main='main')) - assert_that(results.main, equal_to([1]), 'main') assert_that(results.threes, equal_to([3]), 'threes') bad_elements = results.errors | beam.Keys() @@ -626,22 +506,18 @@ def test_with_exception_handling_then_with_outputs_custom_dead_letter_tag( self.assertEqual(results.threes.element_type, int) self.assertEqual( results.errors.element_type, - typehints.Tuple[int, typehints.Tuple[type, str, typehints.List[str]]]) - + typehints.Tuple[int, typehints.Tuple[type[typing.Any], str, typehints.Sequence[str]]]) def test_exception_handling_no_with_outputs_backward_compat(self): """Without with_outputs(), behavior is unchanged.""" - with beam.Pipeline() as p: good, bad = ( p | beam.Create([1, 2, 7]) | beam.ParDo(self._create_dofn_with_tagged_outputs()) .with_exception_handling()) - assert_that(good, equal_to([1, 7]), 'good') bad_elements = bad | beam.Keys() assert_that(bad_elements, equal_to([2]), 'bad') - def test_exception_handling_compat_version_uses_old_behavior(self): """With compat version < 2.73.0, old expand path is used.""" options = PipelineOptions(update_compatibility_version="2.72.0") @@ -651,15 +527,12 @@ def test_exception_handling_compat_version_uses_old_behavior(self): | beam.Create([1, 2, 7]) | beam.ParDo(self._create_dofn_with_tagged_outputs()) .with_exception_handling()) - assert_that(good, equal_to([1, 7]), 'good') bad_elements = bad | beam.Keys() assert_that(bad_elements, equal_to([2]), 'bad') - def test_exception_handling_compat_version_element_type_set_manually(self): """With compat version < 2.73.0, element_type is set via manual override (the old behavior) rather than via with_output_types.""" - options = PipelineOptions(update_compatibility_version="2.72.0") with beam.Pipeline(options=options) as p: results = ( @@ -667,7 +540,6 @@ def test_exception_handling_compat_version_element_type_set_manually(self): | beam.Create([1, 2, 3]) | beam.ParDo(self._create_dofn_with_tagged_outputs()). with_exception_handling().with_outputs('threes', main='main')) - # In old path, dead letter type is Any (no with_output_types call) self.assertEqual(results.bad.element_type, typehints.Any) # Tagged outputs still get types from DoFn Literal annotations @@ -675,7 +547,6 @@ def test_exception_handling_compat_version_element_type_set_manually(self): self.assertEqual(results.threes.element_type, int) # Main output type should still be inferred via manual override assert_that(results.main, equal_to([1]), 'main') - def test_with_outputs_then_exception_handling_with_map(self): """with_outputs().with_exception_handling() also works on Map.""" with beam.Pipeline() as p: @@ -687,7 +558,6 @@ def test_with_outputs_then_exception_handling_with_map(self): assert_that(results.main, equal_to([1, 3, 5]), 'main') bad_elements = results.bad | beam.Keys() assert_that(bad_elements, equal_to([2, 4]), 'bad') - def test_with_output_types_chained_on_pardo(self): """When type hints are chained on the ParDo (not annotations on the DoFn), tagged output types should still be propagated through @@ -700,7 +570,6 @@ def process(self, element): yield beam.pvalue.TaggedOutput('threes', element) else: yield element - with beam.Pipeline() as p: results = ( p @@ -708,14 +577,12 @@ def process(self, element): | beam.ParDo(DoWithFailuresNoAnnotations()).with_output_types( int, threes=int).with_exception_handling().with_outputs( 'threes', main='main')) - assert_that(results.main, equal_to([1, 7]), 'main') assert_that(results.threes, equal_to([3]), 'threes') bad_elements = results.bad | beam.Keys() assert_that(bad_elements, equal_to([2]), 'bad') self.assertEqual(results.main.element_type, int) self.assertEqual(results.threes.element_type, int) - def test_with_outputs_and_error_handler(self): """with_outputs() + error_handler should return DoOutputsTuple, not a bare PCollection.""" @@ -728,32 +595,23 @@ def test_with_outputs_and_error_handler(self): | beam.ParDo(self._create_dofn_with_tagged_outputs()).with_outputs( 'threes', 'fives', main='main').with_exception_handling(error_handler=handler)) - assert_that(results.main, equal_to([1, 7]), 'main') assert_that(results.threes, equal_to([3]), 'threes') assert_that(results.fives, equal_to(['5']), 'fives') - - def test_callablewrapper_typehint(): T = TypeVar("T") - def identity(x: T) -> T: return x - dofn = beam.core.CallableWrapperDoFn(identity) assert dofn.get_type_hints().strip_iterable()[1][0][0] == typehints.Any - - class FlatMapTest(unittest.TestCase): def test_default(self): - with beam.Pipeline() as pipeline: letters = ( pipeline | beam.Create(['abc', 'def'], reshuffle=False) | beam.FlatMap()) assert_that(letters, equal_to(['a', 'b', 'c', 'd', 'e', 'f'])) - def test_default_identity_function_with_typehint(self): with beam.Pipeline() as pipeline: letters = ( @@ -761,19 +619,15 @@ def test_default_identity_function_with_typehint(self): | beam.Create([["abc"]], reshuffle=False) | beam.FlatMap() | beam.Map(lambda s: s.upper()).with_input_types(str)) - assert_that(letters, equal_to(["ABC"])) - def test_typecheck_with_default(self): with pytest.raises(TypeCheckError): with beam.Pipeline() as pipeline: _ = ( - pipeline +pipeline | beam.Create([[1, 2, 3]], reshuffle=False) | beam.FlatMap() | beam.Map(lambda s: s.upper()).with_input_types(str)) - - class CreateInferOutputSchemaTest(unittest.TestCase): def test_multiple_types_for_field(self): output_type = beam.Create([beam.Row(a=1), @@ -783,13 +637,11 @@ def test_multiple_types_for_field(self): row_type.RowTypeConstraint.from_fields([ ('a', typehints.Union[int, str]) ])) - def test_single_type_for_field(self): output_type = beam.Create([beam.Row(a=1), beam.Row(a=2)]).infer_output_type(None) self.assertEqual( output_type, row_type.RowTypeConstraint.from_fields([('a', int)])) - def test_optional_type_for_field(self): output_type = beam.Create([beam.Row(a=1), beam.Row(a=None)]).infer_output_type(None) @@ -797,13 +649,10 @@ def test_optional_type_for_field(self): output_type, row_type.RowTypeConstraint.from_fields([('a', typehints.Optional[int]) ])) - def test_none_type_for_field_raises_error(self): with self.assertRaisesRegex(TypeError, "('No types found for field %s', 'a')"): beam.Create([beam.Row(a=None), beam.Row(a=None)]).infer_output_type(None) - - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()