diff --git a/hci/user-in-the-box/uitb/test/PhysicalAlignment/utils.py b/hci/user-in-the-box/uitb/test/PhysicalAlignment/utils.py new file mode 100644 index 00000000..aca4848c --- /dev/null +++ b/hci/user-in-the-box/uitb/test/PhysicalAlignment/utils.py @@ -0,0 +1,123 @@ +import pandas as pd +import numpy as np +import mujoco +from scipy.spatial.transform import Rotation + +## used for coordinate transformations +def transformation_matrix(pos, quat): + quat = np.roll(quat, -1) + matrix = Rotation.from_quat(quat).as_matrix() + + # Create the matrix + T = np.eye(4) + T[:3, :3] = matrix + T[:3, 3] = pos + return T + + +def ReachEnvelopeCheck(env, + use_vr_controller, + welded_body, + relpose, + endeffector_name, + trajectories_table_columns, + num_episodes, + video_output: bool, + figure_filename, + table_filename, + video_filename): + #return 123 + + trajectories_table = pd.DataFrame(columns=trajectories_table_columns) + #with imageio.get_writer(video_filename, fps=int(1 / (env._model.opt.timestep))) as video: + video = [] + + for video_episode_index in range(num_episodes): + print('Video - Episode {}/{}'.format(video_episode_index + 1, num_episodes)) + obs, info = env.reset() + print("Successfully reset.") + if video_output: + video.append(env.render()) + + nsteps_to_test = 100 + ##TODO: generalize to other biom. 
models (allow to define initial posture and list of joints over which to iterate) + range_elevation_angle = np.linspace( + env._model.joint('elv_angle').range[0], + env._model.joint('elv_angle').range[1], nsteps_to_test) + range_elevation = np.linspace( + env._model.joint('shoulder_elv').range[0], + env._model.joint('shoulder_elv').range[1], nsteps_to_test) + _steps = 0 + for i in range(nsteps_to_test): + for j in range(nsteps_to_test): + if (100*(_steps+1)/(nsteps_to_test**2) % 10) == 0: + print(f"{100*(_steps+1)/(nsteps_to_test**2)}% reached.") + + # if use_vr_controller: + # obs, info = env.reset() + env._data.joint('elv_angle').qpos = range_elevation_angle[i] + env._data.joint('shoulder_elv').qpos = range_elevation[j] + + # maximally extend the arm: + env._data.joint("elbow_flexion").qpos = env._model.joint("elbow_flexion").range[0] + + # adjust virtual joints according to active constraints: + _eq_act = getattr(env._model, "eq_active", None) + if _eq_act is None: + _eq_act = np.asarray(getattr(env._model, "eq_active0", np.ones(env._model.neq))) + for (virtual_joint_id, physical_joint_id, poly_coefs) in zip( + env._model.eq_obj1id[ + (env._model.eq_type == 2) & (_eq_act == 1)], + env._model.eq_obj2id[ + (env._model.eq_type == 2) & (_eq_act == 1)], + env._model.eq_data[(env._model.eq_type == 2) & + (_eq_act == 1), 4::-1]): + if physical_joint_id >= 0: + env._data.joint(virtual_joint_id).qpos = np.polyval(poly_coefs, env._data.joint(physical_joint_id).qpos) + # qpos[env._model.joint_name2id('shoulder1_r2')] = -qpos[env._model.joint_name2id( + # 'elv_angle')] # joint equality constraint between "elv_angle" and "shoulder1_r2" + + #env._data.qpos[:] = qpos + # if use_vr_controller: + # mujoco.mj_step(env._model, env._data) + # else: + # mujoco.mj_forward(env._model, env._data) + mujoco.mj_forward(env._model, env._data) + if video_output: + video.append(env.render()) + trajectories_table.loc[len(trajectories_table), :"shoulder_elv_pos"] = 
np.concatenate((env._data.joint('elv_angle').qpos, env._data.joint('shoulder_elv').qpos)) + if use_vr_controller: + #manually compute VR controller position using "right_controller_relpose" from config file and current position of the aux body/welded body + T1 = transformation_matrix(pos=env._data.body(welded_body).xpos, quat=env._data.body(welded_body).xquat) + T2 = transformation_matrix(pos=relpose[:3], quat=relpose[3:]) + T = np.matmul(T1, np.linalg.inv(T2)) + # end-effector position is the translation component T[:3, 3] (removed stray no-op expression) + trajectories_table.loc[len(trajectories_table) - 1, + "end-effector_xpos_x":"end-effector_xpos_z"] = T[:3, 3] + # _controller_quat = np.roll(Rotation.from_matrix(T[:3, :3]).as_quat(), 1) + else: + trajectories_table.loc[len(trajectories_table) - 1, + "end-effector_xpos_x":"end-effector_xpos_z"] = env._data.body(endeffector_name).xpos + # print(env._data.body(endeffector_name).xpos) + + _steps += 1 + + ## -> Store csv file + trajectories_table.to_csv(table_filename) + print(f"ReachEnvelope data stored at '{table_filename}'.") + + + ## -> Store mp4 file + print("\nCreating ReachEnvelope video...") + # temporarily override fps + _orig_fps = env._GUI_camera._fps + env._GUI_camera._fps = 100 + env._GUI_camera.write_video_set_path(video_filename) + # Write the video + # simulator._camera.write_video(imgs, os.path.join(evaluate_dir, args.out_file)) + for _img in video: + env._GUI_camera.write_video_add_frame(_img) + env._GUI_camera.write_video_close() + # reset to original fps from env + env._GUI_camera._fps = _orig_fps + print(f"ReachEnvelope video stored at '{video_filename}'.") diff --git a/hci/user-in-the-box/uitb/test/__init__.py b/hci/user-in-the-box/uitb/test/__init__.py new file mode 100644 index 00000000..fddecfd1 --- /dev/null +++ b/hci/user-in-the-box/uitb/test/__init__.py @@ -0,0 +1 @@ +from uitb.test.PhysicalAlignment.utils import ReachEnvelopeCheck \ No newline at end of file diff --git a/hci/user-in-the-box/uitb/test/evaluator.py b/hci/user-in-the-box/uitb/test/evaluator.py new file mode 
100644 index 00000000..dfa4e328 --- /dev/null +++ b/hci/user-in-the-box/uitb/test/evaluator.py @@ -0,0 +1,269 @@ +import os +import numpy as np +from stable_baselines3 import PPO +import re +import argparse +import scipy.ndimage +from collections import defaultdict +import matplotlib.pyplot as pp +import cv2 + +from uitb.utils.logger import StateLogger, ActionLogger +from uitb.utils import interaction_metrics +from uitb.simulator import Simulator + +#from uitb.bm_models.effort_models import CumulativeFatigue3CCr, ConsumedEndurance + + +def natural_sort(l): + convert = lambda text: int(text) if text.isdigit() else text.lower() + alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] + return sorted(l, key=alphanum_key) + + +### DEPRECATED, use simulator.render() instead +# def grab_pip_image(simulator): +# # Grab an image from both 'for_testing' camera and 'oculomotor' camera, and display them 'picture-in-picture' +# +# # Grab images +# img, _ = simulator._GUI_camera.render() +# +# ocular_img = None +# for module in simulator.perception.perception_modules: +# if module.modality == "vision": +# # TODO would be better to have a class function that returns "human-viewable" rendering of the observation; +# # e.g. in case the vision model has two cameras, or returns a combination of rgb + depth images etc. 
+# ocular_img, _ = module._camera.render() +# +# if ocular_img is not None: +# +# # Resample +# resample_factor = 2 +# resample_height = ocular_img.shape[0]*resample_factor +# resample_width = ocular_img.shape[1]*resample_factor +# resampled_img = np.zeros((resample_height, resample_width, 3), dtype=np.uint8) +# for channel in range(3): +# resampled_img[:, :, channel] = scipy.ndimage.zoom(ocular_img[:, :, channel], resample_factor, order=0) +# +# # Embed ocular image into free image +# i = simulator._GUI_camera.height - resample_height +# j = simulator._GUI_camera.width - resample_width +# img[i:, j:] = resampled_img +# +# return img + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description='Evaluate a policy.') + parser.add_argument('simulator_folder', type=str, + help='the simulation folder') + parser.add_argument('--action_sample_freq', type=float, default=20, + help='action sample frequency (how many times per second actions are sampled from policy, default: 20)') + parser.add_argument('--checkpoint', type=str, default=None, + help='filename of a specific checkpoint (default: None, latest checkpoint is used)') + parser.add_argument('--num_episodes', type=int, default=10, + help='how many episodes are evaluated (default: 10)') + parser.add_argument('--uncloned', dest="cloned", action='store_false', help='use source code instead of files from cloned simulator module') + parser.add_argument('--app_condition', type=str, default=None, + help="can be used to override the 'condition' argument passed to a Unity app") + parser.add_argument('--record', action='store_true', help='enable recording') + parser.add_argument('--human', action='store_true', help='enable real-time PyGame rendering window') + parser.add_argument('--out_file', type=str, default='evaluate.mp4', + help='output file for recording if recording is enabled (default: ./evaluate.mp4)') + parser.add_argument('--logging', action='store_true', help='enable logging') + 
parser.add_argument('--state_log_file', default='state_log', + help='output file for state log if logging is enabled (default: ./state_log)') + parser.add_argument('--action_log_file', default='action_log', + help='output file for action log if logging is enabled (default: ./action_log)') + parser.add_argument('--metrics', action='store_true', + help='汇总人机交互精度指标(指向/跟踪:指尖-目标距离 RMSE 等;遥控驾驶:车-目标距离与停车成功率)') + parser.add_argument('--deterministic', action='store_true', + help='策略评测时使用确定性动作(降低方差,便于对比控制精度)') + args = parser.parse_args() + + # Define directories + checkpoint_dir = os.path.join(args.simulator_folder, 'checkpoints') + evaluate_dir = os.path.join(args.simulator_folder, 'evaluate') + + # Make sure output dir exists + os.makedirs(evaluate_dir, exist_ok=True) + + # Override run parameters + run_params = dict() + run_params["action_sample_freq"] = args.action_sample_freq + run_params["evaluate"] = True + + run_params["unity_record_gameplay"] = args.record #False + run_params["unity_logging"] = True + run_params["unity_output_folder"] = evaluate_dir + if args.app_condition is not None: + run_params["app_args"] = ['-condition', args.app_condition] + # run_params["unity_random_seed"] = 123 + + # Embed visual observations into main mp4 or store as separate mp4 files + render_mode_perception = "separate" if run_params["unity_record_gameplay"] else "embed" + + deterministic = bool(args.deterministic) + + # Initialise simulator + _render_mode = "human" if args.human else "rgb_array_list" + simulator = Simulator.get(args.simulator_folder, render_mode=_render_mode, render_mode_perception=render_mode_perception, run_parameters=run_params, use_cloned=args.cloned) + task_name = simulator.task.__class__.__name__ + metric_episodes = [] + + # ## Change effort model #TODO: delete + # simulator.bm_model._effort_model = CumulativeFatigue3CCr(simulator.bm_model, dt=simulator._run_parameters["dt"]) + + print(f"run parameters are: {simulator.run_parameters}") + + # Load latest model 
if filename not given + _policy_loaded = False + if args.checkpoint is not None: + model_file = args.checkpoint + _policy_loaded = True + else: + try: + files = natural_sort(os.listdir(checkpoint_dir)) + model_file = files[-1] + _policy_loaded = True + except (FileNotFoundError, IndexError): + print("No checkpoint found. Will continue evaluation with randomly sampled controls.") + + if _policy_loaded: + # Load policy TODO should create a load method for uitb.rl.BaseRLModel + print(f'Loading model: {os.path.join(checkpoint_dir, model_file)}\n') + model = PPO.load(os.path.join(checkpoint_dir, model_file)) + + # Set callbacks to match the value used for this training point (if the simulator had any) + simulator.update_callbacks(model.num_timesteps) + + if args.logging: + # Initialise log + state_logger = StateLogger(args.num_episodes, keys=simulator.get_state().keys()) + + # Actions are logged separately to make things easier + action_logger = ActionLogger(args.num_episodes) + + # Visualise evaluations + # statistics = defaultdict(list) + for episode_idx in range(args.num_episodes): + + print(f"Run episode {episode_idx + 1}/{args.num_episodes}.") + + # Reset environment + obs, info = simulator.reset() + terminated = False + truncated = False + reward = 0 + + if args.metrics: + episode_dist_samples = [] + episode_inside = [] + episode_car = [] + + if args.logging: + state = simulator.get_state() + state_logger.log(episode_idx, state) + + # Loop until episode ends + while not terminated and not truncated: + # #print(f"Episode {episode_idx}: {simulator.get_episode_statistics_str()}") + # print(reward) + + if _policy_loaded: + # Get actions from policy + action, _internal_policy_state = model.predict(obs, deterministic=deterministic) + else: + # choose random action from action space + action = simulator.action_space.sample() + + # Take a step + obs, r, terminated, truncated, info = simulator.step(action) + reward += r + + if args.metrics: + st = simulator.get_state() + 
st.update(info) + if task_name in ("Pointing", "Tracking", "SmallObjectTouch"): + if "dist_to_target_center" in st: + episode_dist_samples.append(float(st["dist_to_target_center"])) + episode_inside.append(bool(info.get("inside_target", False))) + elif task_name == "RemoteDriving": + if "dist_car_to_target" in st: + episode_car.append(float(st["dist_car_to_target"])) + + if args.logging: + action_logger.log(episode_idx, + {"steps": state["steps"], "timestep": state["timestep"], "action": action.copy(), + "reward": r}) + state = simulator.get_state() + state.update(info) + state_logger.log(episode_idx, state) + + if args.metrics: + st_end = simulator.get_state() + st_end.update(info) + if task_name in ("Pointing", "Tracking", "SmallObjectTouch"): + ep = {"dist_samples": list(episode_dist_samples), + "inside_target_frac": interaction_metrics.inside_fraction(episode_inside)} + if task_name in ("Pointing", "SmallObjectTouch"): + ep["targets_hit"] = int(st_end.get("targets_hit", 0)) + metric_episodes.append(ep) + elif task_name == "RemoteDriving": + metric_episodes.append({ + "dist_car_samples": list(episode_car), + "target_parking_success": bool(st_end.get("target_hit", False)), + "min_dist_car": float(min(episode_car)) if episode_car else float("nan"), + }) + + # print(f"Episode {episode_idx}: {simulator.get_episode_statistics_str()}") + + # episode_statistics = simulator.get_episode_statistics() + # for key in episode_statistics: + # statistics[key].append(episode_statistics[key]) + + # print(f'Averages over {args.num_episodes} episodes (std in parenthesis):', + # ', '.join(['{}: {:.2f} ({:.2f})'.format(k, np.mean(v), np.std(v)) for k, v in statistics.items()])) + + if args.metrics: + if task_name in ("Pointing", "Tracking", "SmallObjectTouch", "RemoteDriving"): + interaction_metrics.print_run_summary(task_name, metric_episodes) + else: + print(f"提示: 任务「{task_name}」暂无内置自动指标;请使用 --logging 记录状态后离线分析," + f"或在 uitb/utils/interaction_metrics.py 中扩展。") + + if args.logging: 
+ # Output log + state_logger.save(os.path.join(evaluate_dir, args.state_log_file)) + action_logger.save(os.path.join(evaluate_dir, args.action_log_file)) + print(f'Log files have been saved files {os.path.join(evaluate_dir, args.state_log_file)}.pickle and ' + f'{os.path.join(evaluate_dir, args.action_log_file)}.pickle') + + if args.record: + simulator._GUI_camera.write_video_set_path(os.path.join(evaluate_dir, args.out_file)) + + # Write the video + # simulator._camera.write_video(imgs, os.path.join(evaluate_dir, args.out_file)) + _imgs = simulator.render() + for _img in _imgs: + simulator._GUI_camera.write_video_add_frame(_img) + + simulator._GUI_camera.write_video_close() + print(f'A recording has been saved to file {os.path.join(evaluate_dir, args.out_file)}') + + # Write additional videos for each perception module camera (only if simulator._render_mode_perception == "separate") + if simulator._render_mode_perception == "separate": + _perception_imgs = simulator.get_render_stack_perception() + for _module_name, _imgs in _perception_imgs.items(): + _out_file = os.path.splitext(args.out_file)[0] + f"_{_module_name.replace('/', '-')}" + os.path.splitext(args.out_file)[1] + + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + out = cv2.VideoWriter(os.path.join(evaluate_dir, _out_file), fourcc, simulator._GUI_camera._fps, (_imgs[0].shape[1], _imgs[0].shape[0])) + # out.open(_out_file) + for img in _imgs: + out.write(cv2.cvtColor(np.uint8(img), cv2.COLOR_BGR2RGB)) + out.release() + print(f'A recording has been saved to file {os.path.join(evaluate_dir, _out_file)}') + + simulator.close() diff --git a/hci/user-in-the-box/uitb/test/llc_controller.py b/hci/user-in-the-box/uitb/test/llc_controller.py new file mode 100644 index 00000000..63d88a07 --- /dev/null +++ b/hci/user-in-the-box/uitb/test/llc_controller.py @@ -0,0 +1,266 @@ +import os +import numpy as np +np.set_printoptions(precision=2, suppress=True) +from stable_baselines3 import PPO +import re +import argparse +import 
scipy.ndimage +from collections import defaultdict +import matplotlib.pyplot as pp +import matplotlib +matplotlib.use('TkAgg') +import cv2 +from time import sleep +import queue +from pynput.keyboard import Listener, Key + +from uitb.utils.logger import StateLogger, ActionLogger +from uitb.simulator import Simulator + + +def natural_sort(l): + convert = lambda text: int(text) if text.isdigit() else text.lower() + alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] + return sorted(l, key=alphanum_key) + +def grab_pip_image(simulator): + # Grab an image from both 'for_testing' camera and 'oculomotor' camera, and display them 'picture-in-picture' + + # Visualise muscle actuation + simulator._model.tendon_rgba[:, 0] = 0.3 + simulator._data.ctrl * 0.7 + + # Grab images + img, _ = simulator._GUI_camera.render() + + ocular_img = None + for module in simulator.perception.perception_modules: + if module.modality == "vision": + # TODO would be better to have a class function that returns "human-viewable" rendering of the observation; + # e.g. in case the vision model has two cameras, or returns a combination of rgb + depth images etc. 
+ ocular_img, _ = module._camera.render() + + # Set back to default + simulator._model.tendon_rgba[:, 0] = 0.95 + + if ocular_img is not None: + + # Resample + resample_factor = 2 + resample_height = ocular_img.shape[0]*resample_factor + resample_width = ocular_img.shape[1]*resample_factor + resampled_img = np.zeros((resample_height, resample_width, 3), dtype=np.uint8) + for channel in range(3): + resampled_img[:, :, channel] = scipy.ndimage.zoom(ocular_img[:, :, channel], resample_factor, order=0) + + # Embed ocular image into free image + i = simulator._GUI_camera.height - resample_height + j = simulator._GUI_camera.width - resample_width + img[i:, j:] = resampled_img + + return img + +# def read_keyboard(queue): +# # Wait for input from user +# input = + +def on_press(key): + try: + + # Exit with esc + if key == Key.esc: + return False + + # Set to initial position with space + elif key == Key.space: + q.put(("set", initial_position.copy())) + + # First joint + elif key.char == "1": + q.put(("add", np.array([0.5, 0, 0, 0, 0]))) + elif key.char == "q": + q.put(("add", np.array([0.1, 0, 0, 0, 0]))) + elif key.char == "a": + q.put(("add", np.array([-0.1, 0, 0, 0, 0]))) + elif key.char == "z": + q.put(("add", np.array([-0.5, 0, 0, 0, 0]))) + + # Second joint + elif key.char == "2": + q.put(("add", np.array([0, 0.5, 0, 0, 0]))) + elif key.char == "w": + q.put(("add", np.array([0, 0.1, 0, 0, 0]))) + elif key.char == "s": + q.put(("add", np.array([0, -0.1, 0, 0, 0]))) + elif key.char == "x": + q.put(("add", np.array([0, -0.5, 0, 0, 0]))) + + # Third joint + elif key.char == "3": + q.put(("add", np.array([0, 0, 0.5, 0, 0]))) + elif key.char == "e": + q.put(("add", np.array([0, 0, 0.1, 0, 0]))) + elif key.char == "d": + q.put(("add", np.array([0, 0, -0.1, 0, 0]))) + elif key.char == "c": + q.put(("add", np.array([0, 0, -0.5, 0, 0]))) + + # Fourth joint + elif key.char == "4": + q.put(("add", np.array([0, 0, 0, 0.5, 0]))) + elif key.char == "r": + q.put(("add", 
np.array([0, 0, 0, 0.1, 0]))) + elif key.char == "f": + q.put(("add", np.array([0, 0, 0, -0.1, 0]))) + elif key.char == "v": + q.put(("add", np.array([0, 0, 0, -0.5, 0]))) + + # Fifth joint + elif key.char == "5": + q.put(("add", np.array([0, 0, 0, 0, 0.5]))) + elif key.char == "t": + q.put(("add", np.array([0, 0, 0, 0, 0.1]))) + elif key.char == "g": + q.put(("add", np.array([0, 0, 0, 0, -0.1]))) + elif key.char == "b": + q.put(("add", np.array([0, 0, 0, 0, -0.5]))) + + else: + return + + except AttributeError: + pass + +if __name__=="__main__": + + parser = argparse.ArgumentParser(description='Evaluate a policy.') + parser.add_argument('simulator_folder', type=str, + help='the simulation folder') + parser.add_argument('--action_sample_freq', type=float, default=100, + help='action sample frequency (how many times per second actions are sampled from policy, default: 20)') + parser.add_argument('--checkpoint', type=str, default=None, + help='filename of a specific checkpoint (default: None, latest checkpoint is used)') + parser.add_argument('--logging', action='store_true', help='enable logging') + parser.add_argument('--state_log_file', default='controller_state_log', + help='output file for state log if logging is enabled (default: ./controller_state_log)') + parser.add_argument('--action_log_file', default='controller_action_log', + help='output file for action log if logging is enabled (default: ./controller_action_log)') + args = parser.parse_args() + + # Define directories + checkpoint_dir = os.path.join(args.simulator_folder, 'checkpoints') + evaluate_dir = os.path.join(args.simulator_folder, 'evaluate') + + # Make sure output dir exists + os.makedirs(evaluate_dir, exist_ok=True) + + # Override run parameters + run_params = dict() + run_params["action_sample_freq"] = args.action_sample_freq + run_params["evaluate"] = True + + # Use deterministic actions? 
+ deterministic = True + + # Initialise simulator + simulator = Simulator.get(args.simulator_folder, run_parameters=run_params) + + print(f"run parameters are: {simulator.run_parameters}\n") + + # Load latest model if filename not given + if args.checkpoint is not None: + model_file = args.checkpoint + else: + files = natural_sort(os.listdir(checkpoint_dir)) + model_file = files[-1] + + # Load policy TODO should create a load method for uitb.rl.BaseRLModel + print(f'Loading model: {os.path.join(checkpoint_dir, model_file)}\n') + model = PPO.load(os.path.join(checkpoint_dir, model_file)) + + # Set callbacks to match the value used for this training point (if the simulator had any) + simulator.update_callbacks(model.num_timesteps) + + if args.logging: + + # Initialise log (fixed: this script's parser defines no --num_episodes; only episode index 0 is logged) + state_logger = StateLogger(1, keys=simulator.get_state().keys()) + + # Actions are logged separately to make things easier + action_logger = ActionLogger(1) + + # Visualise evaluations + statistics = defaultdict(list) + imgs = [] + + # Reset environment + simulator.reset() + simulator.task._target_qpos[:] = simulator.task._qpos.copy() + initial_position = simulator.task._qpos.copy() + obs = simulator.get_observation() + + + if args.logging: + state = simulator.get_state() + state_logger.log(0, state) + + # Initialise a queue so we can share data between threads + q = queue.Queue() + + # Start a keyboard press reader in a thread + # keyboard_reader = threading.Thread(target=read_keyboard, args=(q,), daemon=True) + # keyboard_reader.start() + # listener = Listener(on_press=lambda event: on_press(event, q)) + listener = Listener(on_press=on_press) + listener.start() + + # Run simulation here in main thread + while True: + + # If listener has exited (user pressed 'esc'), break + if not listener.is_alive(): + break + + # Read queue + if not q.empty(): + action = q.get() + if action[0] == "add": + simulator.task._target_qpos = np.clip(simulator.task._target_qpos + 
action[1], -1, 1) + elif action[0] == "set": + simulator.task._target_qpos = action[1] + obs = simulator.get_observation() + print(simulator.task._target_qpos) + + # Get actions from policy + action, _states = model.predict(obs, deterministic=deterministic) + + # Take a step + obs, r, terminated, truncated, info = simulator.step(action) + + if args.logging: + action_logger.log(0, {"steps": state["steps"], "timestep": state["timestep"], "action": action.copy(), + "reward": r}) + state = simulator.get_state() + state.update(info) + state_logger.log(0, state) + + # Do some rendering + img = grab_pip_image(simulator) + qpos = simulator.task._target_qpos + cv2.putText(img, f'{qpos[0]:.2f} elevation angle', (700, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) + cv2.putText(img, f'{qpos[1]:.2f} shoulder elevation', (700, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) + cv2.putText(img, f'{qpos[2]:.2f} shoulder rotation', (700, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) + cv2.putText(img, f'{qpos[3]:.2f} elbow flexion', (700, 200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) + cv2.putText(img, f'{qpos[4]:.2f} pronation/supination', (700, 250), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA) + cv2.imshow("simulation", np.flip(img, axis=2)/255) + cv2.waitKey(1) + # sleep(0.01) + + # Run keyboard press reader in another thread + + + if args.logging: + # Output log + state_logger.save(os.path.join(evaluate_dir, args.state_log_file)) + action_logger.save(os.path.join(evaluate_dir, args.action_log_file)) + print(f'Log files have been saved files {os.path.join(evaluate_dir, args.state_log_file)}.pickle and ' + f'{os.path.join(evaluate_dir, args.action_log_file)}.pickle')