From 751a4622d900323af78f160099ceb1b0bd014791 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 12 Feb 2025 16:15:10 +0000 Subject: [PATCH 1/7] Update the dlq reposting tool to not require zocalo --- src/murfey/cli/dlq_resubmit.py | 194 +++++++++++++++++++++++++-------- 1 file changed, 146 insertions(+), 48 deletions(-) diff --git a/src/murfey/cli/dlq_resubmit.py b/src/murfey/cli/dlq_resubmit.py index 129f55fb0..9e87db9a4 100644 --- a/src/murfey/cli/dlq_resubmit.py +++ b/src/murfey/cli/dlq_resubmit.py @@ -1,14 +1,122 @@ import argparse import json -import subprocess +import os +import time +from datetime import datetime +from functools import partial from pathlib import Path +from queue import Empty, Queue import requests +from jose import jwt +from workflows.transport.pika_transport import PikaTransport +dlq_dump_path = Path("./DLQ") -def handle_failed_posts(json_folder: Path, token: str): + +def dlq_purge(queue: str, rabbitmq_credentials: Path) -> list[Path]: + transport = PikaTransport() + transport.load_configuration_file(rabbitmq_credentials) + transport.connect() + + queue_to_purge = "dlq." + queue + idlequeue: Queue = Queue() + exported_messages = [] + + def receive_dlq_message(header: dict, message: dict) -> None: + idlequeue.put_nowait("start") + header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"]) + + timestamp = time.localtime(int(header["x-death"][0]["time"])) + filepath = dlq_dump_path / time.strftime("%Y-%m-%d", timestamp) + filepath.mkdir(parents=True, exist_ok=True) + filename = filepath / ( + f"{queue}-" + + time.strftime("%Y%m%d-%H%M%S", timestamp) + + "-" + + str(header["message-id"]) + ) + + dlqmsg = { + "exported": { + "date": time.strftime("%Y-%m-%d"), + "time": time.strftime("%H:%M:%S"), + }, + "header": header, + "message": message, + } + + with filename.open("w") as fh: + json.dump(dlqmsg, fh, indent=2, sort_keys=True) + print(f"Message {header['message-id']} exported to {filename}") + exported_messages.append(filename) + transport.ack(header) + idlequeue.put_nowait("done") + + print("Looking for DLQ messages in " + queue_to_purge) + transport.subscribe( + queue_to_purge, + partial(receive_dlq_message), + acknowledgement=True, + ) + try: + idlequeue.get(True, 3) + while True: + idlequeue.get(True, 0.1) + except Empty: + print("Done.") + transport.disconnect() + return exported_messages + + +def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path): + transport = PikaTransport() + transport.load_configuration_file(rabbitmq_credentials) + transport.connect() + + for f, dlqfile in enumerate(messages_path): + if not Path(dlqfile).is_file(): + print(f"Ignoring missing file {dlqfile}") + continue + with open(dlqfile) as fh: + dlqmsg = json.load(fh) + print(f"Parsing message from {dlqfile}") + if ( + not isinstance(dlqmsg, dict) + or not dlqmsg.get("header") + or not dlqmsg.get("message") + ): + print(f"File {dlqfile} is not a valid DLQ message.") + continue + + header = dlqmsg["header"] + header["dlq-reinjected"] = "True" + + drop_keys = { + "message-id", + "routing_key", + "redelivered", + "exchange", + "consumer_tag", + "delivery_mode", + } + clean_header = {k: str(v) for k, v in header.items() if k not in drop_keys} + + destination = header.get("x-death", [{}])[0].get("queue") + transport.send( + destination, + dlqmsg["message"], + headers=clean_header, + ) + dlqfile.unlink() + print(f"Done {dlqfile}\n") + + transport.disconnect() + + +def handle_failed_posts(messages_path: list[Path], token: str): """Deal with any messages that have been sent as failed client posts""" - for json_file in json_folder.glob("*"): + for json_file in messages_path: with open(json_file, "r") as json_data: message = json.load(json_data) @@ -28,20 +136,6 @@ def handle_failed_posts(json_folder: Path, token: str): json_file.unlink() -def handle_dlq_messages(json_folder: Path): - """Reinjected to the queue""" - for json_file in json_folder.glob("*"): - reinject_result = subprocess.run( - ["zocalo.dlq_reinject", "-e", "devrmq", str(json_file)], - capture_output=True, - ) - if reinject_result.returncode == 0: - print(f"Reinjected {json_file}") - json_file.unlink() - else: - print(f"Failed to reinject {json_file}") - - def run(): """ Method of checking and purging murfey queues on rabbitmq @@ -53,43 +147,47 @@ def run(): description="Purge and reinject failed murfey messages" ) parser.add_argument( - "--queue", - help="Queue to check and purge", - required=True, + "-c", + "--config", + help="Security config file", + required=False, ) parser.add_argument( - "--token", - help="Murfey token", + "-u", + "--username", + help="Token username", required=True, ) args = parser.parse_args() - purge_result = subprocess.run( - ["zocalo.dlq_purge", "-e", "devrmq", args.queue], - capture_output=True, + # Set the environment variable then read it by importing the security config + os.environ["MURFEY_SECURITY_CONFIGURATION"] = args.config + from murfey.util.config import get_security_config + + security_config = get_security_config() + + # Get the token to post to the api with + token = jwt.encode( + {"user": args.username}, + security_config.auth_key, + algorithm=security_config.auth_algorithm, ) - if purge_result.returncode != 0: - print(f"Failed to purge {args.queue}") - return - purge_stdout = purge_result.stdout.decode("utf8") - export_directories = [] - if "exported" in purge_stdout: - for line in purge_stdout.split("\n"): - if line.strip().startswith("DLQ/"): - dlq_dir = "DLQ/" + line.split("/")[1] - if dlq_dir not in export_directories: - print(f"Found messages in {dlq_dir}") - export_directories.append(dlq_dir) - - if not export_directories: - print("No exported messages found") - return - - for json_dir in export_directories: - handle_failed_posts(Path(json_dir), args.token) - handle_dlq_messages(Path(json_dir)) - print("Done") + # Purge the queue and repost/reinject any messages found + exported_messages = dlq_purge( + security_config.feedback_queue, security_config.rabbitmq_credentials + ) + handle_failed_posts(exported_messages, token) + handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials) -if __name__ == "__main__": - run() + # Clean up any created directories + for date_directory in dlq_dump_path.glob("*"): + try: + date_directory.rmdir() + except OSError: + print(f"Cannot remove {date_directory} as it is not empty") + try: + dlq_dump_path.rmdir() + except OSError: + print(f"Cannot remove {dlq_dump_path} as it is not empty") + print("Done") From 9e35e3af3a4f9d735182f9ea7743bdce14b3b4bb Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 13 Feb 2025 09:13:06 +0000 Subject: [PATCH 2/7] Simplification is possible as don't need to store messages --- src/murfey/cli/dlq_resubmit.py | 60 +++++++++------------------------- 1 file changed, 16 insertions(+), 44 deletions(-) diff --git a/src/murfey/cli/dlq_resubmit.py b/src/murfey/cli/dlq_resubmit.py index 9e87db9a4..b95809458 100644 --- a/src/murfey/cli/dlq_resubmit.py +++ b/src/murfey/cli/dlq_resubmit.py @@ -1,7 +1,6 @@ import argparse import json import os -import time from datetime import datetime from functools import partial from pathlib import Path @@ -11,10 +10,10 @@ from jose import jwt from workflows.transport.pika_transport import PikaTransport -dlq_dump_path = Path("./DLQ") - -def dlq_purge(queue: str, rabbitmq_credentials: Path) -> list[Path]: +def dlq_purge( + dlq_dump_path: Path, queue: str, rabbitmq_credentials: Path +) -> list[Path]: transport = PikaTransport() transport.load_configuration_file(rabbitmq_credentials) transport.connect() @@ -26,26 +25,8 @@ def dlq_purge(queue: str, rabbitmq_credentials: Path) -> list[Path]: def receive_dlq_message(header: dict, message: dict) -> None: idlequeue.put_nowait("start") header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"]) - - timestamp = time.localtime(int(header["x-death"][0]["time"])) - filepath = dlq_dump_path / time.strftime("%Y-%m-%d", timestamp) - filepath.mkdir(parents=True, exist_ok=True) - filename = filepath / ( - f"{queue}-" - + time.strftime("%Y%m%d-%H%M%S", timestamp) - + "-" - + str(header["message-id"]) - ) - - dlqmsg = { - "exported": { - "date": time.strftime("%Y-%m-%d"), - "time": time.strftime("%H:%M:%S"), - }, - "header": header, - "message": message, - } - + filename = dlq_dump_path / (f"{queue}-" + str(header["message-id"])) + dlqmsg = {"header": header, "message": message} with filename.open("w") as fh: json.dump(dlqmsg, fh, indent=2, sort_keys=True) print(f"Message {header['message-id']} exported to {filename}") @@ -60,11 +41,10 @@ def receive_dlq_message(header: dict, message: dict) -> None: acknowledgement=True, ) try: - idlequeue.get(True, 3) while True: idlequeue.get(True, 0.1) except Empty: - print("Done.") + print("Done dlq purge") transport.disconnect() return exported_messages @@ -75,20 +55,10 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path): transport.connect() for f, dlqfile in enumerate(messages_path): - if not Path(dlqfile).is_file(): - print(f"Ignoring missing file {dlqfile}") + if not dlqfile.is_file(): continue with open(dlqfile) as fh: dlqmsg = json.load(fh) - print(f"Parsing message from {dlqfile}") - if ( - not isinstance(dlqmsg, dict) - or not dlqmsg.get("header") - or not dlqmsg.get("message") - ): - print(f"File {dlqfile} is not a valid DLQ message.") - continue - header = dlqmsg["header"] header["dlq-reinjected"] = "True" @@ -109,7 +79,7 @@ def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path): headers=clean_header, ) dlqfile.unlink() - print(f"Done {dlqfile}\n") + print(f"Reinjected {dlqfile}\n") transport.disconnect() @@ -158,6 +128,9 @@ def run(): help="Token username", required=True, ) + parser.add_argument( + "-d", "--dir", default="DLQ", help="Directory to export messages to" + ) args = parser.parse_args() # Set the environment variable then read it by importing the security config @@ -174,18 +147,17 @@ def run(): ) # Purge the queue and repost/reinject any messages found + dlq_dump_path = Path(args.dir) + dlq_dump_path.mkdir(parents=True, exist_ok=True) exported_messages = dlq_purge( - security_config.feedback_queue, security_config.rabbitmq_credentials + dlq_dump_path, + security_config.feedback_queue, + security_config.rabbitmq_credentials, ) handle_failed_posts(exported_messages, token) handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials) # Clean up any created directories - for date_directory in dlq_dump_path.glob("*"): - try: - date_directory.rmdir() - except OSError: - print(f"Cannot remove {date_directory} as it is not empty") try: dlq_dump_path.rmdir() except OSError: From 8d9978d2dbabab1f247178b5dfece47366e33a36 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 13 Feb 2025 09:31:00 +0000 Subject: [PATCH 3/7] Rename the tool --- pyproject.toml | 2 +- src/murfey/cli/{dlq_resubmit.py => repost_failed_calls.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/murfey/cli/{dlq_resubmit.py => repost_failed_calls.py} (100%) diff --git a/pyproject.toml b/pyproject.toml index 6a7f51750..ba02c3dbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,10 +86,10 @@ murfey = "murfey.client:run" "murfey.create_db" = "murfey.cli.create_db:run" "murfey.db_sql" = "murfey.cli.murfey_db_sql:run" "murfey.decrypt_password" = "murfey.cli.decrypt_db_password:run" -"murfey.dlq_murfey" = "murfey.cli.dlq_resubmit:run" "murfey.generate_key" = "murfey.cli.generate_crypto_key:run" "murfey.generate_password" = "murfey.cli.generate_db_password:run" "murfey.instrument_server" = "murfey.instrument_server:run" +"murfey.repost_failed_calls" = "murfey.cli.repost_failed_calls:run" "murfey.server" = "murfey.server:run" "murfey.sessions" = "murfey.cli.db_sessions:run" "murfey.simulate" = "murfey.cli.dummy:run" diff --git a/src/murfey/cli/dlq_resubmit.py b/src/murfey/cli/repost_failed_calls.py similarity index 100% rename from src/murfey/cli/dlq_resubmit.py rename to src/murfey/cli/repost_failed_calls.py From e1984a219fe35bf605769f1e1c38f2943210d2b3 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 13 Feb 2025 10:43:39 +0000 Subject: [PATCH 4/7] Should add some tests --- src/murfey/cli/repost_failed_calls.py | 2 +- tests/cli/test_repost_failed_calls.py | 225 ++++++++++++++++++++++++++ tests/conftest.py | 19 +++ 3 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 tests/cli/test_repost_failed_calls.py diff --git a/src/murfey/cli/repost_failed_calls.py b/src/murfey/cli/repost_failed_calls.py index b95809458..6859819b9 100644 --- a/src/murfey/cli/repost_failed_calls.py +++ b/src/murfey/cli/repost_failed_calls.py @@ -120,7 +120,7 @@ def run(): "-c", "--config", help="Security config file", - required=False, + required=True, ) parser.add_argument( "-u", diff --git a/tests/cli/test_repost_failed_calls.py b/tests/cli/test_repost_failed_calls.py new file mode 100644 index 000000000..d980ed094 --- /dev/null +++ b/tests/cli/test_repost_failed_calls.py @@ -0,0 +1,225 @@ +import json +import subprocess +import sys +from pathlib import Path +from queue import Empty +from unittest import mock + +from murfey.cli import repost_failed_calls +from tests.conftest import mock_security_config_name + + +@mock.patch("murfey.cli.repost_failed_calls.PikaTransport") +@mock.patch("murfey.cli.repost_failed_calls.Queue") +def test_dlq_purge(mock_queue, mock_transport, tmp_path): + """Test the dlq purging function. + Currently doesn't test saving the message, as the subscribe is mocked out""" + mock_queue().get.return_value = {"message": "dummy"} + mock_queue().get.side_effect = [None, Empty] + + exported_messages = repost_failed_calls.dlq_purge( + tmp_path / "DLQ", "dummy", tmp_path / "config_file" + ) + + # The transport should be connected to and subscribes to the queue + mock_transport.assert_called_once() + mock_transport().load_configuration_file.assert_called_with( + tmp_path / "config_file" + ) + mock_transport().connect.assert_called_once() + mock_transport().subscribe.assert_called_with( + "dlq.dummy", + mock.ANY, + acknowledgement=True, + ) + mock_transport().disconnect.assert_called_once() + + # Should read from the queue + mock_queue().get.assert_any_call(True, 0.1) + + # Ideally this test would return the message, but the partial isn't called yet + assert exported_messages == [] + + +@mock.patch("murfey.cli.repost_failed_calls.PikaTransport") +def test_handle_dlq_messages(mock_transport, tmp_path): + """Reinject some example messages""" + # Create two sample messages + messages_paths_list: list[Path] = [tmp_path / "not_a_message"] + messages_dict: dict[str, dict] = { + "msg1": { + "header": { + "x-death": [{"queue": "queue_msg1"}], + "message-id": 1, + "routing_key": "dlq.queue_msg1", + "redelivered": True, + "exchange": "", + "consumer_tag": "1", + "delivery_mode": 2, + "other_key": "value", + }, + "message": {"parameters": "msg1"}, + }, + "msg2": { + "header": {"x-death": [{"queue": "queue_msg2"}]}, + "message": {"content": "msg2"}, + }, + } + for message in messages_dict.keys(): + messages_paths_list.append(tmp_path / message) + with open(tmp_path / message, "w") as msg_file: + json.dump(messages_dict[message], msg_file) + + # Send the two messages, plus a file that is not a message + repost_failed_calls.handle_dlq_messages( + messages_path=messages_paths_list, + rabbitmq_credentials=tmp_path / "config_file", + ) + + mock_transport.assert_called_once() + mock_transport().load_configuration_file.assert_called_with( + tmp_path / "config_file" + ) + mock_transport().connect.assert_called_once() + + # Only two messages should have been sent, the rest are invalid so are skipped + assert mock_transport().send.call_count == 2 + mock_transport().send.assert_any_call( + "queue_msg1", + {"parameters": "msg1"}, + headers={ + "x-death": "[{'queue': 'queue_msg1'}]", + "other_key": "value", + "dlq-reinjected": "True", + }, + ) + mock_transport().send.assert_any_call( + "queue_msg2", + {"content": "msg2"}, + headers={"x-death": "[{'queue': 'queue_msg2'}]", "dlq-reinjected": "True"}, + ) + + # Removal and waiting + assert not (tmp_path / "msg1").is_file() + assert not (tmp_path / "msg2").is_file() + mock_transport().disconnect.assert_called_once() + + +@mock.patch("murfey.cli.repost_failed_calls.requests") +def test_handle_failed_posts(mock_requests, tmp_path): + """Test that the API is called with any failed client post messages""" + # Create some sample messages + messages_paths_list: list[Path] = [] + messages_dict: dict[str, dict] = { + "msg1": { + "message": {"url": "sample/url", "json": {"content": "msg1"}}, + }, + "msg2": { + "message": {"url": "sample/url", "json": {"content": "msg2"}}, + }, + "msg3": { + "message": {"content": "msg3"}, # not a failed client post + }, + "msg4": { + "header": {"content": "msg3"}, # does not have a message + }, + } + for message in messages_dict.keys(): + messages_paths_list.append(tmp_path / message) + with open(tmp_path / message, "w") as msg_file: + json.dump(messages_dict[message], msg_file) + + class Response: + def __init__(self, status_code): + self.status_code = status_code + + mock_requests.post.side_effect = [Response(200), Response(300)] + + repost_failed_calls.handle_failed_posts(messages_paths_list, "dummy_token") + + # Check the failed posts were resent + assert mock_requests.post.call_count == 2 + mock_requests.post.assert_any_call( + "sample/url", + json={"content": "msg1"}, + headers={"Authorization": "Bearer dummy_token"}, + ) + mock_requests.post.assert_any_call( + "sample/url", + json={"content": "msg2"}, + headers={"Authorization": "Bearer dummy_token"}, + ) + + # Check only the failed post which was successfully reinjected got deleted + assert not (tmp_path / "msg1").is_file() # got resent + assert (tmp_path / "msg2").is_file() # failed reinjection + assert (tmp_path / "msg3").is_file() # not a failed client post + assert (tmp_path / "msg4").is_file() # does not have a message + + +@mock.patch("murfey.cli.repost_failed_calls.dlq_purge") +@mock.patch("murfey.cli.repost_failed_calls.handle_failed_posts") +@mock.patch("murfey.cli.repost_failed_calls.handle_dlq_messages") +@mock.patch("murfey.cli.repost_failed_calls.jwt") +def test_run_repost_failed_calls( + mock_jwt, + mock_reinject, + mock_repost, + mock_purge, + mock_security_configuration, + tmp_path, +): + mock_jwt.encode.return_value = "dummy_token" + mock_purge.return_value = ["/path/to/msg1"] + + config_file = tmp_path / mock_security_config_name + with open(config_file) as f: + security_config = json.load(f) + + sys.argv = [ + "murfey.repost_failed_calls", + "--config", + str(config_file), + "--username", + "user", + "--dir", + "DLQ_dir", + ] + repost_failed_calls.run() + + mock_jwt.encode.assert_called_with( + {"user": "user"}, + security_config["auth_key"], + algorithm=security_config["auth_algorithm"], + ) + + mock_purge.assert_called_once_with( + Path("DLQ_dir"), + "murfey_feedback", + Path(security_config["rabbitmq_credentials"]), + ) + mock_repost.assert_called_once_with(["/path/to/msg1"], "dummy_token") + mock_reinject.assert_called_once_with( + ["/path/to/msg1"], Path(security_config["rabbitmq_credentials"]) + ) + + +def test_repost_failed_calls_exists(): + """Test the CLI is made""" + result = subprocess.run( + [ + "murfey.repost_failed_calls", + "--help", + ], + capture_output=True, + ) + assert not result.returncode + + # Find the first line of the help and strip out all the spaces and newlines + stdout_as_string = result.stdout.decode("utf8", "replace") + cleaned_help_line = ( + stdout_as_string.split("\n\n")[0].replace("\n", "").replace(" ", "") + ) + assert cleaned_help_line == ( + "usage:murfey.repost_failed_calls[-h]-cCONFIG-uUSERNAME[-dDIR]" + ) diff --git a/tests/conftest.py b/tests/conftest.py index 1b87c0217..6672188ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,5 @@ +import json + import pytest from sqlmodel import Session @@ -5,6 +7,8 @@ from murfey.util.db import clear, setup from tests import engine, url +mock_security_config_name = "security_config.yaml" + @pytest.fixture def start_postgres(): @@ -15,3 +19,18 @@ def start_postgres(): with Session(engine) as murfey_db: murfey_db.add(murfey_session) murfey_db.commit() + + +@pytest.fixture() +def mock_security_configuration(tmp_path): + config_file = tmp_path / mock_security_config_name + security_config = { + "auth_key": "auth_key", + "auth_algorithm": "auth_algorithm", + "feedback_queue": "murfey_feedback", + "rabbitmq_credentials": "/path/to/rabbitmq.yaml", + "murfey_db_credentials": "/path/to/murfey_db_credentials", + "crypto_key": "crypto_key", + } + with open(config_file, "w") as f: + json.dump(security_config, f) From a758da603010758f1c9bbe2791e1a450f0a9a0f5 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 13 Feb 2025 11:04:40 +0000 Subject: [PATCH 5/7] Try setting env in conftest --- tests/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 6672188ba..631329a98 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import json +import os import pytest from sqlmodel import Session @@ -34,3 +35,4 @@ def mock_security_configuration(tmp_path): } with open(config_file, "w") as f: json.dump(security_config, f) + os.environ["MURFEY_SECURITY_CONFIGURATION"] = str(config_file) From c9d3babd3565cc542a7d17ee2f4a3be3c236fb49 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 13 Feb 2025 12:05:57 +0000 Subject: [PATCH 6/7] Better security config reading --- src/murfey/cli/repost_failed_calls.py | 10 ++++------ tests/conftest.py | 2 -- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/murfey/cli/repost_failed_calls.py b/src/murfey/cli/repost_failed_calls.py index 6859819b9..02b38fdb9 100644 --- a/src/murfey/cli/repost_failed_calls.py +++ b/src/murfey/cli/repost_failed_calls.py @@ -1,6 +1,5 @@ import argparse import json -import os from datetime import datetime from functools import partial from pathlib import Path @@ -10,6 +9,8 @@ from jose import jwt from workflows.transport.pika_transport import PikaTransport +from murfey.util.config import security_from_file + def dlq_purge( dlq_dump_path: Path, queue: str, rabbitmq_credentials: Path @@ -133,11 +134,8 @@ def run(): ) args = parser.parse_args() - # Set the environment variable then read it by importing the security config - os.environ["MURFEY_SECURITY_CONFIGURATION"] = args.config - from murfey.util.config import get_security_config - - security_config = get_security_config() + # Read the security config file + security_config = security_from_file(args.config) # Get the token to post to the api with token = jwt.encode( diff --git a/tests/conftest.py b/tests/conftest.py index 631329a98..6672188ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,4 @@ import json -import os import pytest from sqlmodel import Session @@ -35,4 +34,3 @@ def mock_security_configuration(tmp_path): } with open(config_file, "w") as f: json.dump(security_config, f) - os.environ["MURFEY_SECURITY_CONFIGURATION"] = str(config_file) From b4eab67922f00d81b4bb68f4e253cce02372deee Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 13 Feb 2025 16:04:06 +0000 Subject: [PATCH 7/7] Small cleanups --- src/murfey/cli/repost_failed_calls.py | 4 ++-- tests/cli/test_repost_failed_calls.py | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/murfey/cli/repost_failed_calls.py b/src/murfey/cli/repost_failed_calls.py index 02b38fdb9..c8a3f46b7 100644 --- a/src/murfey/cli/repost_failed_calls.py +++ b/src/murfey/cli/repost_failed_calls.py @@ -19,14 +19,14 @@ def dlq_purge( transport.load_configuration_file(rabbitmq_credentials) transport.connect() - queue_to_purge = "dlq." + queue + queue_to_purge = f"dlq.{queue}" idlequeue: Queue = Queue() exported_messages = [] def receive_dlq_message(header: dict, message: dict) -> None: idlequeue.put_nowait("start") header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"]) - filename = dlq_dump_path / (f"{queue}-" + str(header["message-id"])) + filename = dlq_dump_path / f"{queue}-{header['message-id']}" dlqmsg = {"header": header, "message": message} with filename.open("w") as fh: json.dump(dlqmsg, fh, indent=2, sort_keys=True) diff --git a/tests/cli/test_repost_failed_calls.py b/tests/cli/test_repost_failed_calls.py index d980ed094..9a018fb4f 100644 --- a/tests/cli/test_repost_failed_calls.py +++ b/tests/cli/test_repost_failed_calls.py @@ -65,10 +65,10 @@ def test_handle_dlq_messages(mock_transport, tmp_path): "message": {"content": "msg2"}, }, } - for message in messages_dict.keys(): - messages_paths_list.append(tmp_path / message) - with open(tmp_path / message, "w") as msg_file: - json.dump(messages_dict[message], msg_file) + for file_name, message in messages_dict.items(): + messages_paths_list.append(tmp_path / file_name) + with open(tmp_path / file_name, "w") as msg_file: + json.dump(message, msg_file) # Send the two messages, plus a file that is not a message repost_failed_calls.handle_dlq_messages( @@ -124,10 +124,10 @@ def test_handle_failed_posts(mock_requests, tmp_path): "header": {"content": "msg3"}, # does not have a message }, } - for message in messages_dict.keys(): - messages_paths_list.append(tmp_path / message) - with open(tmp_path / message, "w") as msg_file: - json.dump(messages_dict[message], msg_file) + for file_name, message in messages_dict.items(): + messages_paths_list.append(tmp_path / file_name) + with open(tmp_path / file_name, "w") as msg_file: + json.dump(message, msg_file) class Response: def __init__(self, status_code):