I'm testing the `to_kafka` sink, and its throughput is limited by the poll time (0.2 sec). It looks like `self.producer.poll(0)` only polls for one message at a time, so only one delivery callback runs every 0.2 seconds.
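Below is a minimal toy model of the suspected bottleneck. It does not talk to Kafka. `FakeProducer`, `ticks_to_deliver` and `per_poll` are hypothetical names, and the fake only mimics the one-callback-per-poll behavior described above. The model also contrasts a loop that keeps calling `poll(0)` until it returns 0. confluent-kafka documents `Producer.poll` as returning the number of events (callbacks) served, so this is one idea to try, not a confirmed fix.

```python
from collections import deque


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer (not confluent_kafka).

    produce() queues a delivery callback; poll() serves at most
    `per_poll` queued callbacks and returns how many it served.
    """

    def __init__(self, per_poll=1):
        self.per_poll = per_poll
        self._pending = deque()

    def produce(self, value, on_delivery):
        self._pending.append((value, on_delivery))

    def poll(self, timeout=0):
        served = 0
        while self._pending and served < self.per_poll:
            value, cb = self._pending.popleft()
            cb(None, value)  # (err, msg), like a delivery callback
            served += 1
        return served


def ticks_to_deliver(n, drain):
    """Count poll-loop ticks (one per polltime sleep) until n callbacks ran."""
    producer = FakeProducer(per_poll=1)
    delivered = []
    for i in range(n):
        producer.produce(str(i).encode(), lambda err, msg: delivered.append(msg))
    ticks = 0
    while len(delivered) < n:
        ticks += 1
        if drain:
            # Keep polling within one tick until nothing is left to serve.
            while producer.poll(0) > 0:
                pass
        else:
            # A single poll(0) per tick, as observed in the issue.
            producer.poll(0)
    return ticks


POLLTIME = 0.2  # seconds, the poll interval mentioned above
for drain in (False, True):
    ticks = ticks_to_deliver(100, drain)
    print(f"drain={drain}: {ticks} ticks ~ {ticks * POLLTIME:.1f} s")
# drain=False: 100 ticks ~ 20.0 s
# drain=True: 1 ticks ~ 0.2 s
```

If the real producer behaves like the fake, 100 messages need about 20 seconds of polling, which matches the slowdown described above.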
This fails:
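```python
from streamz import Stream
from streamz.utils_test import wait_for

# kafka_service() is assumed to be the helper context manager from
# streamz's own tests/test_kafka.py; it yields (kafka, topic).


def test_to_kafka_throughput():
    ARGS = {'bootstrap.servers': 'localhost:9092'}
    with kafka_service() as kafka:
        _, TOPIC = kafka
        # Emit 100 small byte payloads as fast as the source allows.
        source = Stream.from_iterable(range(100)).map(lambda x: str(x).encode())
        kafka = source.to_kafka(TOPIC, ARGS)
        out = kafka.sink_to_list()
        source.start()
        # Expect all 100 results within 5 seconds, checking every 0.1 s.
        wait_for(
            lambda: len(out) == 100,
            5,
            period=0.1,
            fail_func=lambda: print("len(out) ==", len(out))
        )
```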
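A rough budget for this test, assuming the one-callback-per-poll behavior from the first paragraph: with a 5 s timeout and a 0.2 s poll interval, only about 25 callbacks can be served, so `fail_func` would be expected to report a count well below 100. This is a back-of-the-envelope estimate, not a measured result. `max_callbacks` is a hypothetical helper, and integer milliseconds are used to avoid float rounding in `5 / 0.2`.

```python
TIMEOUT_MS = 5_000   # wait_for timeout used in the test (5 s)
POLLTIME_MS = 200    # to_kafka poll interval (0.2 s)
N_MESSAGES = 100


def max_callbacks(timeout_ms, polltime_ms, per_poll=1):
    """Approximate callbacks served if each poll tick serves `per_poll` of them."""
    return (timeout_ms // polltime_ms) * per_poll


budget = max_callbacks(TIMEOUT_MS, POLLTIME_MS)
print(budget, budget >= N_MESSAGES)  # 25 False
```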
The existing `test_to_kafka` test doesn't catch this because it starts waiting for the result only after all the items have been emitted.
I spent some time tinkering with the code but can't figure out what's wrong or how to fix it, so any ideas are appreciated.