-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path5_producer_consumer_pattern.py
More file actions
73 lines (49 loc) · 1.93 KB
/
5_producer_consumer_pattern.py
File metadata and controls
73 lines (49 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
Topic: Producer-Consumer Pattern
Description: Data flows from a Producer to a Consumer through a Pipeline (Queue).

Python Event object (threading.Event)
    Flag: while the flag is down (0), the workers keep working;
          once the flag is up (1), the workers stop working.
    set():    raise the flag (set it to 1)
    clear():  lower the flag (set it to 0)
    wait():   returns immediately if the flag is 1; blocks while it is 0
    is_set(): returns the current flag state
"""
from concurrent.futures import ThreadPoolExecutor
import time
import logging
import threading
import queue
import random
def producer(queue: queue.Queue, event: threading.Event):
    """Feed random integers into the pipeline until ``event`` is set.

    Args:
        queue: Pipeline that receives the generated messages. ``put`` is
            called without a timeout, so it blocks while a bounded queue
            is full.
        event: Shutdown flag, checked before each message is produced.
    """
    while True:
        if event.is_set():
            break
        payload = random.randint(1, 100)  # inclusive on both ends
        queue.put(payload)
        logging.info("Producer produced the message %d", payload)
    logging.info("Producer stopped producing")
def consumer(queue: queue.Queue, event: threading.Event,
             poll_interval: float = 0.1):
    """Drain messages from the pipeline until shutdown is requested.

    Unlike the producer, the consumer keeps running after ``event`` is set
    until the queue is empty, so messages already queued are not dropped.

    Args:
        queue: Pipeline to read messages from.
        event: Shutdown flag; once it is set, the consumer returns as soon
            as the queue is empty.
        poll_interval: Maximum number of seconds to block waiting for a
            message before the loop condition is re-checked.
    """
    # The ``queue`` parameter shadows the ``queue`` module inside this
    # function, so the exception class is imported locally.
    from queue import Empty

    while not event.is_set() or not queue.empty():
        try:
            # A bounded wait prevents blocking forever when the event gets
            # set while the queue is empty and nothing will be produced.
            new_message = queue.get(timeout=poll_interval)
        except Empty:
            continue
        logging.info("Consumer got the message %d /// Queue Size = %d",
                     new_message, queue.qsize())
    logging.info("Consumer stopped Consuming")
def main():
    """Run the producer/consumer demo briefly, then signal the workers to stop."""
    logging.basicConfig(
        format="%(asctime)s: %(message)s",
        level=logging.INFO,
        datefmt="%H:%M:%S",
    )

    logging.info("Main-Thread starts")
    logging.info("Creates Thread Pool")

    stop_flag = threading.Event()
    channel = queue.Queue(maxsize=100)

    with ThreadPoolExecutor(max_workers=4) as pool:
        # Producer is submitted first, then the consumer; both share the
        # same queue and the same shutdown flag.
        for worker in (producer, consumer):
            pool.submit(worker, channel, stop_flag)

        # Let the workers run for a short while before requesting shutdown.
        time.sleep(0.1)

        # The flag is raised inside the ``with`` block: leaving the block
        # waits for the workers, and they only return once the flag is set.
        logging.info("Event Set... Stop Working")
        stop_flag.set()

    logging.info("Main-Thread done")
# Script entry point: run the demo only on direct execution, not on import.
if __name__ == "__main__":
    main()