Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 10 additions & 29 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,25 @@ jobs:
- black
- ruff
- mypy
- stubtest
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
python-version: "3.x"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- name: Install uv
uses: astral-sh/setup-uv@v7
- name: Run lint check
uses: pre-commit/action@v3.0.0
with:
extra_args: -a ${{ matrix.cmd }}
extra_args: -a -v ${{ matrix.cmd }}
fmt:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -52,33 +60,6 @@ jobs:
with:
token: ${{secrets.GITHUB_TOKEN}}
deny: warnings
stubtest:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- uses: actions/setup-python@v6
with:
python-version: 3.x
- name: Install uv
uses: astral-sh/setup-uv@v7
- id: setup-venv
name: Setup virtualenv
run: python -m venv .venv
- name: Build lib
uses: PyO3/maturin-action@v1
with:
command: dev --uv
sccache: true
- name: Run stubtest
run: |
set -e
source .venv/bin/activate
stubtest --ignore-disjoint-bases natsrpy
pytest:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
delete-me-*

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ repos:
name: python mypy
always_run: true
pass_filenames: false
args: ["python"]
args: ["python", "examples"]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.15.7
hooks:
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-nats = "0.46"
bytes = "1.11.1"
futures-util = "0.3.32"
log = "0.4.29"
pyo3 = { version = "0.28", features = ["experimental-inspect"] }
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
pyo3-log = "0.13.3"
serde = { version = "1.0.228", features = ["derive"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ async def main() -> None:
# We use messages() to get async iterator which we
# use to get messages for push_consumer.
async for push_message in await push_consumer.messages():
print(f"[FROM_PUSH] {push_message.payload}") # noqa: T201
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
await push_message.ack()
break

# Pull consumers have to request batches of messages.
for pull_message in await pull_consumer.fetch(max_messages=10):
print(f"[FROM_PULL] {pull_message.payload}") # noqa: T201
print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201
await pull_message.ack()

# Cleanup
Expand Down
2 changes: 0 additions & 2 deletions examples/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ async def main() -> None:

await kv.delete("test-key")

# Alternatively you can
# use await watcher.next()
async for event in watcher:
print("[EVENT]", event) # noqa: T201
break
Expand Down
4 changes: 2 additions & 2 deletions examples/request_reply.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ async def main() -> None:
# Here we create responder, that will be
# answering to our requests.
async def responder(message: Message) -> None:
print(f"[REQUEST]: {message.payload}, headers={message.headers}") # noqa: T201
print(f"[REQUEST]: {message.payload!r}, headers={message.headers}") # noqa: T201
if message.reply:
await nats.publish(
message.reply,
f"reply to {message.payload}",
f"reply to {message.payload!r}",
headers=message.headers,
)

Expand Down
8 changes: 3 additions & 5 deletions examples/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main() -> None:
cb_lock = asyncio.Event()

async def callback(message: Message) -> None:
print(f"[FROM_CALLBACK] {message.payload}") # noqa: T201
print(f"[FROM_CALLBACK] {message.payload!r}") # noqa: T201
cb_lock.set()

# When subscribing you can set callback.
Expand All @@ -21,8 +21,6 @@ async def callback(message: Message) -> None:

# When callback is not set, you get a subscription
# that should be used along with `async for`
# loop, or alternatively you can call
# `await iter_sub.next()` to get a single message.
iter_sub = await nats.subscribe("iter-subj")

# Subscriptions with queue argument create
Expand All @@ -40,10 +38,10 @@ async def callback(message: Message) -> None:
await queue_sub.unsubscribe(limit=1)

async for message in iter_sub:
print(f"[FROM_ITERATOR] {message.payload}") # noqa: T201
print(f"[FROM_ITERATOR] {message.payload!r}") # noqa: T201

async for message in queue_sub:
print(f"[FROM_QUEUED] {message.payload}") # noqa: T201
print(f"[FROM_QUEUED] {message.payload!r}") # noqa: T201

# Making sure that the message in callback is received.
await cb_lock.wait()
Expand Down
23 changes: 20 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ dynamic = ["version"]
name = "Pavel Kirilin"
email = "s3riussan@gmail.com"

[project.entry-points.opentelemetry_instrumentor]
natsrpy = "natsrpy.instrumentation:NatsrpyInstrumentor"

[project.optional-dependencies]
opentelemetry = [
"opentelemetry-api (>=1.38.0,<2.0.0)",
"opentelemetry-instrumentation (>=0.59b0,<1)",
"opentelemetry-semantic-conventions (>=0.59b0,<1)",
]

[dependency-groups]
dev = [
"anyio>=4,<5",
Expand All @@ -38,9 +48,6 @@ dev = [
requires = ["maturin>=1.12,<2.0"]
build-backend = "maturin"

[tool.uv]
package = false

[tool.maturin]
bindings = "pyo3"
features = ["pyo3/extension-module"]
Expand All @@ -55,11 +62,18 @@ packages = ["natsrpy"]
pretty = true
implicit_reexport = true
allow_untyped_decorators = true
namespace_packages = true
warn_return_any = false

[tool.pytest]
anyio_mode = "auto"

[tool.coverage.run]
omit = [
"python/tests/**/*",
"python/natsrpy/instrumentation/**/*",
]

[tool.ruff]
target-version = "py310"
exclude = [".venv/"]
Expand Down Expand Up @@ -128,3 +142,6 @@ ignore-decorators = ["typing.overload"]

[tool.ruff.lint.pylint]
allow-magic-value-types = ["int", "str", "float"]

[tool.uv]
package = false
37 changes: 19 additions & 18 deletions python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,12 @@ class Message:
description: str | None
length: int

def __len__(self) -> int: ...

@final
class IteratorSubscription:
"""Async iterator subscription for receiving NATS messages.

Returned by :meth:`Nats.subscribe` when no callback is provided.
Messages can be received using ``async for`` or by calling :meth:`next`
directly.
"""

def __aiter__(self) -> IteratorSubscription: ...
def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[Message]: ...
def next(self, timeout: float | timedelta | None = None) -> Future[Message]:
"""Receive the next message from the subscription.

:param timeout: maximum time to wait for a message in seconds
or as a timedelta, defaults to None (wait indefinitely).
:return: the next message.
:raises StopAsyncIteration: when the subscription is drained or
unsubscribed.
"""

def unsubscribe(self, limit: int | None = None) -> Future[None]:
"""Unsubscribe from the subject.

Expand Down Expand Up @@ -96,6 +81,22 @@ class Nats:
access over a connection to one or more NATS servers.
"""

@property
def addr(self) -> list[str]: ...
@property
def user_and_pass(self) -> tuple[str, str]: ...
@property
def nkey(self) -> str | None: ...
@property
def token(self) -> str | None: ...
@property
def custom_inbox_prefix(self) -> str | None: ...
@property
def read_buffer_capacity(self) -> int: ...
@property
def sender_capacity(self) -> int: ...
@property
def max_reconnects(self) -> int | None: ...
def __new__(
cls,
/,
Expand Down
6 changes: 6 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ class JetStreamMessage:
def token(self) -> str | None:
"""Authentication token, if applicable."""

@property
def length(self) -> int:
"""Message's payload length."""

def ack(self, double: bool = False) -> Future[None]:
"""Acknowledge that a message was handled.

Expand Down Expand Up @@ -208,3 +212,5 @@ class JetStreamMessage:

:param double: whether to wait for server response, defaults to False.
"""

def __len__(self) -> int: ...
10 changes: 0 additions & 10 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,6 @@ class MessagesIterator:

def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[JetStreamMessage]: ...
def next(
self,
timeout: float | timedelta | None = None,
) -> Future[JetStreamMessage]:
"""Receive the next message from the consumer.

:param timeout: maximum time to wait in seconds or as a timedelta,
defaults to None (wait indefinitely).
:return: the next JetStream message.
"""

@final
class PushConsumer:
Expand Down
14 changes: 0 additions & 14 deletions python/natsrpy/_natsrpy_rs/js/kv.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -90,27 +90,13 @@ class KVEntryIterator:

def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[KVEntry]: ...
def next(self, timeout: float | timedelta | None = None) -> Future[KVEntry]:
"""Receive the next key-value entry.

:param timeout: maximum time to wait in seconds or as a timedelta,
defaults to None (wait indefinitely).
:return: the next entry.
"""

@final
class KeysIterator:
"""Async iterator over key-value bucket keys."""

def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[str]: ...
def next(self, timeout: float | timedelta | None = None) -> Future[str]:
"""Receive the next key.

:param timeout: maximum time to wait in seconds or as a timedelta,
defaults to None (wait indefinitely).
:return: the next key name.
"""

@final
class KVConfig:
Expand Down
21 changes: 0 additions & 21 deletions python/natsrpy/_natsrpy_rs/js/managers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ class ConsumersIterator:

def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[PullConsumer | PushConsumer]: ...
def next(
self,
timeout: float | timedelta | None = None,
) -> Future[PullConsumer | PushConsumer]:
"""Receive the next consumer from the stream.

:param timeout: maximum time to wait for a message in seconds
or as a timedelta, defaults to None (wait indefinitely).
:return: the next consumer.
:raises StopAsyncIteration: when the subscription is drained or
unsubscribed.
"""

@final
class ConsumersNamesIterator:
Expand All @@ -63,15 +51,6 @@ class ConsumersNamesIterator:

def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[str]: ...
def next(self, timeout: float | timedelta | None = None) -> Future[str]:
"""Receive the next consumer name from the stream.

:param timeout: maximum time to wait for a message in seconds
or as a timedelta, defaults to None (wait indefinitely).
:return: the next consumer name.
:raises StopAsyncIteration: when the subscription is drained or
unsubscribed.
"""

@final
class StreamsManager:
Expand Down
7 changes: 0 additions & 7 deletions python/natsrpy/_natsrpy_rs/js/object_store.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ class ObjectInfoIterator:

def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[ObjectInfo]: ...
def next(self, timeout: float | timedelta | None = None) -> Future[ObjectInfo]:
"""Receive the next object info entry.

:param timeout: maximum time to wait in seconds or as a timedelta,
defaults to None (wait indefinitely).
:return: the next object info.
"""

@final
class ObjectStore:
Expand Down
Loading
Loading