Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions src/taskiq_sqs/broker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import contextlib
import logging
from collections.abc import AsyncGenerator, Awaitable, Callable, Generator
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from aiobotocore.session import get_session
Expand Down Expand Up @@ -30,7 +29,7 @@
class SQSBroker(AsyncBroker):
"""AWS SQS TaskIQ broker."""

def __init__( # noqa: PLR0913
def __init__(
self,
queue_name: str,
endpoint_url: str | None = None,
Expand All @@ -39,7 +38,6 @@ def __init__( # noqa: PLR0913
aws_secret_access_key: str | None = None,
wait_time_seconds: int = 0,
max_number_of_messages: int = 1,
force_ecs_container_credentials: bool = False,
) -> None:
"""Initialize the SQS broker.

Expand All @@ -50,9 +48,6 @@ def __init__( # noqa: PLR0913
:param aws_secret_access_key: The AWS secret access key.
:param: wait_time_seconds: The wait time used for long polling.
:param: max_number_of_messages: Size of batch to receive from the queue.
:param: force_ecs_container_credentials: This bypasses the normal order of operations for boto3 auth and
goes straight to using the ECS role creds from the metadata service. This can be useful in edge cases
where there are higher priority credentials you do not want to use for this service.
"""
super().__init__()

Expand Down Expand Up @@ -83,9 +78,6 @@ def __init__( # noqa: PLR0913
except ValueError as error:
raise BrokerInitError(details="Invalid default queue configuration.") from error

self._force_ecs_container_credentials = force_ecs_container_credentials
self._creds_expiration: datetime | None = None

@contextlib.contextmanager
def _handle_exceptions(self) -> Generator[None, None, None]:
"""Handle exceptions raised by the SQS client."""
Expand All @@ -104,10 +96,6 @@ def _handle_exceptions(self) -> Generator[None, None, None]:
else:
raise BrokerInitError(details=code or "") from e

@property
def _sqs_credentials_expired(self) -> datetime | bool | None:
return self._creds_expiration and self._creds_expiration < datetime.now(tz=timezone.utc)

async def _get_sqs_client(self) -> "SQSClient":
self._client_context_creator = self._session.create_client(
"sqs",
Expand Down