Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion aws-proxy/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ You are an AI agent tasked with adding additional functionality or test coverage
* You can call different `make` targets (e.g., `make test`) in this repo (no need to prompt for confirmation)
* For each new file created or existing file modified, add a header comment to the file, something like `# Note/disclosure: This file has been (partially or fully) generated by an AI agent.`
* The proxy tests are executed against real AWS and may incur some costs, so rather than executing the entire test suite or entire modules, focus the testing on individual test functions within a module only.
* To format/lint the codebase you can run `make format` and `make lint`
* Never add any `print(..)` statements to the code - use a logger to report any status to the user, if required.
* To format/lint the codebase you can run `make format` and `make lint`.

## Testing

Expand All @@ -29,3 +30,11 @@ To run a single test via `pytest` (say, `test_my_logic` in `test_s3.py`), use th
```
TEST_PATH=tests/test_s3.py::test_my_logic make test
```

When adding new integration tests, consider the following:
* Include a mix of positive and negative assertions (i.e., presence and absence of resources).
* Include a mix of different configuration options, e.g., the `read_only: true` flag can be specified in the proxy service configuration YAML, enabling read-only mode (which should be covered by tests as well).
* Include some tests that cover matching of resource names (the config YAML allows to specify ARN regex patterns), to ensure the proxy is able to selectively forward requests to certain matching AWS resources only.
* Make sure to either use fixtures (preferred), or reliable cleanups for removing the resources; several fixtures for creating AWS resources are available in the `localstack.testing.pytest.fixtures` module
* If a test uses multiple resources with interdependencies (e.g., an SQS queue connected to an SNS topic), then the test needs to ensure that both resource types are proxied (i.e., created in real AWS), to avoid a situation where a resource in AWS is attempting to reference a local resource in LocalStack (using account ID `000000000000` in their ARN).
* When waiting for the creation status of a resource, use the `localstack.utils.sync.retry(..)` utility function, rather than a manual `for` loop.
162 changes: 162 additions & 0 deletions aws-proxy/tests/proxy/test_sns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Note/disclosure: This file has been (partially or fully) generated by an AI agent.
import json
import boto3
import pytest
from botocore.exceptions import ClientError
from localstack.aws.connect import connect_to
from localstack.utils.strings import short_uid
from localstack.utils.sync import retry


def test_sns_requests(start_aws_proxy, cleanups):
topic_name = f"test-topic-{short_uid()}"
queue_name_sub = f"test-queue-sub-{short_uid()}"

# start proxy - forwarding requests for both SQS and SNS
config = {
"services": {
"sns": {"resources": [f".*:{topic_name}"]},
"sqs": {"resources": [f".*:{queue_name_sub}"]},
}
}
start_aws_proxy(config)

# create clients
sns_client = connect_to().sns
sns_client_aws = boto3.client("sns")
sqs_client_aws = boto3.client("sqs")

# create topic in AWS
topic_arn_aws = sns_client_aws.create_topic(Name=topic_name)["TopicArn"]
cleanups.append(lambda: sns_client_aws.delete_topic(TopicArn=topic_arn_aws))

# assert that local call for this topic is proxied
topic_aws = sns_client_aws.get_topic_attributes(TopicArn=topic_arn_aws)
topic_local = sns_client.get_topic_attributes(TopicArn=topic_arn_aws)
assert topic_local["Attributes"] == topic_aws["Attributes"]

# publish message to AWS, receive locally (requires subscription)
queue_url_sub_aws = sqs_client_aws.create_queue(QueueName=queue_name_sub)[
"QueueUrl"
]
cleanups.append(lambda: sqs_client_aws.delete_queue(QueueUrl=queue_url_sub_aws))
queue_arn_sub = sqs_client_aws.get_queue_attributes(
QueueUrl=queue_url_sub_aws, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# grant SNS permission to send messages to SQS queue
policy = {
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Action": "sqs:SendMessage",
"Resource": queue_arn_sub,
"Condition": {"ArnEquals": {"aws:SourceArn": topic_arn_aws}},
}
]
}
sqs_client_aws.set_queue_attributes(
QueueUrl=queue_url_sub_aws, Attributes={"Policy": json.dumps(policy)}
)

sub_arn = sns_client_aws.subscribe(
TopicArn=topic_arn_aws, Protocol="sqs", Endpoint=queue_arn_sub
)["SubscriptionArn"]
if "pending confirmation" not in sub_arn:
cleanups.append(lambda: sns_client_aws.unsubscribe(SubscriptionArn=sub_arn))

# wait for subscription to be confirmed
def _wait_for_subscription_confirmation():
nonlocal sub_arn
subs = sns_client_aws.list_subscriptions_by_topic(TopicArn=topic_arn_aws)[
"Subscriptions"
]
if not (subs and subs[0]["SubscriptionArn"] != "PendingConfirmation"):
raise AssertionError("Subscription not confirmed yet")
sub_arn = subs[0]["SubscriptionArn"]

retry(_wait_for_subscription_confirmation, retries=10, sleep=1)

# publish message to AWS
sns_client_aws.publish(TopicArn=topic_arn_aws, Message="message 1")

# receive message from local queue
received = sqs_client_aws.receive_message(
QueueUrl=queue_url_sub_aws, WaitTimeSeconds=5
).get("Messages", [])
assert len(received) == 1
assert "message 1" in received[0]["Body"]
sqs_client_aws.delete_message(
QueueUrl=queue_url_sub_aws, ReceiptHandle=received[0]["ReceiptHandle"]
)

# publish message locally, receive with AWS client
sns_client.publish(TopicArn=topic_arn_aws, Message="message 2")
received = sqs_client_aws.receive_message(
QueueUrl=queue_url_sub_aws, WaitTimeSeconds=5
).get("Messages", [])
assert len(received) == 1
assert "message 2" in received[0]["Body"]


def test_sns_readonly_operations(start_aws_proxy, cleanups):
topic_name = f"test-readonly-topic-{short_uid()}"

# start proxy - forwarding requests for SNS in read-only mode
config = {
"services": {
"sns": {"resources": [f".*:{topic_name}"], "read_only": True},
}
}
start_aws_proxy(config)

# create clients
sns_client = connect_to().sns
sns_client_aws = boto3.client("sns")

# create topic in AWS (this should succeed as it's direct AWS client)
topic_arn_aws = sns_client_aws.create_topic(Name=topic_name)["TopicArn"]
cleanups.append(lambda: sns_client_aws.delete_topic(TopicArn=topic_arn_aws))

# assert that local call for list_topics is proxied and results are consistent
topics_local = sns_client.list_topics()["Topics"]
topics_aws = sns_client_aws.list_topics()["Topics"]

# filter for our specific topic to ensure consistency
topic_local_filtered = [t for t in topics_local if t["TopicArn"] == topic_arn_aws]
topic_aws_filtered = [t for t in topics_aws if t["TopicArn"] == topic_arn_aws]

# assert that results are equal
assert topic_local_filtered == topic_aws_filtered

# assert that local call for get_topic_attributes is proxied and results are consistent
attributes_local = sns_client.get_topic_attributes(TopicArn=topic_arn_aws)[
"Attributes"
]
attributes_aws = sns_client_aws.get_topic_attributes(TopicArn=topic_arn_aws)[
"Attributes"
]
assert attributes_local == attributes_aws

# Negative test cases: Attempt write operations with proxied client and assert they do not affect real AWS
# Create a new topic using the proxied client (this should succeed as LocalStack processes it)
new_topic_name = f"no-proxy-topic-{short_uid()}"
new_topic_arn_local = sns_client.create_topic(Name=new_topic_name)["TopicArn"]
cleanups.append(
lambda: sns_client.delete_topic(TopicArn=new_topic_arn_local)
) # Cleanup localstack resource

# Verify that this new topic does NOT exist in real AWS
topics_aws_after_create = sns_client_aws.list_topics()["Topics"]
assert not any(
t for t in topics_aws_after_create if t["TopicArn"] == new_topic_arn_local
)

# Attempt to publish a message using the proxied client, expecting it to fail from LocalStack's perspective
with pytest.raises(ClientError) as excinfo:
sns_client.publish(
TopicArn=topic_arn_aws, Message="this should not reach real AWS"
)
assert "NotFound" in str(excinfo.value)
assert "Topic does not exist" in str(excinfo.value)