Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions pulsar/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,107 @@ def consumer_name(self) -> str:
"""
return self._consumer.consumer_name()

class Reader:
"""
The Pulsar topic reader, used to read messages from a topic.
"""

def __init__(self, reader: _pulsar.Reader, schema: pulsar.schema.Schema) -> None:
"""
Create the reader.
Users should not call this constructor directly. Instead, create the
reader via ``Client.create_reader``.

Parameters
----------
reader: _pulsar.Reader
The underlying Reader object from the C extension.
schema: pulsar.schema.Schema
The schema of the data that will be received by this reader.
"""
self._reader = reader
self._schema = schema

async def read_next(self) -> pulsar.Message:
"""
Read a single message asynchronously.

Returns
-------
pulsar.Message
The message received.

Raises
------
PulsarException
"""
future = asyncio.get_running_loop().create_future()
self._reader.read_next_async(functools.partial(_set_future, future))
msg = await future
Comment thread
BewareMyPower marked this conversation as resolved.
m = pulsar.Message()
m._message = msg
m._schema = self._schema
return m

async def has_message_available(self) -> bool:
"""
Check if there is any message available to read from the current
position.
"""
future = asyncio.get_running_loop().create_future()
self._reader.has_message_available_async(functools.partial(_set_future, future))
return await future

async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
"""
Reset this reader to a specific message id or publish timestamp
asynchronously.

Parameters
----------
messageid : MessageId or int
The message id for seek, OR an integer event time (timestamp) to
seek to.

Raises
------
PulsarException
"""
future = asyncio.get_running_loop().create_future()
if isinstance(messageid, pulsar.MessageId):
msg_id = messageid._msg_id
elif isinstance(messageid, int):
msg_id = messageid
else:
raise ValueError(f"invalid messageid type {type(messageid)}")
self._reader.seek_async(msg_id, functools.partial(_set_future, future, value=None))
await future

async def close(self) -> None:
"""
Close the reader asynchronously.

Raises
------
PulsarException
"""
future = asyncio.get_running_loop().create_future()
self._reader.close_async(functools.partial(_set_future, future, value=None))
await future

def topic(self) -> str:
"""
Return the topic this reader is reading from.
"""
return self._reader.topic()

def is_connected(self) -> bool:
"""
Check if the reader is connected or not.
"""
return self._reader.is_connected()


class Client:
"""
The asynchronous version of `pulsar.Client`.
Expand Down Expand Up @@ -777,6 +878,93 @@ async def subscribe(self, topic: Union[str, List[str]],
schema.attach_client(self._client)
return Consumer(await future, schema)

# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
async def create_reader(self, topic: str,
start_message_id: Union[pulsar.MessageId, _pulsar.MessageId],
schema: pulsar.schema.Schema | None = None,
receiver_queue_size: int = 1000,
reader_name: str | None = None,
subscription_role_prefix: str | None = None,
is_read_compacted: bool = False,
crypto_key_reader: pulsar.CryptoKeyReader | None = None,
start_message_id_inclusive: bool = False,
crypto_failure_action: ConsumerCryptoFailureAction =
ConsumerCryptoFailureAction.FAIL,
) -> Reader:
"""
Create a reader on a particular topic.

Parameters
----------
topic: str
The name of the topic.
start_message_id: MessageId or _pulsar.MessageId
The initial reader positioning is done by specifying a message id.
The options are:

* ``MessageId.earliest``: Start reading from the earliest message
available in the topic.
* ``MessageId.latest``: Start reading from the end topic, only
getting messages published after the reader was created.
* ``MessageId``: When passing a particular message id, the reader
will position itself on that specific position.
schema: pulsar.schema.Schema | None, default=None
Define the schema of the data that will be received by this reader.
receiver_queue_size: int, default=1000
Sets the size of the reader receive queue.
reader_name: str | None, default=None
Sets the reader name.
subscription_role_prefix: str | None, default=None
Sets the subscription role prefix.
is_read_compacted: bool, default=False
Selects whether to read the compacted version of the topic.
crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
Symmetric encryption class implementation.
start_message_id_inclusive: bool, default=False
Set the reader to include the startMessageId or given position of
any reset operation like Reader.seek.
crypto_failure_action: ConsumerCryptoFailureAction, \
default=ConsumerCryptoFailureAction.FAIL
Set the behavior when the decryption fails.

Returns
-------
Reader
The reader created

Raises
------
PulsarException
"""
if schema is None:
schema = pulsar.schema.BytesSchema()

if isinstance(start_message_id, pulsar.MessageId):
start_message_id = start_message_id._msg_id

_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')

conf = _pulsar.ReaderConfiguration()
conf.receiver_queue_size(receiver_queue_size)
if reader_name is not None:
conf.reader_name(reader_name)
if subscription_role_prefix is not None:
conf.subscription_role_prefix(subscription_role_prefix)
conf.schema(schema.schema_info())
conf.read_compacted(is_read_compacted)
if crypto_key_reader is not None:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.start_message_id_inclusive(start_message_id_inclusive)
conf.crypto_failure_action(crypto_failure_action)

future = asyncio.get_running_loop().create_future()
self._client.create_reader_async_v2(
topic, start_message_id, conf, functools.partial(_set_future_v2, future)
)
reader = await future
schema.attach_client(self._client)
return Reader(reader, schema)

def shutdown(self) -> None:
"""
Shutdown the client and all the associated producers and consumers
Expand Down
14 changes: 14 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ Reader Client_createReader(Client& client, const std::string& topic, const Messa
[&](ReaderCallback callback) { client.createReaderAsync(topic, startMessageId, conf, callback); });
}

void Client_createReaderAsync(Client& client, const std::string& topic, const MessageId& startMessageId,
ReaderConfiguration conf, ReaderCallback callback) {
py::gil_scoped_release release;
client.createReaderAsync(topic, startMessageId, conf, callback);
}

void Client_createReaderAsyncV2(Client& client, const std::string& topic, const MessageId& startMessageId,
ReaderConfiguration conf, ReaderV2Callback callback) {
py::gil_scoped_release release;
client.createReaderAsyncV2(topic, startMessageId, conf, std::move(callback));
}

std::vector<std::string> Client_getTopicPartitions(Client& client, const std::string& topic) {
return waitForAsyncValue<std::vector<std::string>>(
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
Expand Down Expand Up @@ -204,6 +216,8 @@ void export_client(py::module_& m) {
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("create_reader", &Client_createReader)
.def("create_reader_async", &Client_createReaderAsync)
.def("create_reader_async_v2", &Client_createReaderAsyncV2)
.def("create_table_view",
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
Expand Down
31 changes: 31 additions & 0 deletions src/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/
#include "utils.h"
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>

namespace py = pybind11;
Expand Down Expand Up @@ -54,16 +55,46 @@ void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) {

bool Reader_is_connected(Reader& reader) { return reader.isConnected(); }

void Reader_readNextAsync(Reader& reader, ReadNextCallback callback) {
py::gil_scoped_release release;
reader.readNextAsync(callback);
}
Comment thread
BewareMyPower marked this conversation as resolved.

void Reader_closeAsync(Reader& reader, ResultCallback callback) {
py::gil_scoped_release release;
reader.closeAsync(callback);
}

void Reader_seekAsync(Reader& reader, const MessageId& msgId, ResultCallback callback) {
py::gil_scoped_release release;
reader.seekAsync(msgId, callback);
}

void Reader_seekAsync_timestamp(Reader& reader, uint64_t timestamp, ResultCallback callback) {
py::gil_scoped_release release;
reader.seekAsync(timestamp, callback);
}

void Reader_hasMessageAvailableAsync(Reader& reader, HasMessageAvailableCallback callback) {
py::gil_scoped_release release;
reader.hasMessageAvailableAsync(callback);
}

void export_reader(py::module_& m) {
using namespace py;

class_<Reader>(m, "Reader")
.def("topic", &Reader::getTopic, return_value_policy::copy)
.def("read_next", &Reader_readNext)
.def("read_next", &Reader_readNextTimeout)
.def("read_next_async", &Reader_readNextAsync)
Comment thread
BewareMyPower marked this conversation as resolved.
.def("has_message_available", &Reader_hasMessageAvailable)
.def("has_message_available_async", &Reader_hasMessageAvailableAsync)
.def("close", &Reader_close)
.def("close_async", &Reader_closeAsync)
.def("seek", &Reader_seek)
.def("seek", &Reader_seek_timestamp)
.def("seek_async", &Reader_seekAsync)
.def("seek_async", &Reader_seekAsync_timestamp)
.def("is_connected", &Reader_is_connected);
}
87 changes: 87 additions & 0 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
Consumer,
Producer,
PulsarException,
Reader,
_set_future,
)
from pulsar.schema import ( # pylint: disable=import-error
Expand Down Expand Up @@ -465,6 +466,86 @@ async def test_seek_timestamp(self):
msg = await consumer.receive()
self.assertEqual(msg.data(), b'msg-3')

async def test_reader_simple(self):
Comment thread
BewareMyPower marked this conversation as resolved.
topic = f'asyncio-test-reader-simple-{time.time()}'
reader = await self._client.create_reader(topic, pulsar.MessageId.earliest)
self.assertTrue(reader.is_connected())
self.assertEqual(reader.topic(), f'persistent://public/default/{topic}')

producer = await self._client.create_producer(topic)
await producer.send(b'hello')
msg = await reader.read_next()
self.assertEqual(msg.data(), b'hello')
with self.assertRaises(asyncio.TimeoutError):
await asyncio.wait_for(reader.read_next(), 1)
Comment thread
BewareMyPower marked this conversation as resolved.
await reader.close()
self.assertFalse(reader.is_connected())

async def test_reader_on_last_message(self):
topic = f'asyncio-test-reader-on-last-message-{time.time()}'
producer = await self._client.create_producer(topic)
for i in range(10):
await producer.send(f'hello-{i}'.encode())
reader = await self._client.create_reader(topic, pulsar.MessageId.latest)
for i in range(10, 20):
await producer.send(f'hello-{i}'.encode())
for i in range(10, 20):
msg = await reader.read_next()
self.assertEqual(msg.data(), f'hello-{i}'.encode())
await reader.close()

async def test_reader_on_specific_message(self):
topic = f'asyncio-test-reader-on-specific-msg-{time.time()}'
producer = await self._client.create_producer(topic)
msg_ids = []
for i in range(10):
msg_id = await producer.send(f'hello-{i}'.encode())
msg_ids.append(msg_id)
reader1 = await self._client.create_reader(topic, pulsar.MessageId.earliest)
for i in range(5):
msg = await reader1.read_next()
self.assertEqual(msg.data(), f'hello-{i}'.encode())
last_msg_id = msg_ids[4]
reader2 = await self._client.create_reader(topic, last_msg_id)
for i in range(5, 10):
msg = await reader2.read_next()
self.assertEqual(msg.data(), f'hello-{i}'.encode())
await reader1.close()
await reader2.close()

async def test_reader_has_message_available(self):
topic = f'asyncio-test-reader-has-message-available-{time.time()}'
producer = await self._client.create_producer(topic)
reader = await self._client.create_reader(topic, pulsar.MessageId.latest)
self.assertFalse(await reader.has_message_available())
for i in range(10):
await producer.send(f'hello-{i}'.encode())
for _ in range(10):
self.assertTrue(await reader.has_message_available())
await reader.read_next()
self.assertFalse(await reader.has_message_available())
await reader.close()

async def test_reader_seek(self):
topic = f'asyncio-test-reader-seek-{time.time()}'
producer = await self._client.create_producer(topic)
msg_ids = []
for i in range(10):
msg_id = await producer.send(f'msg-{i}'.encode())
msg_ids.append(msg_id)
reader = await self._client.create_reader(topic, pulsar.MessageId.latest,
start_message_id_inclusive=False)
await reader.seek(msg_ids[2])
msg = await reader.read_next()
self.assertEqual(msg.data(), b'msg-3')
await reader.close()
reader_inclusive = await self._client.create_reader(topic, pulsar.MessageId.latest,
start_message_id_inclusive=True)
await reader_inclusive.seek(msg_ids[2])
msg = await reader_inclusive.read_next()
self.assertEqual(msg.data(), b'msg-2')
await reader_inclusive.close()

async def test_schema(self):
class ExampleRecord(Record): # pylint: disable=too-few-public-methods
"""Example record schema for testing."""
Expand Down Expand Up @@ -507,6 +588,12 @@ def raise_exception():
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
# TODO: we should fix the error message not included in pattern subscription case

with self.assertRaises(PulsarException) as e:
await client.create_reader("private/auth/asyncio-test-token-auth-reader",
pulsar.MessageId.earliest)
self.assertEqual(e.exception.error(), pulsar.Result.AuthenticationError)
self.assertIn("token supplier failed", str(e.exception))

await client.close()


Expand Down
Loading