Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8f3fdaa
Test script for recieving arudino i2c data serially with pi
abhinavp5 Nov 6, 2025
ad10ce5
debuggin output to motor_interface.py
abhinavp5 Nov 7, 2025
cead4bc
3 bytes read for motorinterface.py
abhinavp5 Nov 7, 2025
c35156d
Working serial communication with pi and arduino
colbycheese55 Oct 20, 2025
8fcfa43
implementation of MotorInterfaceClass
abhinavp5 Nov 9, 2025
efcdb30
fixes to self.lock
abhinavp5 Nov 9, 2025
1eb35c8
confirmed working implemntatin of MotorInterface APi
colbycheese55 Oct 20, 2025
1dee8c5
updates to motor interface API
abhinavp5 Nov 13, 2025
c33d438
remove unused instance of MotorInterfaceTest and related test loop
abhinavp5 Nov 14, 2025
de7f1af
Adding serial port as member of class
abhinavp5 Nov 14, 2025
2cbfa67
fix: debug output
abhinavp5 Jan 22, 2026
70d59c9
test: added the arduino testscript
abhinavp5 Feb 1, 2026
cd0ed8b
added arudino script
abhinavp5 Feb 1, 2026
8f69c98
New File for System Wide Testing
colbycheese55 Jan 30, 2026
235dee9
Developing Testing for all Comms
pizzeman Feb 1, 2026
79bb5d4
Added Test for Digital, Analog and CAN
pizzeman Feb 8, 2026
018c0bd
Changing Tabs
colbycheese55 Jan 30, 2026
e69acd1
fix: exception when stop reading for less error-y exit from test
abhinavp5 Feb 12, 2026
6301b45
feat: add motor interface test script for throttle and regen modes
abhinavp5 Feb 12, 2026
f0242ab
Merge remote-tracking branch 'origin/main' into TestMotorInterface
abhinavp5 Feb 12, 2026
cab4a3a
Changed Syntax in Test_All_Comms
pizzeman Feb 15, 2026
94e23bf
Merge branch 'TestMotorInterface' of https://github.com/solarcaratuva…
pizzeman Feb 15, 2026
b60fea0
fix: added locks, and updated exceptions
abhinavp5 Feb 15, 2026
3ae1439
fix: deleted dupe ard script#
abhinavp5 Feb 15, 2026
67a8334
fix: capitalized lock
abhinavp5 Feb 15, 2026
2529569
fix: fixed deadlock issue
abhinavp5 Feb 15, 2026
5afb92b
fix: added close method to closer serial port
abhinavp5 Feb 15, 2026
dc6911a
fix: reverted TestRunner
abhinavp5 Feb 15, 2026
45bcb16
fix: final changes to review
abhinavp5 Feb 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .DS_Store
Binary file not shown.
26 changes: 7 additions & 19 deletions Server/TestRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,15 @@
sys.path.append(new_root)
sys.path.append(os.getcwd())

from upload_wrapper import upload_firmware

# Custom TestSuite that overrides the run method to run setup and teardown code; added code to maek firmware upload once per board, not test
# Custom TestSuite that overrides the run method to run setup and teardown code
class CustomTestSuite(unittest.TestSuite):
def __init__(self, tests, board_name):
super().__init__(tests)
self.board_name = board_name

def run(self, result, debug=False):
setup_suite(self.board_name)
setup_suite()
super().run(result, debug)
# teardown_suite(self.board_name) # optional

def setup_suite(board_name):
print(f"[SUITE] Uploading firmware for {board_name}")
upload_firmware(board_name)
teardown_suite()

def setup_suite():
print("Setting up suite...") # replace with setup code

def teardown_suite():
print("Tearing down suite...") # replace with teardown code
Expand All @@ -44,8 +36,6 @@ def make_suite(board_folder):

def run_tests() -> None:
board_names = config.REPO_CONFIG["boards"].keys()
print("DEBUG: Boards in config:", board_names)


all_suites = unittest.TestSuite()

Expand All @@ -55,13 +45,11 @@ def run_tests() -> None:

if os.path.isdir(board_folder):
suite = make_suite(board_folder)
custom_suite = CustomTestSuite([suite], board)
all_suites.addTest(custom_suite)
print("Added test to custom suite")
custom_suite = CustomTestSuite([suite])
all_suites.addTests(custom_suite)
else:
print(f"Warning: Folder for board '{board}' not found at path: {board_folder}")

with open("test-results.xml", "wb") as output:
runner = xmlrunner.XMLTestRunner(output=output, buffer=True)
runner.run(all_suites)

122 changes: 122 additions & 0 deletions Testing_Library/MotorInterfaceTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import sys
import serial
import time
import threading

sys.path.append('/home/solarcar/solarcar/HiL_Testing/Server')

THROTTLE_ADDR = 0x2F
REGEN_ADDR = 0x2E
THROTTLE_START_BYTE = b'\xFF'
REGEN_START_BYTE = b'\xFE'
'''
while True:
byte_read = ser.read(1)
if byte_read == b'\xFF':
data = ser.read(3) # Read 3 bytes for two 9-bit values
if len(data) == 3:
# Unpack two 9-bit values from 3 bytes
# Byte 0: throttle[7:0]
# Byte 1: throttle[8] in bit 0, regen[6:0] in bits 7:1
# Byte 2: regen[8:7] in bits 1:0

# Extracting throttle and regen
throttle = data[0] | ((data[1] & 0x01) << 8)
regen = ((data[1] >> 1) & 0x7F) | ((data[2] & 0x03) << 7)

print(f"Throttle: {throttle}%) | Regen: {regen})")
else:
print(f"Incomplete data: got {len(data)} bytes instead of 3")
elif byte_read:
print(f"Unexpected byte:{byte_read}")
'''

class MotorInterfaceTest:
"""API for test Motor interface"""
def __init__(self, port='/dev/ttyACM0', baudrate=9600):
self.stop_thread = False
self.mru_throttle = None # Most Recently Used Throttle Value
self.mru_regen = None # Most Recently Used Regen Value
self.Lock = threading.Lock()
self.ser = serial.Serial(
port=port,
baudrate=baudrate,
timeout=1
)
print("Waiting for Arduino to start...")
time.sleep(2) # Give Arduino time to boot
self.startReadThread()


# Run infinite thread to read in Throttle and Regen Values
def startReadThread(self):
read_thread = threading.Thread(target=self.readThrottleAndRegen)
read_thread.daemon = True
try:
read_thread.start()
except RuntimeError as e:
raise RuntimeError("Thread Was not Started")

# Constantly read thorttle and regen values
def readThrottleAndRegen(self):
while(not self.stop_thread):
try:
byte_read = self.ser.read(1)

if byte_read == THROTTLE_START_BYTE: # Throttle start marker
data = self.ser.read(2) # Read 2 bytes for 9-bit value
if len(data) == 2:
# motorinterface.cpp sends (0x100 - throttle)
# first byte contains lower 8 bits, second byte LSB is the 9th bit
raw_from_arduino = data[0] | ((data[1] & 0x01) << 8)
with self.Lock:
self.mru_throttle = 256 - raw_from_arduino # PowerBoard sends (0x100 - throttle) over I2C

elif byte_read == REGEN_START_BYTE:
data = self.ser.read(2) # Read 2 bytes for 9-bit value
if len(data) == 2:
# motorinterface.cpp sends (0x100 - regen)
# first byte contains lower 8 bits, second byte LSB is the 9th bit
raw_from_arduino = data[0] | ((data[1] & 0x01) << 8)
with self.Lock:
self.mru_regen = 256 - raw_from_arduino # PowerBoard sends (0x100 - regen) over I2C

except serial.SerialException as e:
print(e)
# setting Both to None so that Exception is triggered within the getter methods
self.mru_throttle = None
self.mru_regen = None

def get_throttle(self) -> float:
# Normalized between [0-1.0]
with self.Lock:
if self.mru_throttle is None:
raise ValueError('MRU-Throttle Value is None')
return self.mru_throttle / 256.0

def get_throttle_raw(self) -> int:
# raw between [0-256]
with self.Lock:
if self.mru_throttle is None:
raise ValueError('MRU-Raw-Throttle Value is None')
return self.mru_throttle

def get_regen(self) -> float:
# Normalized between [0,1.0]
with self.Lock:
if self.mru_regen is None:
raise ValueError('MRU-Regen Value is None')
return self.mru_regen / 256.0

def get_regen_raw(self) -> int:
# raw between [0-256]
with self.Lock:
if self.mru_regen is None:
raise ValueError('MRU-Regen-Raw Value is None')
return self.mru_regen

def close(self):
self.stop_thread = True
if self.ser and self.ser.is_open:
self.ser.close()

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include <Wire.h>

#define THROTTLE_ADDR 0x2F
#define REGEN_ADDR 0x2E
#define THROTTLE_MARKER 0xFF
#define REGEN_MARKER 0xFE
#define SWAP_COMMAND 0x1FF

// Test Mode: 0 = Throttle only (listen on 0x2F), 1 = Regen only (listen on 0x2E).
// Must match PowerBoard main.cpp I2C_TEST_MODE: test_throttle → 0, test_regen → 1.
#define TEST_MODE 1

int currentMode = 1; // 0 = Throttle, 1 = Regen

void setup() {
Serial.begin(9600);
while (!Serial);

// Initialize based on test mode
if (TEST_MODE == 1) {
// Start in regen mode
Wire.begin(REGEN_ADDR);
currentMode = 1;
} else {
// Start in throttle mode (default)
Wire.begin(THROTTLE_ADDR);
currentMode = 0;
}

Wire.onReceive(receiveEvent);
}

void loop() {
delay(100);
}

void receiveEvent(int howMany) {
if (Wire.available() < 2) return;

// Read 2 bytes (16-bit value)
int highByte = Wire.read();
int lowByte = Wire.read();
int value = ((highByte << 8) | lowByte) & 0x1FF;

// Clamp value to valid range [0, 256]
if (value > 256) value = 256;


// Ensure value stays in valid range after inversion
if (value > 256) value = 256;
if (value < 0) value = 0;

// Send based on current mode
if (currentMode == 0) {
Serial.write(THROTTLE_MARKER);
Serial.write(value & 0xFF); // Low byte
Serial.write((value >> 8) & 0x01); // Bit 8
} else {
Serial.write(REGEN_MARKER);
Serial.write(value & 0xFF); // Low byte
Serial.write((value >> 8) & 0x01); // Bit 8
}
}
106 changes: 106 additions & 0 deletions Testing_Library/test_all_comms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import sys
sys.path.append('/home/solarcar/solarcar/HiL_Testing/Server')

from CANBus import CANBus
from CANMessage import CanMessage
from CANPi import readIn
import time
import serial
from gpioPins import DigitalInput, DigitalOutput
import random
import gpiozero
from gpioPins import AnalogOutput

# Global Pins for Synchronization
pinOut = DigitalOutput("2")
pinIn = DigitalInput("11")

# -----------Testing CAN Comms-----------

def testAddMethod(canBus : CANBus):
canBus.printCANBus()

input("WAITING")
#Example CANMessage
name = "BPSError"
id = 0x106
signals = {"internal_communications_fault" : 1}
timestamp = 1.0
can_message = CanMessage(name, id, signals, timestamp)
canBus.addToCANBus(can_message)
canBus.printCANBus()

def testReadThread(canBus : CANBus):
time.sleep(1) # Wait for Nucleo to send CAN
counter = 0
while (counter < 5):
print(f'Reading number {counter}')
canBus.printCANBus()
time.sleep(0.5)
counter = counter + 1

#Send BPSError to Nucleo (currently turn LD2 off and on)
def testWrite(canBus : CANBus):
#Example CANMessage
name = "BPSError"
id = 0x106
signals = {"internal_communications_fault" : 1}
timestamp = 1.0
can_message = CanMessage(name, id, signals, timestamp)
counter = 0
canBus.sendMessage(can_message)

def testCAN():
canBus = CANBus()
testWrite(canBus)

pinOut.write(0) # Block nucleo from proceeding
# Check that nucleo modified pin to 1 to switch to receiving
while pinIn.read() == 0:
pinOut.write(0)
time.sleep(0.5)

testReadThread(canBus)

#----------------Testing Analog Comms---------------

#Test case without the wrapper api
def send_to_nucleo():
pinOut = gpiozero.PWMOutputDevice("GPIO5", frequency=5000)

#Sending analog value to nucleo
while True:
pinOut.value = 0.64
print(f"SENDING {pinOut.value}")
#pinOut.pulse() #Useful function to sweep all ranges

time.sleep(2)

#Test the wrapper api
def test_analog_class(pinOut):
value = round(random.uniform(1.0, 100.0), 2) # Generate random 2 d.p. float

pinOut.write_voltage(value)
print(f'READING IN PIN VALUE: {pinOut.read()}')

def testAnalog():
pinOut = AnalogOutput("5")
test_analog_class(pinOut)

if __name__ == "__main__":
# Wait for nucleo
while pinIn.read() == 1:
pinOut.write(0)
time.sleep(0.5)

pinOut.write(1)
testCAN()

# Make sure finished sending
while pinIn.read() == 1:
pinOut.write(0)
print("THERE WAS A SYNC ERROR WITH RECEIVING CAN")
time.sleep(0.5)

pinOut.write(1)
testAnalog()