diff --git a/pyproject.toml b/pyproject.toml index 6a7f51750..ba02c3dbe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -86,10 +86,10 @@ murfey = "murfey.client:run" "murfey.create_db" = "murfey.cli.create_db:run" "murfey.db_sql" = "murfey.cli.murfey_db_sql:run" "murfey.decrypt_password" = "murfey.cli.decrypt_db_password:run" -"murfey.dlq_murfey" = "murfey.cli.dlq_resubmit:run" "murfey.generate_key" = "murfey.cli.generate_crypto_key:run" "murfey.generate_password" = "murfey.cli.generate_db_password:run" "murfey.instrument_server" = "murfey.instrument_server:run" +"murfey.repost_failed_calls" = "murfey.cli.repost_failed_calls:run" "murfey.server" = "murfey.server:run" "murfey.sessions" = "murfey.cli.db_sessions:run" "murfey.simulate" = "murfey.cli.dummy:run" diff --git a/src/murfey/cli/dlq_resubmit.py b/src/murfey/cli/dlq_resubmit.py deleted file mode 100644 index 129f55fb0..000000000 --- a/src/murfey/cli/dlq_resubmit.py +++ /dev/null @@ -1,95 +0,0 @@ -import argparse -import json -import subprocess -from pathlib import Path - -import requests - - -def handle_failed_posts(json_folder: Path, token: str): - """Deal with any messages that have been sent as failed client posts""" - for json_file in json_folder.glob("*"): - with open(json_file, "r") as json_data: - message = json.load(json_data) - - if not message.get("message") or not message["message"].get("url"): - print(f"{json_file} is not a failed client post") - continue - dest = message["message"]["url"] - message_json = message["message"]["json"] - - response = requests.post( - dest, json=message_json, headers={"Authorization": f"Bearer {token}"} - ) - if response.status_code != 200: - print(f"Failed to repost {json_file}") - else: - print(f"Reposted {json_file}") - json_file.unlink() - - -def handle_dlq_messages(json_folder: Path): - """Reinjected to the queue""" - for json_file in json_folder.glob("*"): - reinject_result = subprocess.run( - ["zocalo.dlq_reinject", "-e", "devrmq", str(json_file)], - 
capture_output=True, - ) - if reinject_result.returncode == 0: - print(f"Reinjected {json_file}") - json_file.unlink() - else: - print(f"Failed to reinject {json_file}") - - -def run(): - """ - Method of checking and purging murfey queues on rabbitmq - Two types of messages are possible: - - failed client posts which need reposting to the murfey server API - - feedback messages that can be sent back to rabbitmq - """ - parser = argparse.ArgumentParser( - description="Purge and reinject failed murfey messages" - ) - parser.add_argument( - "--queue", - help="Queue to check and purge", - required=True, - ) - parser.add_argument( - "--token", - help="Murfey token", - required=True, - ) - args = parser.parse_args() - - purge_result = subprocess.run( - ["zocalo.dlq_purge", "-e", "devrmq", args.queue], - capture_output=True, - ) - if purge_result.returncode != 0: - print(f"Failed to purge {args.queue}") - return - purge_stdout = purge_result.stdout.decode("utf8") - export_directories = [] - if "exported" in purge_stdout: - for line in purge_stdout.split("\n"): - if line.strip().startswith("DLQ/"): - dlq_dir = "DLQ/" + line.split("/")[1] - if dlq_dir not in export_directories: - print(f"Found messages in {dlq_dir}") - export_directories.append(dlq_dir) - - if not export_directories: - print("No exported messages found") - return - - for json_dir in export_directories: - handle_failed_posts(Path(json_dir), args.token) - handle_dlq_messages(Path(json_dir)) - print("Done") - - -if __name__ == "__main__": - run() diff --git a/src/murfey/cli/repost_failed_calls.py b/src/murfey/cli/repost_failed_calls.py new file mode 100644 index 000000000..c8a3f46b7 --- /dev/null +++ b/src/murfey/cli/repost_failed_calls.py @@ -0,0 +1,163 @@ +import argparse +import json +from datetime import datetime +from functools import partial +from pathlib import Path +from queue import Empty, Queue + +import requests +from jose import jwt +from workflows.transport.pika_transport import PikaTransport + 
+from murfey.util.config import security_from_file
+
+
+def dlq_purge(
+    dlq_dump_path: Path, queue: str, rabbitmq_credentials: Path
+) -> list[Path]:
+    transport = PikaTransport()
+    transport.load_configuration_file(rabbitmq_credentials)
+    transport.connect()
+
+    queue_to_purge = f"dlq.{queue}"
+    idlequeue: Queue = Queue()
+    exported_messages = []
+
+    def receive_dlq_message(header: dict, message: dict) -> None:
+        idlequeue.put_nowait("start")
+        header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"])
+        filename = dlq_dump_path / f"{queue}-{header['message-id']}"
+        dlqmsg = {"header": header, "message": message}
+        with filename.open("w") as fh:
+            json.dump(dlqmsg, fh, indent=2, sort_keys=True)
+        print(f"Message {header['message-id']} exported to {filename}")
+        exported_messages.append(filename)
+        transport.ack(header)
+        idlequeue.put_nowait("done")
+
+    print("Looking for DLQ messages in " + queue_to_purge)
+    transport.subscribe(
+        queue_to_purge,
+        partial(receive_dlq_message),
+        acknowledgement=True,
+    )
+    try:
+        while True:
+            idlequeue.get(True, 0.1)
+    except Empty:
+        print("Done dlq purge")
+    transport.disconnect()
+    return exported_messages
+
+
+def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path):
+    transport = PikaTransport()
+    transport.load_configuration_file(rabbitmq_credentials)
+    transport.connect()
+
+    for f, dlqfile in enumerate(messages_path):
+        if not dlqfile.is_file():
+            continue
+        with open(dlqfile) as fh:
+            dlqmsg = json.load(fh)
+        header = dlqmsg["header"]
+        header["dlq-reinjected"] = "True"
+
+        drop_keys = {
+            "message-id",
+            "routing_key",
+            "redelivered",
+            "exchange",
+            "consumer_tag",
+            "delivery_mode",
+        }
+        clean_header = {k: str(v) for k, v in header.items() if k not in drop_keys}
+
+        destination = header.get("x-death", [{}])[0].get("queue")
+        transport.send(
+            destination,
+            dlqmsg["message"],
+            headers=clean_header,
+        )
+        dlqfile.unlink()
+        print(f"Reinjected {dlqfile}\n")
+
transport.disconnect() + + +def handle_failed_posts(messages_path: list[Path], token: str): + """Deal with any messages that have been sent as failed client posts""" + for json_file in messages_path: + with open(json_file, "r") as json_data: + message = json.load(json_data) + + if not message.get("message") or not message["message"].get("url"): + print(f"{json_file} is not a failed client post") + continue + dest = message["message"]["url"] + message_json = message["message"]["json"] + + response = requests.post( + dest, json=message_json, headers={"Authorization": f"Bearer {token}"} + ) + if response.status_code != 200: + print(f"Failed to repost {json_file}") + else: + print(f"Reposted {json_file}") + json_file.unlink() + + +def run(): + """ + Method of checking and purging murfey queues on rabbitmq + Two types of messages are possible: + - failed client posts which need reposting to the murfey server API + - feedback messages that can be sent back to rabbitmq + """ + parser = argparse.ArgumentParser( + description="Purge and reinject failed murfey messages" + ) + parser.add_argument( + "-c", + "--config", + help="Security config file", + required=True, + ) + parser.add_argument( + "-u", + "--username", + help="Token username", + required=True, + ) + parser.add_argument( + "-d", "--dir", default="DLQ", help="Directory to export messages to" + ) + args = parser.parse_args() + + # Read the security config file + security_config = security_from_file(args.config) + + # Get the token to post to the api with + token = jwt.encode( + {"user": args.username}, + security_config.auth_key, + algorithm=security_config.auth_algorithm, + ) + + # Purge the queue and repost/reinject any messages found + dlq_dump_path = Path(args.dir) + dlq_dump_path.mkdir(parents=True, exist_ok=True) + exported_messages = dlq_purge( + dlq_dump_path, + security_config.feedback_queue, + security_config.rabbitmq_credentials, + ) + handle_failed_posts(exported_messages, token) + 
handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials) + + # Clean up any created directories + try: + dlq_dump_path.rmdir() + except OSError: + print(f"Cannot remove {dlq_dump_path} as it is not empty") + print("Done") diff --git a/tests/cli/test_repost_failed_calls.py b/tests/cli/test_repost_failed_calls.py new file mode 100644 index 000000000..9a018fb4f --- /dev/null +++ b/tests/cli/test_repost_failed_calls.py @@ -0,0 +1,225 @@ +import json +import subprocess +import sys +from pathlib import Path +from queue import Empty +from unittest import mock + +from murfey.cli import repost_failed_calls +from tests.conftest import mock_security_config_name + + +@mock.patch("murfey.cli.repost_failed_calls.PikaTransport") +@mock.patch("murfey.cli.repost_failed_calls.Queue") +def test_dlq_purge(mock_queue, mock_transport, tmp_path): + """Test the dlq purging function. + Currently doesn't test saving the message, as the subscribe is mocked out""" + mock_queue().get.return_value = {"message": "dummy"} + mock_queue().get.side_effect = [None, Empty] + + exported_messages = repost_failed_calls.dlq_purge( + tmp_path / "DLQ", "dummy", tmp_path / "config_file" + ) + + # The transport should be connected to and subscribes to the queue + mock_transport.assert_called_once() + mock_transport().load_configuration_file.assert_called_with( + tmp_path / "config_file" + ) + mock_transport().connect.assert_called_once() + mock_transport().subscribe.assert_called_with( + "dlq.dummy", + mock.ANY, + acknowledgement=True, + ) + mock_transport().disconnect.assert_called_once() + + # Should read from the queue + mock_queue().get.assert_any_call(True, 0.1) + + # Ideally this test would return the message, but the partial isn't called yet + assert exported_messages == [] + + +@mock.patch("murfey.cli.repost_failed_calls.PikaTransport") +def test_handle_dlq_messages(mock_transport, tmp_path): + """Reinject some example messages""" + # Create two sample messages + 
messages_paths_list: list[Path] = [tmp_path / "not_a_message"] + messages_dict: dict[str, dict] = { + "msg1": { + "header": { + "x-death": [{"queue": "queue_msg1"}], + "message-id": 1, + "routing_key": "dlq.queue_msg1", + "redelivered": True, + "exchange": "", + "consumer_tag": "1", + "delivery_mode": 2, + "other_key": "value", + }, + "message": {"parameters": "msg1"}, + }, + "msg2": { + "header": {"x-death": [{"queue": "queue_msg2"}]}, + "message": {"content": "msg2"}, + }, + } + for file_name, message in messages_dict.items(): + messages_paths_list.append(tmp_path / file_name) + with open(tmp_path / file_name, "w") as msg_file: + json.dump(message, msg_file) + + # Send the two messages, plus a file that is not a message + repost_failed_calls.handle_dlq_messages( + messages_path=messages_paths_list, + rabbitmq_credentials=tmp_path / "config_file", + ) + + mock_transport.assert_called_once() + mock_transport().load_configuration_file.assert_called_with( + tmp_path / "config_file" + ) + mock_transport().connect.assert_called_once() + + # Only two messages should have been sent, the rest are invalid so are skipped + assert mock_transport().send.call_count == 2 + mock_transport().send.assert_any_call( + "queue_msg1", + {"parameters": "msg1"}, + headers={ + "x-death": "[{'queue': 'queue_msg1'}]", + "other_key": "value", + "dlq-reinjected": "True", + }, + ) + mock_transport().send.assert_any_call( + "queue_msg2", + {"content": "msg2"}, + headers={"x-death": "[{'queue': 'queue_msg2'}]", "dlq-reinjected": "True"}, + ) + + # Removal and waiting + assert not (tmp_path / "msg1").is_file() + assert not (tmp_path / "msg2").is_file() + mock_transport().disconnect.assert_called_once() + + +@mock.patch("murfey.cli.repost_failed_calls.requests") +def test_handle_failed_posts(mock_requests, tmp_path): + """Test that the API is called with any failed client post messages""" + # Create some sample messages + messages_paths_list: list[Path] = [] + messages_dict: dict[str, dict] = { + 
"msg1": { + "message": {"url": "sample/url", "json": {"content": "msg1"}}, + }, + "msg2": { + "message": {"url": "sample/url", "json": {"content": "msg2"}}, + }, + "msg3": { + "message": {"content": "msg3"}, # not a failed client post + }, + "msg4": { + "header": {"content": "msg3"}, # does not have a message + }, + } + for file_name, message in messages_dict.items(): + messages_paths_list.append(tmp_path / file_name) + with open(tmp_path / file_name, "w") as msg_file: + json.dump(message, msg_file) + + class Response: + def __init__(self, status_code): + self.status_code = status_code + + mock_requests.post.side_effect = [Response(200), Response(300)] + + repost_failed_calls.handle_failed_posts(messages_paths_list, "dummy_token") + + # Check the failed posts were resent + assert mock_requests.post.call_count == 2 + mock_requests.post.assert_any_call( + "sample/url", + json={"content": "msg1"}, + headers={"Authorization": "Bearer dummy_token"}, + ) + mock_requests.post.assert_any_call( + "sample/url", + json={"content": "msg2"}, + headers={"Authorization": "Bearer dummy_token"}, + ) + + # Check only the failed post which was successfully reinjected got deleted + assert not (tmp_path / "msg1").is_file() # got resent + assert (tmp_path / "msg2").is_file() # failed reinjection + assert (tmp_path / "msg3").is_file() # not a failed client post + assert (tmp_path / "msg4").is_file() # does not have a message + + +@mock.patch("murfey.cli.repost_failed_calls.dlq_purge") +@mock.patch("murfey.cli.repost_failed_calls.handle_failed_posts") +@mock.patch("murfey.cli.repost_failed_calls.handle_dlq_messages") +@mock.patch("murfey.cli.repost_failed_calls.jwt") +def test_run_repost_failed_calls( + mock_jwt, + mock_reinject, + mock_repost, + mock_purge, + mock_security_configuration, + tmp_path, +): + mock_jwt.encode.return_value = "dummy_token" + mock_purge.return_value = ["/path/to/msg1"] + + config_file = tmp_path / mock_security_config_name + with open(config_file) as f: + 
security_config = json.load(f) + + sys.argv = [ + "murfey.repost_failed_calls", + "--config", + str(config_file), + "--username", + "user", + "--dir", + "DLQ_dir", + ] + repost_failed_calls.run() + + mock_jwt.encode.assert_called_with( + {"user": "user"}, + security_config["auth_key"], + algorithm=security_config["auth_algorithm"], + ) + + mock_purge.assert_called_once_with( + Path("DLQ_dir"), + "murfey_feedback", + Path(security_config["rabbitmq_credentials"]), + ) + mock_repost.assert_called_once_with(["/path/to/msg1"], "dummy_token") + mock_reinject.assert_called_once_with( + ["/path/to/msg1"], Path(security_config["rabbitmq_credentials"]) + ) + + +def test_repost_failed_calls_exists(): + """Test the CLI is made""" + result = subprocess.run( + [ + "murfey.repost_failed_calls", + "--help", + ], + capture_output=True, + ) + assert not result.returncode + + # Find the first line of the help and strip out all the spaces and newlines + stdout_as_string = result.stdout.decode("utf8", "replace") + cleaned_help_line = ( + stdout_as_string.split("\n\n")[0].replace("\n", "").replace(" ", "") + ) + assert cleaned_help_line == ( + "usage:murfey.repost_failed_calls[-h]-cCONFIG-uUSERNAME[-dDIR]" + ) diff --git a/tests/conftest.py b/tests/conftest.py index 1b87c0217..6672188ba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,5 @@ +import json + import pytest from sqlmodel import Session @@ -5,6 +7,8 @@ from murfey.util.db import clear, setup from tests import engine, url +mock_security_config_name = "security_config.yaml" + @pytest.fixture def start_postgres(): @@ -15,3 +19,18 @@ def start_postgres(): with Session(engine) as murfey_db: murfey_db.add(murfey_session) murfey_db.commit() + + +@pytest.fixture() +def mock_security_configuration(tmp_path): + config_file = tmp_path / mock_security_config_name + security_config = { + "auth_key": "auth_key", + "auth_algorithm": "auth_algorithm", + "feedback_queue": "murfey_feedback", + "rabbitmq_credentials": 
"/path/to/rabbitmq.yaml", + "murfey_db_credentials": "/path/to/murfey_db_credentials", + "crypto_key": "crypto_key", + } + with open(config_file, "w") as f: + json.dump(security_config, f)