This repository was archived by the owner on Mar 17, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzero.py
More file actions
220 lines (173 loc) · 7.01 KB
/
zero.py
File metadata and controls
220 lines (173 loc) · 7.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import multiprocessing as mp
import queue
import time
# --- Google Sheets access ---
# Real sheet
sheet_id = "1u2M_qIo_XLVv5HPKYmyX_ehruxVFECf8vmXNKnErk4U"
sheets_key_file = "/creds/service_account.json"
# Test sheet
#sheet_id = "1H6hMCbpaxb5Ts8yTO4nVpE3nqBfmG2kIDezKE0Cf5QQ"
#sheets_key_file = "../service_account.json"
# --- Button matrix wiring ---
# Matrix dimensions; a button's index is computed as row + col * rows
rows = 6
cols = 5
# GPIO pin numbers: rows are read as pulled-up inputs, columns are driven as
# outputs (one entry per row / per column)
button_row_pins = [2, 3, 4, 17, 27, 22]
button_col_pins = [26, 19, 13, 6, 5]
# Milliseconds to wait after selecting a column before reading its rows
button_scan_interval_ms = 10
# Seconds between refreshing data from Sheets API
# Rate limit is 60 read requests/min; at 5 seconds/request this makes
# 12 requests/min, which should be safe
refresh_interval = 5
# Seconds after pressing a button for which it will do nothing
# Prevents issues from people pressing their button multiple times or bumping
# others' buttons off
bump_protection_time = 60
# --- Serial link to the Pico that drives the LEDs ---
pico_port_name = "/dev/serial0"
pico_port_baud = 115200
# Value placed in the first byte of every LED frame sent to the Pico
pico_start_bit = 0b10000000
# Order of these should match order of buttons (down, then to right),
# and match the order of the people in the sheet
student_names = [
"Mason", "Cameron", "Jonah", "Jerry", "Henri K", "Liam",
"Ki Bae", "Oliver", "Clark", "Rain", "Malcom", "Henry N",
"Auggie", "Anders G", "Victor", "Finn", "Sam", "Neah",
"Sophia", "Emily", "Sadie", "Alena", "Anders T", "Owen",
"Anna", "Kaelan", "Drew"
]
def scan_buttons(press_queue, stop_event):
    """Scan the button matrix until *stop_event* is set, enqueueing new presses.

    Intended to run in its own process. Each pass drives one column low at a
    time, reads every row, and collects the indices of all buttons that read
    low. A button index is ``row + col * rows``.

    Args:
        press_queue: Queue that receives the integer index of each newly
            pressed button.
        stop_event: Event polled once per full scan; scanning ends when set.

    Returns:
        None. Returns early (after printing a message) if RPi.GPIO cannot be
        imported, instead of failing later on an unbound ``GPIO`` name.
    """
    try:
        import RPi.GPIO as GPIO
    except RuntimeError:
        print("This script must be run as superuser")
        return
    except ImportError as e:
        print("RPi.GPIO is not available:", e)
        return

    # FIXME (original author): unsure whether this should be BOARD or BCM
    GPIO.setmode(GPIO.BCM)
    try:
        for pin in button_row_pins:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        for pin in button_col_pins:
            GPIO.setup(pin, GPIO.OUT)

        print("Button scanning started")
        settle_seconds = button_scan_interval_ms / 1000.0
        prev_pressed_buttons = []
        while not stop_event.is_set():
            pressed_buttons = []
            for col in range(cols):
                # Drive only the selected column low; all others stay high
                for i in range(cols):
                    GPIO.output(button_col_pins[i], i != col)
                time.sleep(settle_seconds)
                # Rows are pulled up, so a low reading means "pressed"
                for row in range(rows):
                    if not GPIO.input(button_row_pins[row]):
                        pressed_buttons.append(row + col * rows)

            # Only handle presses if exactly one button is down
            # Prevents matrix ghosting by ignoring the problem
            if len(pressed_buttons) == 1:
                button = pressed_buttons[0]
                if button not in prev_pressed_buttons:
                    # Button was just pressed, enqueue event
                    press_queue.put(button)
            prev_pressed_buttons = pressed_buttons
    finally:
        # Release the pins this process configured, even if scanning failed
        GPIO.cleanup()
    print("Button scanning stopped")
def get_clocked_in_from_sheet(sheet):
    """Read the clocked-in flag for every student from column J of *sheet*.

    One cell is read per entry in ``student_names``, starting at row 2.

    Returns:
        A list of booleans in sheet order; an entry is True only when the
        cell's value is exactly "Clocked In".
    """
    first_row = 2
    last_row = first_row + len(student_names) - 1
    status_cells = sheet.range(f"J{first_row}:J{last_row}")
    return [cell.value == "Clocked In" for cell in status_cells]
def send_leds_to_pico(port, led_states):
    """Pack *led_states* into a 5-byte frame and write it to *port*.

    State ``i`` sets bit ``i % 6`` of byte ``i // 6``. Byte 0 starts out as
    ``pico_start_bit``, so the first six states share that byte with the
    start marker.
    NOTE(review): presumably the Pico expects exactly this framing - confirm.

    Args:
        port: Object with a ``write(bytes)`` method.
        led_states: Sequence of truthy/falsy values, one per LED.
    """
    frame = [pico_start_bit, 0, 0, 0, 0]
    for index, lit in enumerate(led_states):
        if not lit:
            continue
        col, row = divmod(index, 6)
        frame[col] |= 1 << row
    port.write(bytes(frame))
# Shutdown flag: flipped to True by signal_handler and polled by the main loop
should_stop = False


def signal_handler(signal_recv, frame):
    """Record that a shutdown was requested.

    Both arguments are supplied by the ``signal`` module and are ignored.
    """
    global should_stop
    should_stop = True
def _process_button_press(sheet, pressed_button, clocked_in, last_press_times):
    """Handle one dequeued button press, appending a clock in/out row if valid.

    Args:
        sheet: Worksheet that receives the new row via ``append_row``.
        pressed_button: Integer index of the pressed button.
        clocked_in: Cached list of per-student booleans; updated in place
            when the row is appended successfully.
        last_press_times: Per-student monotonic timestamps of the last
            accepted press; updated in place.
    """
    from datetime import datetime

    print("Pressed:", pressed_button)
    if pressed_button >= len(clocked_in):
        # No person for this button
        return
    now = time.monotonic()
    if now - last_press_times[pressed_button] < bump_protection_time:
        # Duplicate press
        return
    last_press_times[pressed_button] = now

    student_name = student_names[pressed_button]
    is_clocked_in = clocked_in[pressed_button]
    # Sam and Ki Bae are not allowed
    if student_name in ("Sam", "Ki Bae"):
        return

    action = "Clock Out" if is_clocked_in else "Clock In"
    print(action + ": " + student_name)
    # Wall-clock time on purpose: this is the timestamp written to the sheet
    date_str = datetime.now().strftime("%m/%d/%Y %H:%M:%S")
    try:
        sheet.append_row([date_str, action, student_name], value_input_option="USER_ENTERED")
    except Exception as e:
        print("Failed to append row:", e)
    else:
        # Update cache
        clocked_in[pressed_button] = not is_clocked_in


def _run_event_loop(sheet, pico_serial, press_queue, clocked_in):
    """Drive LEDs, handle button presses and refresh the cache until stopped.

    Runs until the module-level ``should_stop`` flag becomes true.
    """
    # Monotonic clock: these are intervals, so they must not be affected by
    # adjustments to the system's wall-clock time.
    next_cache_refresh = time.monotonic() + refresh_interval
    last_press_times = [time.monotonic() - bump_protection_time] * len(student_names)
    print("Running...")
    while not should_stop:
        send_leds_to_pico(pico_serial, clocked_in)

        # Drain all pending button events
        while True:
            try:
                pressed_button = press_queue.get_nowait()
            except queue.Empty:
                break
            _process_button_press(sheet, pressed_button, clocked_in, last_press_times)

        time.sleep(0.1)

        if time.monotonic() > next_cache_refresh:
            try:
                clocked_in = get_clocked_in_from_sheet(sheet)
            except Exception as e:
                # Keep using the previous cache until the next refresh
                print("API Error:", e)
            next_cache_refresh = time.monotonic() + refresh_interval


def main():
    """Run the clock-in board: sheet access, button scanning and LED output.

    Opens the spreadsheet, loads the initial clocked-in state (falling back
    to "everyone clocked out" on an API error), starts the button-scanning
    process, and runs the event loop until SIGINT/SIGTERM is received. The
    button process is always stopped and the serial port always closed on
    the way out, including when the loop raises.
    """
    print("Starting")
    # Third-party imports are kept local so importing this module stays cheap
    from google.oauth2.service_account import Credentials
    import gspread
    import serial
    import signal

    credential_scopes = ["https://spreadsheets.google.com/feeds",
                         "https://www.googleapis.com/auth/drive"]
    credentials = Credentials.from_service_account_file(sheets_key_file, scopes=credential_scopes)
    sheets_client = gspread.authorize(credentials)
    sheet = sheets_client.open_by_key(sheet_id).sheet1
    print("Opened sheet")

    try:
        clocked_in = get_clocked_in_from_sheet(sheet)
        print("Initial clocked in state:")
        for name, is_clocked_in in zip(student_names, clocked_in):
            print(f" {name}: {is_clocked_in}")
    except Exception as e:
        print("API Error! Assuming all are clocked out")
        print("Error is:", e)
        clocked_in = [False] * len(student_names)

    press_queue = mp.Queue()
    button_stop_event = mp.Event()
    button_proc = mp.Process(target=scan_buttons, args=(press_queue, button_stop_event))

    pico_serial = serial.Serial(pico_port_name, pico_port_baud)
    try:
        # We need to handle signals ourselves since we're using multiprocessing
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        button_proc.start()
        try:
            _run_event_loop(sheet, pico_serial, press_queue, clocked_in)
        finally:
            print("Stopping...")
            button_stop_event.set()
            button_proc.join()
    finally:
        pico_serial.close()
    print("Goodbye :3")


if __name__ == "__main__":
    main()