-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_app.py
More file actions
153 lines (126 loc) · 5.69 KB
/
async_app.py
File metadata and controls
153 lines (126 loc) · 5.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
'''Example showing the recommended way to run Kivy inside Python's built-in
asyncio event loop, as just another async coroutine.
'''
import asyncio
from kivy.app import App
from kivy.lang.builder import Builder
from kivy.clock import Clock
#from blt_message_processor import BltMessageProcessorSimulation, STOP_BLT_COMMUNICATION_MESSAGE, BltMessageProcessorBleak
from kivy.uix.screenmanager import ScreenManager, Screen
from screens import MainScreen, MenuScreen
from kivy.garden.matplotlib import FigureCanvasKivyAgg
import matplotlib.pyplot as plt
#import numpy as np
from kivy.core.window import Window
from kivy.config import Config
from kivy.config import ConfigParser
import utility
import os
# Kivy language file loaded by AsyncApp.build() to create the root widget.
ASYNC_APP_UI_TEMPLATE_FILE = "async_app.kv"
# Seconds the UI command processor sleeps between polls of the command queue.
COMMAND_QUEUE_CHECKUP_INTERVAL = 0.2
# Connector name passed to utility.blt_connector_factory() when the
# configuration lookup for COMMON/BluetoothClass raises KeyError.
DEFAULT_CONNECTOR_CLASS = "Simulation"
# NOTE(review): Kivy's configuration docs advise calling Config.set before any
# other Kivy module is imported; kivy.core.window.Window is already imported
# above this line - confirm that the setting actually takes effect.
Config.set('graphics','resizable', False)
# config = ConfigParser()
# config.read('config.ini')
class AsyncApp(App):
    """Kivy application driven by the asyncio event loop.

    Besides the Kivy UI itself, the app owns two background coroutines:

    * ``process_blt_commands_from_ui`` executes the ``(command, args)`` pairs
      found on ``self.command_queue``;
    * ``process_lte_messages`` consumes items from the connector's
      ``asyncio.Queue`` and pushes them to the currently displayed screen.
    """

    def __init__(self, config, *args, **kwargs) -> None:
        """Create the application.

        :param config: mapping-like configuration object; the connector class
            name is looked up as ``config["COMMON"]["BluetoothClass"]``.
        :param args: positional arguments forwarded to ``App.__init__``.
        :param kwargs: keyword arguments forwarded to ``App.__init__``.
        """
        super().__init__(*args, **kwargs)
        # Tasks spawned for UI commands; cancelled when the UI stops.
        self.lte_tasks = []
        self.hardness_tester = utility.HardnessTesterData()
        # Plot widget currently shown in the screen's ``pltbox`` (if any).
        self.plt_obj = None
        # NOTE(review): kivy.app.App manages its own ``config`` attribute
        # (see App.load_config) - confirm this value is not replaced once the
        # app starts running.
        self.config = config

    def build(self) -> ScreenManager:
        """Set the window size and return the root widget from the kv file."""
        Window.size = (1024, 780)
        return Builder.load_file(ASYNC_APP_UI_TEMPLATE_FILE)

    async def app_run_with_externals(self):
        """Run the Kivy UI together with the background coroutines.

        Creates the connector and its message queue, starts the message
        consumer and the UI command processor, then waits until the UI and
        both helpers have finished. When the UI stops, every helper task is
        cancelled.

        :raises Exception: any non-cancellation error raised by the UI or a
            helper task is printed and re-raised.
        """
        self.command_queue = []
        blt_messages_queue = asyncio.Queue()
        try:
            blt_class = utility.blt_connector_factory(
                self.config["COMMON"]["BluetoothClass"]
            )
        except KeyError:
            blt_class = utility.blt_connector_factory(DEFAULT_CONNECTOR_CLASS)
            print("ERROR: BluetoothClass is not specified in config.ini file")

        self.blt_processor = blt_class(blt_messages_queue)
        # TODO should not end with the end message, should always work,
        # connecting/disconnecting to devices should be inside
        self.blt_message_consumer_task = asyncio.create_task(
            self.process_lte_messages(blt_messages_queue)
        )
        self.command_q_processor = asyncio.create_task(
            self.process_blt_commands_from_ui()
        )

        async def run_wrapper():
            # Run the UI; whatever way it ends (normal exit or exception),
            # stop the helper tasks so they do not outlive the application.
            try:
                await self.async_run(async_lib='asyncio')
            finally:
                for task in self.lte_tasks:
                    task.cancel()
                self.lte_tasks = []
                self.command_q_processor.cancel()
                self.blt_message_consumer_task.cancel()

        try:
            await asyncio.gather(
                run_wrapper(),
                self.blt_message_consumer_task,
                self.command_q_processor,
            )
        except asyncio.CancelledError:
            # Expected on shutdown: the helpers were cancelled by run_wrapper.
            print("App coroutins finished")
        except Exception as e:
            print(f"Unexpected runtime exepction {e}")
            raise

    async def process_blt_commands_from_ui(self):
        """Poll ``self.command_queue`` and execute queued commands forever.

        Each queue entry is a ``(command, args)`` pair where ``command(*args)``
        must produce a coroutine. The coroutine is wrapped in a task, recorded
        in ``self.lte_tasks`` and awaited; exceptions it raises are printed
        and do not stop the loop. The loop only ends through cancellation.
        """
        while True:
            if self.command_queue:
                print("Command queue received a message")
                command, args = self.command_queue.pop(0)
                print(f"command {command} args P{args}")
                new_task = asyncio.create_task(command(*args))
                self.lte_tasks.append(new_task)
                print(f"Got command to execute task {new_task}")
                try:
                    await new_task
                except Exception as exc:
                    print(exc)  # TODO make as a pop-up
            # Yield to the event loop between polls.
            await asyncio.sleep(COMMAND_QUEUE_CHECKUP_INTERVAL)

    async def process_lte_messages(self, blt_messages_queue):
        """Consume connector messages until a ``None`` item is received.

        Every message is printed. While the root widget exists, the message
        is also stored in ``self.hardness_tester``, appended to
        ``datalog.txt`` and the current screen is redrawn.

        :param blt_messages_queue: ``asyncio.Queue`` the connector writes to.
        """
        while True:
            # TODO implement health check - if nothing comes ping
            data = await blt_messages_queue.get()
            if data is None:
                print(
                    "Got message from client about disconnection. Exiting consumer loop..."
                )
                break
            print("Received callback data via async queue: ", data)
            if self.root is not None:  # self.root is the ScreenManager object
                # TODO: come up with a solution to datalog. should it be here,
                # should it be at all?
                self.hardness_tester.update_data(data)
                with open("datalog.txt", "a") as fstream:
                    fstream.write(str(data))
                bluetooth_data_screen = self.root.get_screen(self.root.current)
                device_data = self.hardness_tester
                self.redraw_hardness_tester_data(bluetooth_data_screen, device_data)

    def redraw_hardness_tester_data(self, data_screen, device_data):
        """Show the latest device data on ``data_screen``.

        Updates the ``blt_message``, ``blt_b`` and ``blt_e`` text widgets and,
        when ``device_data.e`` is non-empty, replaces the plot widget inside
        ``pltbox`` with a fresh plot of every fourth element of ``e``.

        :param data_screen: screen exposing ``ids.pltbox``, ``ids.blt_message``,
            ``ids.blt_b`` and ``ids.blt_e``.
        :param device_data: object exposing ``data``, ``b`` and ``e``.
        """
        # TODO get parameters type
        # TODO think about not using self.plt_obj
        print("redraw_hardness_tester_data")
        print(data_screen.ids.pltbox)
        print(data_screen.ids.pltbox.__dict__)
        data_screen.ids.blt_message.text = str(device_data.data)
        data_screen.ids.blt_b.text = str(device_data.b)
        data_screen.ids.blt_e.text = str(device_data.e)
        if device_data.e:
            samples = [int(value) for value in device_data.e[::4]]
            if self.plt_obj is not None:
                data_screen.ids.pltbox.remove_widget(self.plt_obj)
            # Clear the shared pyplot figure first; otherwise every redraw
            # stacks one more line on top of all the previous ones.
            plt.clf()
            plt.plot(samples)
            self.plt_obj = FigureCanvasKivyAgg(plt.gcf())
            data_screen.ids.pltbox.add_widget(self.plt_obj)
if __name__ == '__main__':
    # Read the application settings, then hand the app's top-level coroutine
    # to asyncio, which runs it until the UI and its helpers have finished.
    config = ConfigParser()
    config.read('config.ini')
    app = AsyncApp(config)
    asyncio.run(app.app_run_with_externals())