Skip to content

Commit e615343

Browse files
committed
PR feedback
1 parent 71cfe09 commit e615343

File tree

2 files changed

+121
-91
lines changed

2 files changed

+121
-91
lines changed

docs/features.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,14 @@ modify behavior on an existing logger.
163163
```python
164164
import logging
165165

166+
from durabletask import task
167+
166168
logger = logging.getLogger("my_orchestrator")
167169

168-
def my_orchestrator(ctx: task.OrchestrationContext, input):
170+
def my_orchestrator(ctx: task.OrchestrationContext, payload):
169171
replay_logger = ctx.create_replay_safe_logger(logger)
170172
replay_logger.info("Starting orchestration %s", ctx.instance_id)
171-
result = yield ctx.call_activity(my_activity, input=input)
173+
result = yield ctx.call_activity(my_activity, input=payload)
172174
replay_logger.info("Activity returned: %s", result)
173175
return result
174176
```

tests/durabletask/test_orchestration_executor.py

Lines changed: 117 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,54 +1509,61 @@ class _RecordingHandler(logging.Handler):
15091509
def emit(self, record: logging.LogRecord) -> None:
15101510
log_calls.append(record.getMessage())
15111511

1512+
handler = _RecordingHandler()
15121513
inner_logger = logging.getLogger("test_replay_safe_logger")
15131514
inner_logger.setLevel(logging.DEBUG)
1514-
inner_logger.addHandler(_RecordingHandler())
1515-
1516-
activity_name = "say_hello"
1517-
1518-
def say_hello(_, name: str) -> str:
1519-
return f"Hello, {name}!"
1520-
1521-
def orchestrator(ctx: task.OrchestrationContext, _):
1522-
replay_logger = ctx.create_replay_safe_logger(inner_logger)
1523-
replay_logger.info("Starting orchestration")
1524-
result = yield ctx.call_activity(say_hello, input="World")
1525-
replay_logger.info("Activity completed: %s", result)
1526-
return result
1527-
1528-
registry = worker._Registry()
1529-
activity_name = registry.add_activity(say_hello)
1530-
orchestrator_name = registry.add_orchestrator(orchestrator)
1531-
1532-
# First execution: starts the orchestration. The orchestrator runs without
1533-
# replay, so both log calls should be emitted.
1534-
new_events = [
1535-
helpers.new_orchestrator_started_event(datetime.now()),
1536-
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
1537-
]
1538-
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1539-
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1540-
assert result.actions # should have scheduled the activity
1541-
1542-
assert log_calls == ["Starting orchestration"]
1543-
log_calls.clear()
1544-
1545-
# Second execution: the orchestrator replays from history and then processes the
1546-
# activity completion. The "Starting orchestration" message is emitted during
1547-
# replay and should be suppressed; "Activity completed" is emitted after replay
1548-
# ends and should appear exactly once.
1549-
old_events = new_events + [
1550-
helpers.new_task_scheduled_event(1, activity_name),
1551-
]
1552-
encoded_output = json.dumps(say_hello(None, "World"))
1553-
new_events = [helpers.new_task_completed_event(1, encoded_output)]
1554-
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1555-
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
1556-
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
1557-
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
1558-
1559-
assert log_calls == ["Activity completed: Hello, World!"]
1515+
original_propagate = inner_logger.propagate
1516+
inner_logger.propagate = False
1517+
inner_logger.addHandler(handler)
1518+
1519+
try:
1520+
activity_name = "say_hello"
1521+
1522+
def say_hello(_, name: str) -> str:
1523+
return f"Hello, {name}!"
1524+
1525+
def orchestrator(ctx: task.OrchestrationContext, _):
1526+
replay_logger = ctx.create_replay_safe_logger(inner_logger)
1527+
replay_logger.info("Starting orchestration")
1528+
result = yield ctx.call_activity(say_hello, input="World")
1529+
replay_logger.info("Activity completed: %s", result)
1530+
return result
1531+
1532+
registry = worker._Registry()
1533+
activity_name = registry.add_activity(say_hello)
1534+
orchestrator_name = registry.add_orchestrator(orchestrator)
1535+
1536+
# First execution: starts the orchestration. The orchestrator runs without
1537+
# replay, emits the initial log message, and then schedules the activity.
1538+
new_events = [
1539+
helpers.new_orchestrator_started_event(datetime.now()),
1540+
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
1541+
]
1542+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1543+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1544+
assert result.actions # should have scheduled the activity
1545+
1546+
assert log_calls == ["Starting orchestration"]
1547+
log_calls.clear()
1548+
1549+
# Second execution: the orchestrator replays from history and then processes the
1550+
# activity completion. The "Starting orchestration" message is emitted during
1551+
# replay and should be suppressed; "Activity completed" is emitted after replay
1552+
# ends and should appear exactly once.
1553+
old_events = new_events + [
1554+
helpers.new_task_scheduled_event(1, activity_name),
1555+
]
1556+
encoded_output = json.dumps(say_hello(None, "World"))
1557+
new_events = [helpers.new_task_completed_event(1, encoded_output)]
1558+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1559+
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
1560+
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
1561+
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
1562+
1563+
assert log_calls == ["Activity completed: Hello, World!"]
1564+
finally:
1565+
inner_logger.removeHandler(handler)
1566+
inner_logger.propagate = original_propagate
15601567

15611568

15621569
def test_replay_safe_logger_all_levels():
@@ -1567,32 +1574,39 @@ class _LevelRecorder(logging.Handler):
15671574
def emit(self, record: logging.LogRecord) -> None:
15681575
log_levels.append(record.levelname)
15691576

1577+
handler = _LevelRecorder()
15701578
inner_logger = logging.getLogger("test_replay_safe_logger_levels")
15711579
inner_logger.setLevel(logging.DEBUG)
1572-
inner_logger.addHandler(_LevelRecorder())
1573-
1574-
def orchestrator(ctx: task.OrchestrationContext, _):
1575-
replay_logger = ctx.create_replay_safe_logger(inner_logger)
1576-
replay_logger.debug("debug msg")
1577-
replay_logger.info("info msg")
1578-
replay_logger.warning("warning msg")
1579-
replay_logger.error("error msg")
1580-
replay_logger.critical("critical msg")
1581-
return "done"
1582-
1583-
registry = worker._Registry()
1584-
orchestrator_name = registry.add_orchestrator(orchestrator)
1585-
1586-
new_events = [
1587-
helpers.new_orchestrator_started_event(datetime.now()),
1588-
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
1589-
]
1590-
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1591-
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1592-
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
1593-
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
1594-
1595-
assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
1580+
original_propagate = inner_logger.propagate
1581+
inner_logger.propagate = False
1582+
inner_logger.addHandler(handler)
1583+
1584+
try:
1585+
def orchestrator(ctx: task.OrchestrationContext, _):
1586+
replay_logger = ctx.create_replay_safe_logger(inner_logger)
1587+
replay_logger.debug("debug msg")
1588+
replay_logger.info("info msg")
1589+
replay_logger.warning("warning msg")
1590+
replay_logger.error("error msg")
1591+
replay_logger.critical("critical msg")
1592+
return "done"
1593+
1594+
registry = worker._Registry()
1595+
orchestrator_name = registry.add_orchestrator(orchestrator)
1596+
1597+
new_events = [
1598+
helpers.new_orchestrator_started_event(datetime.now()),
1599+
helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None),
1600+
]
1601+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1602+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1603+
complete_action = get_and_validate_complete_orchestration_action_list(1, result.actions)
1604+
assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED
1605+
1606+
assert log_levels == ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
1607+
finally:
1608+
inner_logger.removeHandler(handler)
1609+
inner_logger.propagate = original_propagate
15961610

15971611

15981612
def test_replay_safe_logger_direct():
@@ -1603,19 +1617,26 @@ class _RecordingHandler(logging.Handler):
16031617
def emit(self, record: logging.LogRecord) -> None:
16041618
log_calls.append(record.getMessage())
16051619

1620+
handler = _RecordingHandler()
16061621
inner_logger = logging.getLogger("test_replay_safe_logger_direct")
16071622
inner_logger.setLevel(logging.DEBUG)
1608-
inner_logger.addHandler(_RecordingHandler())
1623+
original_propagate = inner_logger.propagate
1624+
inner_logger.propagate = False
1625+
inner_logger.addHandler(handler)
16091626

1610-
replaying = True
1611-
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
1627+
try:
1628+
replaying = True
1629+
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
16121630

1613-
replay_logger.info("should be suppressed")
1614-
assert log_calls == []
1631+
replay_logger.info("should be suppressed")
1632+
assert log_calls == []
16151633

1616-
replaying = False
1617-
replay_logger.info("should appear")
1618-
assert log_calls == ["should appear"]
1634+
replaying = False
1635+
replay_logger.info("should appear")
1636+
assert log_calls == ["should appear"]
1637+
finally:
1638+
inner_logger.removeHandler(handler)
1639+
inner_logger.propagate = original_propagate
16191640

16201641

16211642
def test_replay_safe_logger_log_method():
@@ -1626,19 +1647,26 @@ class _RecordingHandler(logging.Handler):
16261647
def emit(self, record: logging.LogRecord) -> None:
16271648
log_calls.append(record.getMessage())
16281649

1650+
handler = _RecordingHandler()
16291651
inner_logger = logging.getLogger("test_replay_safe_logger_log_method")
16301652
inner_logger.setLevel(logging.DEBUG)
1631-
inner_logger.addHandler(_RecordingHandler())
1632-
1633-
replaying = True
1634-
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
1635-
1636-
replay_logger.log(logging.WARNING, "suppressed warning")
1637-
assert log_calls == []
1638-
1639-
replaying = False
1640-
replay_logger.log(logging.WARNING, "visible warning")
1641-
assert log_calls == ["visible warning"]
1653+
original_propagate = inner_logger.propagate
1654+
inner_logger.propagate = False
1655+
inner_logger.addHandler(handler)
1656+
1657+
try:
1658+
replaying = True
1659+
replay_logger = task.ReplaySafeLogger(inner_logger, lambda: replaying)
1660+
1661+
replay_logger.log(logging.WARNING, "suppressed warning")
1662+
assert log_calls == []
1663+
1664+
replaying = False
1665+
replay_logger.log(logging.WARNING, "visible warning")
1666+
assert log_calls == ["visible warning"]
1667+
finally:
1668+
inner_logger.removeHandler(handler)
1669+
inner_logger.propagate = original_propagate
16421670

16431671

16441672
def test_replay_safe_logger_is_enabled_for():

0 commit comments

Comments
 (0)