diff --git a/quadruped_pympc/config.py b/quadruped_pympc/config.py index 7a8ceb1..cb07724 100644 --- a/quadruped_pympc/config.py +++ b/quadruped_pympc/config.py @@ -6,7 +6,7 @@ # These are used both for a real experiment and a simulation ----------- # These are the only attributes needed per quadruped, the rest can be computed automatically ---------------------- -robot = 'aliengo' # 'go2', 'aliengo', 'hyqreal', 'mini_cheetah' # TODO: Load from robot_descriptions.py +robot = 'aliengo' # 'go2', 'aliengo', 'hyqreal', 'mini_cheetah' aliengo_arm # TODO: Load from robot_descriptions.py robot_leg_joints = dict(FL=['FL_hip_joint', 'FL_thigh_joint', 'FL_calf_joint',], # TODO: Make configs per robot. FR=['FR_hip_joint', 'FR_thigh_joint', 'FR_calf_joint',], RL=['RL_hip_joint', 'RL_thigh_joint', 'RL_calf_joint',], @@ -52,30 +52,41 @@ hip_height = 0.225 qpos0_js = np.concatenate((np.array([0, -np.pi/2, 0] * 2), np.array([0, np.pi/2, 0] * 2))) + mpc_params = { # 'nominal' optimized directly the GRF # 'input_rates' optimizes the delta GRF # 'sampling' is a gpu-based mpc that samples the GRF # 'collaborative' optimized directly the GRF and has a passive arm model inside - 'type': 'nominal', - + 'type': 'sampling', # 'nominal', 'input_rates', 'sampling', 'collaborative' # horizon is the number of timesteps in the future that the mpc will optimize # dt is the discretization time used in the mpc 'horizon': 12, 'dt': 0.02, + 'mu': 0.5, # GRF limits for each single leg "grf_max": mass * 9.81, "grf_min": 0, - 'mu': 0.5, + + # if this is true, we optimize the step frequency as well + # for the sampling controller, this is done in the rollout + # for the gradient-based controller, this is done with a batched version of the ocp + 'optimize_step_freq': True, + 'step_freq_available': [1.4, 2.0, 2.4], + 'use_nonuniform_discretization': False, - # ----- START properties only for the gradient-based mpc ----- + } +mpc_nominal_params = { + # ----- START properties only for the gradient-based mpc ----- + "grf_max": mass * 9.81, + "grf_min": 0, # this is used if you want to manually warm start the mpc 'use_warm_start': False, # this enables integrators for height, linear velocities, roll and pitch - 'use_integrators': False, + 'use_integrators': True, 'alpha_integrator': 0.1, 'integrator_cap': [0.5, 0.2, 0.2, 0.0, 0.0, 1.0], @@ -133,10 +144,10 @@ # compensate for the external wrench in the prediction (only collaborative) 'passive_arm_compensation': True, - # ----- END properties for the gradient-based mpc ----- - + # ----- END properties for the gradient-based mpc ----- +} +mpc_sampling_params = { # ----- START properties only for the sampling-based mpc ----- - # this is used only in the case 'sampling'. 'sampling_method': 'random_sampling', # 'random_sampling', 'mppi', 'cem_mppi' 'control_parametrization': 'cubic_spline_1', @@ -148,19 +159,11 @@ 'sigma_mppi': 3, 'sigma_random_sampling': [0.2, 3, 10], 'shift_solution': False, - - # ----- END properties for the sampling-based mpc ----- - - # if this is true, we optimize the step frequency as well - # for the sampling controller, this is done in the rollout - # for the gradient-based controller, this is done with a batched version of the ocp - 'optimize_step_freq': False, - 'step_freq_available': [1.4, 2.0, 2.4] - - } +} # ----------------------------------------------------------------------- simulation_params = { + 'swing_generator': 'scipy', # 'scipy', 'explicit', 'ndcurves' 'swing_position_gain_fb': 5000, 'swing_velocity_gain_fb': 100, @@ -186,7 +189,7 @@ # the MPC will be called every 1/(mpc_frequency*dt) timesteps # this helps to evaluate more realistically the performance of the controller - 'mpc_frequency': 200, + 'mpc_frequency': 100, 'use_inertia_recomputation': True, diff --git a/quadruped_pympc/controllers/gradient/collaborative/centroidal_nmpc_collaborative.py b/quadruped_pympc/controllers/gradient/collaborative/centroidal_nmpc_collaborative.py index 6d6687d..99e9ce5 100644 --- a/quadruped_pympc/controllers/gradient/collaborative/centroidal_nmpc_collaborative.py +++ b/quadruped_pympc/controllers/gradient/collaborative/centroidal_nmpc_collaborative.py @@ -1,41 +1,33 @@ # Description: This file contains the class for the NMPC controller +import pathlib + # Authors: Giulio Turrisi - from acados_template import AcadosOcp, AcadosOcpSolver -from centroidal_model_collaborative import Centroidal_Model_Collaborative +from .centroidal_model_collaborative import Centroidal_Model_Collaborative import numpy as np import scipy.linalg import casadi as cs import copy -import time - -import os -dir_path = os.path.dirname(os.path.realpath(__file__)) - -import sys -sys.path.append(dir_path) -sys.path.append(dir_path + '/../../') - -from quadruped_pympc import config - # Class for the Acados NMPC, the model is in another file! class Acados_NMPC_Collaborative: - def __init__(self): + def __init__(self,mpc_parameters=None,robot_name=None): - self.horizon = config.mpc_params['horizon'] # Define the number of discretization steps - self.dt = config.mpc_params['dt'] + self.robot_name = robot_name + self.horizon = self.mpc_parameters['horizon'] # Define the number of discretization steps + self.dt = self.mpc_parameters['dt'] self.T_horizon = self.horizon*self.dt - self.use_RTI = config.mpc_params['use_RTI'] - self.use_integrators = config.mpc_params['use_integrators'] - self.use_warm_start = config.mpc_params['use_warm_start'] - self.use_foothold_constraints = config.mpc_params['use_foothold_constraints'] + self.use_RTI = self.mpc_parameters['use_RTI'] + self.use_integrators = self.mpc_parameters['use_integrators'] + self.use_warm_start = self.mpc_parameters['use_warm_start'] + self.use_foothold_constraints = self.mpc_parameters['use_foothold_constraints'] - self.use_static_stability = config.mpc_params['use_static_stability'] - self.use_zmp_stability = config.mpc_params['use_zmp_stability'] + self.use_static_stability = self.mpc_parameters['use_static_stability'] + self.use_zmp_stability = self.mpc_parameters['use_zmp_stability'] self.use_stability_constraints = self.use_static_stability or self.use_zmp_stability @@ -64,9 +56,20 @@ def __init__(self): # Create the acados ocp solver self.ocp = self.create_ocp_solver_description(acados_model) - self.ocp.code_export_directory = dir_path + '/c_generated_code' - #if (not os.path.isdir(dir_path + "/c_generated_code") or os.listdir(dir_path + "/c_generated_code") == []): - self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + ".json") + # self.ocp.code_export_directory = dir_path + '/c_generated_code' + # #if (not os.path.isdir(dir_path + "/c_generated_code") or os.listdir(dir_path + "/c_generated_code") == []): + # self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + ".json") + + code_export_dir = pathlib.Path(__file__).parent / 'c_generated_code' + self.ocp.code_export_directory = str(code_export_dir) + # # self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json") + + if ((not code_export_dir) or code_export_dir == []): + self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json") + + else : + self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json", build = False, generate = True) + #else : # self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + ".json", build = False, generate = True) @@ -221,32 +224,32 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: ocp.solver_options.nlp_solver_max_iter = 1 # Set the RTI type for the advanced RTI method # (see https://arxiv.org/pdf/2403.07101.pdf) - if(config.mpc_params['as_rti_type'] == "AS-RTI-A"): + if(self.mpc_parameters['as_rti_type'] == "AS-RTI-A"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 0 - elif(config.mpc_params['as_rti_type'] == "AS-RTI-B"): + elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-B"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 1 - elif(config.mpc_params['as_rti_type'] == "AS-RTI-C"): + elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-C"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 2 - elif(config.mpc_params['as_rti_type'] == "AS-RTI-D"): + elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-D"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 3 else: ocp.solver_options.nlp_solver_type = "SQP" - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] #ocp.solver_options.globalization = "MERIT_BACKTRACKING" # FIXED_STEP, MERIT_BACKTRACKING - if(config.mpc_params['solver_mode'] == "balance"): + if(self.mpc_parameters['solver_mode'] == "balance"): ocp.solver_options.hpipm_mode = "BALANCE" - elif(config.mpc_params['solver_mode'] == "robust"): + elif(self.mpc_parameters['solver_mode'] == "robust"): ocp.solver_options.hpipm_mode = "ROBUST" - elif(config.mpc_params['solver_mode'] == "fast"): + elif(self.mpc_parameters['solver_mode'] == "fast"): ocp.solver_options.qp_solver_iter_max = 10 ocp.solver_options.hpipm_mode = "SPEED" - elif(config.mpc_params['solver_mode'] == "crazy_speed"): + elif(self.mpc_parameters['solver_mode'] == "crazy_speed"): ocp.solver_options.qp_solver_iter_max = 5 ocp.solver_options.hpipm_mode = "SPEED_ABS" @@ -261,9 +264,9 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: # Nonuniform discretization - if(config.mpc_params['use_nonuniform_discretization']): - time_steps_fine_grained = np.tile(config.mpc_params['dt_fine_grained'], config.mpc_params['horizon_fine_grained']) - time_steps = np.concatenate((time_steps_fine_grained, np.tile(self.dt, self.horizon - config.mpc_params['horizon_fine_grained']))) + if(self.mpc_parameters['use_nonuniform_discretization']): + time_steps_fine_grained = np.tile(self.mpc_parameters['dt_fine_grained'], self.mpc_parameters['horizon_fine_grained']) + time_steps = np.concatenate((time_steps_fine_grained, np.tile(self.dt, self.horizon - self.mpc_parameters['horizon_fine_grained']))) shooting_nodes = np.zeros((self.horizon+1,)) for i in range(len(time_steps)): shooting_nodes[i+1] = shooting_nodes[i] + time_steps[i] @@ -496,8 +499,8 @@ def create_friction_cone_constraints(self,) ->None: t = np.array([1, 0, 0]) b = np.array([0, 1, 0]) mu = self.centroidal_model.mu_friction - f_max = config.mpc_params['grf_max'] - f_min = config.mpc_params['grf_min'] + f_max = self.mpc_parameters['grf_max'] + f_min = self.mpc_parameters['grf_min'] # Derivation can be found in the paper # "High-slope terrain locomotion for torque-controlled quadruped robots", @@ -575,7 +578,7 @@ def set_weight(self, nx, nu): Q_passive_arm_force = np.array([0, 0, 0, 0, 0, 0]) #end_effector_position_z R_foot_vel = np.array([0.0001, 0.0001, 0.00001]) #v_x, v_y, v_z (should be 4 times this, once per foot) - if(config.robot == "hyqreal"): + if(self.robot_name == "hyqreal"): R_foot_force = np.array([0.00001, 0.00001, 0.00001])#f_x, f_y, f_z (should be 4 times this, once per foot) else: R_foot_force = np.array([0.001, 0.001, 0.001]) @@ -952,7 +955,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif(np.array_equal(FL_contact_sequence, RR_contact_sequence) and np.array_equal(FR_contact_sequence, RL_contact_sequence)): #TROT - stability_margin = config.mpc_params['trot_stability_margin'] + stability_margin = self.mpc_parameters['trot_stability_margin'] if(FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_FL_RR = 0 + stability_margin lb_support_FL_RR = 0 - stability_margin @@ -964,7 +967,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif(np.array_equal(FL_contact_sequence, RL_contact_sequence) and np.array_equal(FR_contact_sequence, RR_contact_sequence)): #PACE - stability_margin = config.mpc_params['pace_stability_margin'] + stability_margin = self.mpc_parameters['pace_stability_margin'] if(FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_RL_FL = 0 + stability_margin lb_support_RL_FL = 0 - stability_margin @@ -976,7 +979,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h else: #CRAWL BACKDIAGONALCRAWL ONLY - stability_margin = config.mpc_params['crawl_stability_margin'] + stability_margin = self.mpc_parameters['crawl_stability_margin'] if(FL_contact_sequence[j] == 1): if(FR_contact_sequence[j] == 1): @@ -1312,7 +1315,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, # Fill stance param, friction and stance proximity # (stance proximity will disable foothold optimization near a stance!!) - mu = config.mpc_params['mu'] + mu = self.mpc_parameters['mu'] yaw = state["orientation"][2] # Stance Proximity ugly routine. Basically we disable foothold optimization @@ -1362,8 +1365,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, for j in range(self.horizon): # If we have estimated an external wrench, we can compensate it for all steps # or less (maybe the disturbance is not costant along the horizon!) - if(config.mpc_params['external_wrenches_compensation'] and - j < config.mpc_params['external_wrenches_compensation_num_step']): + if(self.mpc_parameters['external_wrenches_compensation'] and + j < self.mpc_parameters['external_wrenches_compensation_num_step']): external_wrenches_estimated_param = copy.deepcopy(external_wrenches) external_wrenches_estimated_param = external_wrenches_estimated_param.reshape((6, )) else: @@ -1427,7 +1430,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, if(self.use_integrators): # Compute error for integral action - alpha_integrator = config.mpc_params["alpha_integrator"] + alpha_integrator = self.mpc_parameters["alpha_integrator"] self.integral_errors[0] += (state["position"][2] - reference["ref_position"][2])*alpha_integrator self.integral_errors[1] += (state["linear_velocity"][0] - reference["ref_linear_velocity"][0])*alpha_integrator self.integral_errors[2] += (state["linear_velocity"][1] - reference["ref_linear_velocity"][1])*alpha_integrator @@ -1436,12 +1439,12 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, self.integral_errors[5] += (state["orientation"][1] - reference["ref_orientation"][1])*alpha_integrator - cap_integrator_z = config.mpc_params["integrator_cap"][0] - cap_integrator_x_dot = config.mpc_params["integrator_cap"][1] - cap_integrator_y_dot = config.mpc_params["integrator_cap"][2] - cap_integrator_z_dot = config.mpc_params["integrator_cap"][3] - cap_integrator_roll = config.mpc_params["integrator_cap"][4] - cap_integrator_pitch = config.mpc_params["integrator_cap"][5] + cap_integrator_z = self.mpc_parameters["integrator_cap"][0] + cap_integrator_x_dot = self.mpc_parameters["integrator_cap"][1] + cap_integrator_y_dot = self.mpc_parameters["integrator_cap"][2] + cap_integrator_z_dot = self.mpc_parameters["integrator_cap"][3] + cap_integrator_roll = self.mpc_parameters["integrator_cap"][4] + cap_integrator_pitch = self.mpc_parameters["integrator_cap"][5] self.integral_errors[0] = np.where(np.abs(self.integral_errors[0]) > cap_integrator_z, cap_integrator_z*np.sign(self.integral_errors[0]), self.integral_errors[0]) self.integral_errors[1] = np.where(np.abs(self.integral_errors[1]) > cap_integrator_x_dot, cap_integrator_x_dot*np.sign(self.integral_errors[1]), self.integral_errors[1]) @@ -1470,6 +1473,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, # Set stage constraint + h_R_w = np.array([np.cos(yaw), np.sin(yaw), + -np.sin(yaw), np.cos(yaw)]) if(self.use_foothold_constraints or self.use_stability_constraints): h_R_w = np.array([np.cos(yaw), np.sin(yaw), @@ -1498,7 +1503,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, optimal_footholds_assigned = np.zeros((4, ), dtype='bool') # Saturation of the GRF - optimal_GRF = np.clip(optimal_GRF, 0.0, config.mpc_params['grf_max']) + optimal_GRF = np.clip(optimal_GRF, 0.0, self.mpc_parameters['grf_max']) # We need to provide the next touchdown foothold position. @@ -1625,8 +1630,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, optimal_foothold[3] = reference["ref_foot_RR"][0] - if(config.mpc_params['dt'] <= 0.02 or ( - config.mpc_params['use_nonuniform_discretization'] and config.mpc_params['dt_fine_grained'] <= 0.02)): + if(self.mpc_parameters['dt'] <= 0.02 or ( + self.mpc_parameters['use_nonuniform_discretization'] and self.mpc_parameters['dt_fine_grained'] <= 0.02)): optimal_next_state_index = 2 else: optimal_next_state_index = 1 diff --git a/quadruped_pympc/controllers/gradient/input_rates/centroidal_nmpc_input_rates.py b/quadruped_pympc/controllers/gradient/input_rates/centroidal_nmpc_input_rates.py index f4e6048..ef5568d 100644 --- a/quadruped_pympc/controllers/gradient/input_rates/centroidal_nmpc_input_rates.py +++ b/quadruped_pympc/controllers/gradient/input_rates/centroidal_nmpc_input_rates.py @@ -10,12 +10,11 @@ from typing import Tuple # TODO: Check acados installation and trow error + instructions if needed -> pip install quadruped_pympc[acados] from acados_template import AcadosOcp, AcadosOcpSolver -from quadruped_pympc import config from .centroidal_model_input_rates import Centroidal_Model_InputRates # Class for the Acados NMPC, the model is in another file! class Acados_NMPC_InputRates: - def __init__(self): + def __init__(self,mpc_parameters=None,inertia=None,mass=None,robot_name=None): """ Initialize the Centroidal_NMPC_InputRates class. @@ -26,24 +25,29 @@ def __init__(self): use_warm_start (bool): Flag indicating whether to use warm start. use_integrators (bool): Flag indicating whether to use integrators. """ + self.mpc_parameters = mpc_parameters - self.horizon = config.mpc_params['horizon'] # Define the number of discretization steps - self.dt = config.mpc_params['dt'] + self.inertia=inertia + self.mass=mass + self.robot_name=robot_name #this is only used when using hyqreal robot + + self.horizon = self.mpc_parameters['horizon'] # Define the number of discretization steps + self.dt = self.mpc_parameters['dt'] self.T_horizon = self.horizon*self.dt - self.use_RTI = config.mpc_params['use_RTI'] - self.use_integrators = config.mpc_params['use_integrators'] - self.use_warm_start = config.mpc_params['use_warm_start'] - self.use_foothold_constraints = config.mpc_params['use_foothold_constraints'] + self.use_RTI = self.mpc_parameters['use_RTI'] + self.use_integrators = self.mpc_parameters['use_integrators'] + self.use_warm_start = self.mpc_parameters['use_warm_start'] + self.use_foothold_constraints = self.mpc_parameters['use_foothold_constraints'] - self.use_static_stability = config.mpc_params['use_static_stability'] - self.use_zmp_stability = config.mpc_params['use_zmp_stability'] + self.use_static_stability = self.mpc_parameters['use_static_stability'] + self.use_zmp_stability = self.mpc_parameters['use_zmp_stability'] self.use_stability_constraints = self.use_static_stability or self.use_zmp_stability - self.use_input_prediction = config.mpc_params['use_input_prediction'] + self.use_input_prediction = self.mpc_parameters['use_input_prediction'] - self.use_DDP = config.mpc_params['use_DDP'] + self.use_DDP = self.mpc_parameters['use_DDP'] self.previous_status = -1 @@ -222,8 +226,8 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: init_base = np.array([0, 0, 0]) init_base_yaw = np.array([0]) init_external_wrench = np.array([0, 0, 0, 0, 0, 0]) - init_inertia = config.inertia.reshape((9,)) - init_mass = np.array([config.mass]) + init_inertia = self.inertia.reshape((9,)) + init_mass = np.array([self.mass]) ocp.parameter_values = np.concatenate((init_contact_status, init_mu, init_stance_proximity, init_base, init_base_yaw, init_external_wrench, @@ -236,7 +240,7 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: ocp.solver_options.integrator_type = "ERK" #ERK IRK GNSF DISCRETE if(self.use_DDP): ocp.solver_options.nlp_solver_type = 'DDP' - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] #ocp.solver_options.globalization = 'MERIT_BACKTRACKING' ocp.solver_options.with_adaptive_levenberg_marquardt = True @@ -252,32 +256,32 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: ocp.solver_options.nlp_solver_max_iter = 1 # Set the RTI type for the advanced RTI method # (see https://arxiv.org/pdf/2403.07101.pdf) - if(config.mpc_params['as_rti_type'] == "AS-RTI-A"): + if(self.mpc_parameters['as_rti_type'] == "AS-RTI-A"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 0 - elif(config.mpc_params['as_rti_type'] == "AS-RTI-B"): + elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-B"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 1 - elif(config.mpc_params['as_rti_type'] == "AS-RTI-C"): + elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-C"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 2 - elif(config.mpc_params['as_rti_type'] == "AS-RTI-D"): + elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-D"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 3 else: ocp.solver_options.nlp_solver_type = "SQP" - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] #ocp.solver_options.globalization = "MERIT_BACKTRACKING" # FIXED_STEP, MERIT_BACKTRACKING - if(config.mpc_params['solver_mode'] == "balance"): + if(self.mpc_parameters['solver_mode'] == "balance"): ocp.solver_options.hpipm_mode = "BALANCE" - elif(config.mpc_params['solver_mode'] == "robust"): + elif(self.mpc_parameters['solver_mode'] == "robust"): ocp.solver_options.hpipm_mode = "ROBUST" - elif(config.mpc_params['solver_mode'] == "fast"): + elif(self.mpc_parameters['solver_mode'] == "fast"): ocp.solver_options.qp_solver_iter_max = 10 ocp.solver_options.hpipm_mode = "SPEED" - elif(config.mpc_params['solver_mode'] == "crazy_speed"): + elif(self.mpc_parameters['solver_mode'] == "crazy_speed"): ocp.solver_options.qp_solver_iter_max = 5 ocp.solver_options.hpipm_mode = "SPEED_ABS" @@ -294,9 +298,9 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: # Nonuniform discretization - if(config.mpc_params['use_nonuniform_discretization']): - time_steps_fine_grained = np.tile(config.mpc_params['dt_fine_grained'], config.mpc_params['horizon_fine_grained']) - time_steps = np.concatenate((time_steps_fine_grained, np.tile(self.dt, self.horizon - config.mpc_params['horizon_fine_grained']))) + if(self.mpc_parameters['use_nonuniform_discretization']): + time_steps_fine_grained = np.tile(self.mpc_parameters['dt_fine_grained'], self.mpc_parameters['horizon_fine_grained']) + time_steps = np.concatenate((time_steps_fine_grained, np.tile(self.dt, self.horizon - self.mpc_parameters['horizon_fine_grained']))) shooting_nodes = np.zeros((self.horizon+1,)) for i in range(len(time_steps)): shooting_nodes[i+1] = shooting_nodes[i] + time_steps[i] @@ -516,8 +520,8 @@ def create_friction_cone_constraints(self,): t = np.array([1, 0, 0]) b = np.array([0, 1, 0]) mu = self.centroidal_model.mu_friction - f_max = config.mpc_params['grf_max'] - f_min = config.mpc_params['grf_min'] + f_max = self.mpc_parameters['grf_max'] + f_min = self.mpc_parameters['grf_min'] # Derivation can be found in the paper # "High-slope terrain locomotion for torque-controlled quadruped robots", @@ -605,7 +609,7 @@ def set_weight(self, nx: int, nu: int): Q_pitch_integral_integral = np.array([10]) #integral of pitch - if(config.robot == "hyqreal"): + if(self.robot_name == "hyqreal"): Q_foot_force = np.array([0.00001, 0.00001, 0.00001]) #f_x, f_y, f_z (should be 4 times this, once per foot) else: Q_foot_force = np.array([0.0001, 0.0001, 0.0001]) #f_x, f_y, f_z (should be 4 times this, once per foot) @@ -991,7 +995,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif(np.array_equal(FL_contact_sequence, RR_contact_sequence) and np.array_equal(FR_contact_sequence, RL_contact_sequence)): #TROT - stability_margin = config.mpc_params['trot_stability_margin'] + stability_margin = self.mpc_parameters['trot_stability_margin'] if(FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_FL_RR = 0 + stability_margin lb_support_FL_RR = 0 - stability_margin @@ -1003,7 +1007,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif(np.array_equal(FL_contact_sequence, RL_contact_sequence) and np.array_equal(FR_contact_sequence, RR_contact_sequence)): #PACE - stability_margin = config.mpc_params['pace_stability_margin'] + stability_margin = self.mpc_parameters['pace_stability_margin'] if(FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_RL_FL = 0 + stability_margin lb_support_RL_FL = 0 - stability_margin @@ -1015,7 +1019,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h else: #CRAWL BACKDIAGONALCRAWL ONLY - stability_margin = config.mpc_params['crawl_stability_margin'] + stability_margin = self.mpc_parameters['crawl_stability_margin'] if(FL_contact_sequence[j] == 1): if(FR_contact_sequence[j] == 1): @@ -1207,7 +1211,7 @@ def perform_scaling(self, state, reference, constraint = None): # Main loop for computing the control def compute_control(self, state, reference, contact_sequence, constraint = None, external_wrenches = np.zeros((6,)), - inertia = config.inertia.reshape((9,)), mass = config.mass): + inertia = None, mass = None): # Take the array of the contact sequence and split it in 4 arrays, @@ -1315,7 +1319,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, # Fill stance param, friction and stance proximity # (stance proximity will disable foothold optimization near a stance!!) - mu = config.mpc_params['mu'] + mu = self.mpc_parameters['mu'] yaw = state["orientation"][2] # Stance Proximity ugly routine. Basically we disable foothold optimization @@ -1367,9 +1371,9 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, # If we have estimated an external wrench, we can compensate it for all steps # or less (maybe the disturbance is not costant along the horizon!) - if(config.mpc_params['external_wrenches_compensation'] and - config.mpc_params['external_wrenches_compensation_num_step'] and - j < config.mpc_params['external_wrenches_compensation_num_step']): + if(self.mpc_parameters['external_wrenches_compensation'] and + self.mpc_parameters['external_wrenches_compensation_num_step'] and + j < self.mpc_parameters['external_wrenches_compensation_num_step']): external_wrenches_estimated_param = copy.deepcopy(external_wrenches) external_wrenches_estimated_param = external_wrenches_estimated_param.reshape((6, )) else: @@ -1415,7 +1419,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, if(self.use_integrators): # Compute error for integral action - alpha_integrator = config.mpc_params["alpha_integrator"] + alpha_integrator = self.mpc_parameters["alpha_integrator"] self.integral_errors[0] += (state["position"][2] - reference["ref_position"][2])*alpha_integrator self.integral_errors[1] += (state["linear_velocity"][0] - reference["ref_linear_velocity"][0])*alpha_integrator self.integral_errors[2] += (state["linear_velocity"][1] - reference["ref_linear_velocity"][1])*alpha_integrator @@ -1424,12 +1428,12 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, self.integral_errors[5] += (state["orientation"][1] - reference["ref_orientation"][1])*alpha_integrator - cap_integrator_z = config.mpc_params["integrator_cap"][0] - cap_integrator_x_dot = config.mpc_params["integrator_cap"][1] - cap_integrator_y_dot = config.mpc_params["integrator_cap"][2] - cap_integrator_z_dot = config.mpc_params["integrator_cap"][3] - cap_integrator_roll = config.mpc_params["integrator_cap"][4] - cap_integrator_pitch = config.mpc_params["integrator_cap"][5] + cap_integrator_z = self.mpc_parameters["integrator_cap"][0] + cap_integrator_x_dot = self.mpc_parameters["integrator_cap"][1] + cap_integrator_y_dot = self.mpc_parameters["integrator_cap"][2] + cap_integrator_z_dot = self.mpc_parameters["integrator_cap"][3] + cap_integrator_roll = self.mpc_parameters["integrator_cap"][4] + cap_integrator_pitch = self.mpc_parameters["integrator_cap"][5] self.integral_errors[0] = np.where(np.abs(self.integral_errors[0]) > cap_integrator_z, cap_integrator_z*np.sign(self.integral_errors[0]), self.integral_errors[0]) self.integral_errors[1] = np.where(np.abs(self.integral_errors[1]) > cap_integrator_x_dot, cap_integrator_x_dot*np.sign(self.integral_errors[1]), self.integral_errors[1]) @@ -1492,8 +1496,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None, - if(config.mpc_params['dt'] <= 0.02 or ( - config.mpc_params['use_nonuniform_discretization'] and config.mpc_params['dt_fine_grained'] <= 0.02)): + if(self.mpc_parameters['dt'] <= 0.02 or ( + self.mpc_parameters['use_nonuniform_discretization'] and self.mpc_parameters['dt_fine_grained'] <= 0.02)): optimal_next_state_index = 2 else: optimal_next_state_index = 1 diff --git a/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_gait_adaptive.py b/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_gait_adaptive.py index fa7b27b..6d4b84a 100644 --- a/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_gait_adaptive.py +++ b/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_gait_adaptive.py @@ -11,27 +11,33 @@ import casadi as cs from acados_template import AcadosOcp, AcadosOcpBatchSolver -from quadruped_pympc import config from .centroidal_model_nominal import Centroidal_Model_Nominal # Class for the Acados NMPC, the model is in another file! class Acados_NMPC_GaitAdaptive(): - def __init__(self): + def __init__(self,mpc_parameters=None,inertia=None,mass=None,robot_name=None): - self.horizon = config.mpc_params['horizon'] # Define the number of discretization steps - self.dt = config.mpc_params['dt'] + + + self.mpc_parameters = mpc_parameters + self.inertia=inertia + self.mass=mass + self.robot_name=robot_name #this is only used when using hyqreal robot + + self.horizon = self.mpc_parameters['horizon'] # Define the number of discretization steps + self.dt = self.mpc_parameters['dt'] self.T_horizon = self.horizon * self.dt - self.use_RTI = config.mpc_params['use_RTI'] - self.use_integrators = config.mpc_params['use_integrators'] - self.use_warm_start = config.mpc_params['use_warm_start'] - self.use_foothold_constraints = config.mpc_params['use_foothold_constraints'] + self.use_RTI = self.mpc_parameters['use_RTI'] + self.use_integrators = self.mpc_parameters['use_integrators'] + self.use_warm_start = self.mpc_parameters['use_warm_start'] + self.use_foothold_constraints = self.mpc_parameters['use_foothold_constraints'] - self.use_static_stability = config.mpc_params['use_static_stability'] - self.use_zmp_stability = config.mpc_params['use_zmp_stability'] + self.use_static_stability = self.mpc_parameters['use_static_stability'] + self.use_zmp_stability = self.mpc_parameters['use_zmp_stability'] self.use_stability_constraints = self.use_static_stability or self.use_zmp_stability - self.use_DDP = config.mpc_params['use_DDP'] + self.use_DDP = self.mpc_parameters['use_DDP'] self.previous_status = -1 self.previous_contact_sequence = np.zeros((4, self.horizon)) @@ -51,11 +57,11 @@ def __init__(self): # Create the acados ocp solver self.ocp = self.create_ocp_solver_description(acados_model, num_threads_in_batch_solve=len( - config.mpc_params['step_freq_available'])) + self.mpc_parameters['step_freq_available'])) # Batch solver - use_batch_solver = config.mpc_params['optimize_step_freq'] - num_batch = len(config.mpc_params['step_freq_available']) + use_batch_solver = self.mpc_parameters['optimize_step_freq'] + num_batch = len(self.mpc_parameters['step_freq_available']) self.batch = num_batch # batch_ocp = self.create_ocp_solver_description(acados_model, num_threads_in_batch_solve) batch_ocp = self.ocp @@ -183,8 +189,8 @@ def create_ocp_solver_description(self, acados_model, num_threads_in_batch_solve init_base_position = np.array([0, 0, 0]) init_base_yaw = np.array([0]) init_external_wrench = np.array([0, 0, 0, 0, 0, 0]) - init_inertia = config.inertia.reshape((9,)) - init_mass = np.array([config.mass]) + init_inertia = self.inertia.reshape((9,)) + init_mass = np.array([self.mass]) ocp.parameter_values = np.concatenate((init_contact_status, init_mu, init_stance_proximity, init_base_position, init_base_yaw, init_external_wrench, @@ -197,7 +203,7 @@ def create_ocp_solver_description(self, acados_model, num_threads_in_batch_solve ocp.solver_options.integrator_type = "ERK" # ERK IRK GNSF DISCRETE if (self.use_DDP): ocp.solver_options.nlp_solver_type = 'DDP' - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] # ocp.solver_options.globalization = 'MERIT_BACKTRACKING' ocp.solver_options.with_adaptive_levenberg_marquardt = True @@ -213,32 +219,32 @@ def create_ocp_solver_description(self, acados_model, num_threads_in_batch_solve ocp.solver_options.nlp_solver_max_iter = 1 # Set the RTI type for the advanced RTI method # (see https://arxiv.org/pdf/2403.07101.pdf) - if (config.mpc_params['as_rti_type'] == "AS-RTI-A"): + if (self.mpc_parameters['as_rti_type'] == "AS-RTI-A"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 0 - elif (config.mpc_params['as_rti_type'] == "AS-RTI-B"): + elif (self.mpc_parameters['as_rti_type'] == "AS-RTI-B"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 1 - elif (config.mpc_params['as_rti_type'] == "AS-RTI-C"): + elif (self.mpc_parameters['as_rti_type'] == "AS-RTI-C"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 2 - elif (config.mpc_params['as_rti_type'] == "AS-RTI-D"): + elif (self.mpc_parameters['as_rti_type'] == "AS-RTI-D"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 3 else: ocp.solver_options.nlp_solver_type = "SQP" - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] # ocp.solver_options.globalization = "MERIT_BACKTRACKING" # FIXED_STEP, MERIT_BACKTRACKING - if (config.mpc_params['solver_mode'] == "balance"): + if (self.mpc_parameters['solver_mode'] == "balance"): ocp.solver_options.hpipm_mode = "BALANCE" - elif (config.mpc_params['solver_mode'] == "robust"): + elif (self.mpc_parameters['solver_mode'] == "robust"): ocp.solver_options.hpipm_mode = "ROBUST" - elif (config.mpc_params['solver_mode'] == "fast"): + elif (self.mpc_parameters['solver_mode'] == "fast"): ocp.solver_options.qp_solver_iter_max = 10 ocp.solver_options.hpipm_mode = "SPEED" - elif (config.mpc_params['solver_mode'] == "crazy_speed"): + elif (self.mpc_parameters['solver_mode'] == "crazy_speed"): ocp.solver_options.qp_solver_iter_max = 5 ocp.solver_options.hpipm_mode = "SPEED_ABS" @@ -252,11 +258,11 @@ def create_ocp_solver_description(self, acados_model, num_threads_in_batch_solve ocp.solver_options.tf = self.T_horizon # Nonuniform discretization - if (config.mpc_params['use_nonuniform_discretization']): - time_steps_fine_grained = np.tile(config.mpc_params['dt_fine_grained'], - config.mpc_params['horizon_fine_grained']) + if (self.mpc_parameters['use_nonuniform_discretization']): + time_steps_fine_grained = np.tile(self.mpc_parameters['dt_fine_grained'], + self.mpc_parameters['horizon_fine_grained']) time_steps = np.concatenate( - (time_steps_fine_grained, np.tile(self.dt, self.horizon - config.mpc_params['horizon_fine_grained']))) + (time_steps_fine_grained, np.tile(self.dt, self.horizon - self.mpc_parameters['horizon_fine_grained']))) shooting_nodes = np.zeros((self.horizon + 1,)) for i in range(len(time_steps)): shooting_nodes[i + 1] = shooting_nodes[i] + time_steps[i] @@ -455,8 +461,8 @@ def create_friction_cone_constraints(self, ) -> None: t = np.array([1, 0, 0]) b = np.array([0, 1, 0]) mu = self.centroidal_model.mu_friction - f_max = config.mpc_params['grf_max'] - f_min = config.mpc_params['grf_min'] + f_max = self.mpc_parameters['grf_max'] + f_min = self.mpc_parameters['grf_min'] # Derivation can be found in the paper # "High-slope terrain locomotion for torque-controlled quadruped robots", @@ -529,7 +535,7 @@ def set_weight(self, nx, nu): Q_pitch_integral_integral = np.array([10]) # integral of pitch R_foot_vel = np.array([0.0001, 0.0001, 0.00001]) # v_x, v_y, v_z (should be 4 times this, once per foot) - if (config.robot == "hyqreal"): + if (self.robot_name == "hyqreal"): R_foot_force = np.array( [0.00001, 0.00001, 0.00001]) # f_x, f_y, f_z (should be 4 times this, once per foot) else: @@ -886,7 +892,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif (np.array_equal(FL_contact_sequence, RR_contact_sequence) and np.array_equal(FR_contact_sequence, RL_contact_sequence)): # TROT - stability_margin = config.mpc_params['trot_stability_margin'] + stability_margin = self.mpc_parameters['trot_stability_margin'] if (FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_FL_RR = 0 + stability_margin lb_support_FL_RR = 0 - stability_margin @@ -898,7 +904,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif (np.array_equal(FL_contact_sequence, RL_contact_sequence) and np.array_equal(FR_contact_sequence, RR_contact_sequence)): # PACE - stability_margin = config.mpc_params['pace_stability_margin'] + stability_margin = self.mpc_parameters['pace_stability_margin'] if (FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_RL_FL = 0 + stability_margin lb_support_RL_FL = 0 - stability_margin @@ -909,7 +915,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h else: # CRAWL BACKDIAGONALCRAWL ONLY - stability_margin = config.mpc_params['crawl_stability_margin'] + stability_margin = self.mpc_parameters['crawl_stability_margin'] if (FL_contact_sequence[j] == 1): if (FR_contact_sequence[j] == 1): @@ -1026,7 +1032,7 @@ def perform_scaling(self, state, reference, constraint=None): # Main loop for computing batched control def compute_batch_control(self, state, reference, contact_sequence, constraint=None, external_wrenches=np.zeros((6,)), - inertia=config.inertia.reshape((9,)), mass=config.mass): + inertia=None, mass=None): start = time.time() costs = [] @@ -1076,7 +1082,7 @@ def compute_batch_control(self, state, reference, contact_sequence, constraint=N # Fill stance param, friction and stance proximity # (stance proximity will disable foothold optimization near a stance!!) - mu = config.mpc_params['mu'] + mu = self.mpc_parameters['mu'] state_acados = np.concatenate((state["position"], state["linear_velocity"], state["orientation"], state["angular_velocity"], @@ -1157,12 +1163,12 @@ def compute_batch_control(self, state, reference, contact_sequence, constraint=N cost_single_qp = self.batch_solver.ocp_solvers[n].get_cost() if (n != 0): cost_single_qp += 3 * ( - config.mpc_params["step_freq_available"][n] - config.mpc_params["step_freq_available"][ + self.mpc_parameters["step_freq_available"][n] - self.mpc_parameters["step_freq_available"][ 0]) ** 2 costs.append(cost_single_qp) best_freq_index = np.argmin(costs) - best_freq = config.mpc_params["step_freq_available"][best_freq_index] + best_freq = self.mpc_parameters["step_freq_available"][best_freq_index] print("costs: ", costs) print("best_freq: ", best_freq) diff --git a/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_nominal.py b/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_nominal.py index be3e87b..6f8b6f2 100644 --- a/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_nominal.py +++ b/quadruped_pympc/controllers/gradient/nominal/centroidal_nmpc_nominal.py @@ -10,26 +10,30 @@ import casadi as cs import copy -import quadruped_pympc.config as config # Class for the Acados NMPC, the model is in another file! class Acados_NMPC_Nominal: - def __init__(self): + def __init__(self,mpc_parameters=None,inertia=None,mass=None,robot_name=None): - self.horizon = config.mpc_params['horizon'] # Define the number of discretization steps - self.dt = config.mpc_params['dt'] + self.mpc_parameters = mpc_parameters + self.inertia=inertia + self.mass=mass + self.robot_name=robot_name #this is only used when using hyqreal robot + + self.horizon = self.mpc_parameters['horizon'] # Define the number of discretization steps + self.dt = self.mpc_parameters['dt'] self.T_horizon = self.horizon * self.dt - self.use_RTI = config.mpc_params['use_RTI'] - self.use_integrators = config.mpc_params['use_integrators'] - self.use_warm_start = config.mpc_params['use_warm_start'] - self.use_foothold_constraints = config.mpc_params['use_foothold_constraints'] + self.use_RTI = self.mpc_parameters['use_RTI'] + self.use_integrators = self.mpc_parameters['use_integrators'] + self.use_warm_start = self.mpc_parameters['use_warm_start'] + self.use_foothold_constraints = self.mpc_parameters['use_foothold_constraints'] - self.use_static_stability = config.mpc_params['use_static_stability'] - self.use_zmp_stability = config.mpc_params['use_zmp_stability'] + self.use_static_stability = self.mpc_parameters['use_static_stability'] + self.use_zmp_stability = self.mpc_parameters['use_zmp_stability'] self.use_stability_constraints = self.use_static_stability or self.use_zmp_stability - self.use_DDP = config.mpc_params['use_DDP'] + self.use_DDP = self.mpc_parameters['use_DDP'] self.previous_status = -1 self.previous_contact_sequence = np.zeros((4, self.horizon)) @@ -47,16 +51,25 @@ def __init__(self): self.states_dim = acados_model.x.size()[0] self.inputs_dim = acados_model.u.size()[0] - # Create the acados ocp solver - self.ocp = self.create_ocp_solver_description(acados_model) + # # # Create the acados ocp solver + # # self.ocp = self.create_ocp_solver_description(acados_model) + + # # code_export_dir = pathlib.Path(__file__).parent / 'c_generated_code' + # # self.ocp.code_export_directory = str(code_export_dir) + # # self.acados_ocp_solver = AcadosOcpSolver(self.ocp, + # # json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + + # # ".json") + self.ocp = self.create_ocp_solver_description(acados_model) + code_export_dir = pathlib.Path(__file__).parent / 'c_generated_code' self.ocp.code_export_directory = str(code_export_dir) - - self.acados_ocp_solver = AcadosOcpSolver(self.ocp, - json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + - ".json") - + if ((not code_export_dir) or code_export_dir == []): + self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json") + + else : + self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json", build = False, generate = True) + # Initialize solver for stage in range(self.horizon + 1): self.acados_ocp_solver.set(stage, "x", np.zeros((self.states_dim,))) @@ -178,8 +191,8 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: init_base_position = np.array([0, 0, 0]) init_base_yaw = np.array([0]) init_external_wrench = np.array([0, 0, 0, 0, 0, 0]) - init_inertia = config.inertia.reshape((9,)) - init_mass = np.array([config.mass]) + init_inertia = self.inertia.reshape((9,)) + init_mass = np.array([self.mass]) ocp.parameter_values = np.concatenate((init_contact_status, init_mu, init_stance_proximity, init_base_position, init_base_yaw, init_external_wrench, @@ -192,7 +205,7 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: ocp.solver_options.integrator_type = "ERK" # ERK IRK GNSF DISCRETE if (self.use_DDP): ocp.solver_options.nlp_solver_type = 'DDP' - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] # ocp.solver_options.globalization = 'MERIT_BACKTRACKING' ocp.solver_options.with_adaptive_levenberg_marquardt = True @@ -208,32 +221,32 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: ocp.solver_options.nlp_solver_max_iter = 1 # Set the RTI type for the advanced RTI method # (see https://arxiv.org/pdf/2403.07101.pdf) - if (config.mpc_params['as_rti_type'] == "AS-RTI-A"): + if (self.mpc_parameters['as_rti_type'] == "AS-RTI-A"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 0 - elif (config.mpc_params['as_rti_type'] == "AS-RTI-B"): + elif (self.mpc_parameters['as_rti_type'] == "AS-RTI-B"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 1 - elif (config.mpc_params['as_rti_type'] == "AS-RTI-C"): + elif (self.mpc_parameters['as_rti_type'] == "AS-RTI-C"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 2 - elif (config.mpc_params['as_rti_type'] == "AS-RTI-D"): + elif (self.mpc_parameters['as_rti_type'] == "AS-RTI-D"): ocp.solver_options.as_rti_iter = 1 ocp.solver_options.as_rti_level = 3 else: ocp.solver_options.nlp_solver_type = "SQP" - ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations'] + ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations'] # ocp.solver_options.globalization = "MERIT_BACKTRACKING" # FIXED_STEP, MERIT_BACKTRACKING - if (config.mpc_params['solver_mode'] == "balance"): + if (self.mpc_parameters['solver_mode'] == "balance"): ocp.solver_options.hpipm_mode = "BALANCE" - elif (config.mpc_params['solver_mode'] == "robust"): + elif (self.mpc_parameters['solver_mode'] == "robust"): ocp.solver_options.hpipm_mode = "ROBUST" - elif (config.mpc_params['solver_mode'] == "fast"): + elif (self.mpc_parameters['solver_mode'] == "fast"): ocp.solver_options.qp_solver_iter_max = 10 ocp.solver_options.hpipm_mode = "SPEED" - elif (config.mpc_params['solver_mode'] == "crazy_speed"): + elif (self.mpc_parameters['solver_mode'] == "crazy_speed"): ocp.solver_options.qp_solver_iter_max = 5 ocp.solver_options.hpipm_mode = "SPEED_ABS" @@ -246,11 +259,11 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp: ocp.solver_options.tf = self.T_horizon # Nonuniform discretization - if (config.mpc_params['use_nonuniform_discretization']): - time_steps_fine_grained = np.tile(config.mpc_params['dt_fine_grained'], - config.mpc_params['horizon_fine_grained']) + if (self.mpc_parameters['use_nonuniform_discretization']): + time_steps_fine_grained = np.tile(self.mpc_parameters['dt_fine_grained'], + self.mpc_parameters['horizon_fine_grained']) time_steps = np.concatenate( - (time_steps_fine_grained, np.tile(self.dt, self.horizon - config.mpc_params['horizon_fine_grained']))) + (time_steps_fine_grained, np.tile(self.dt, self.horizon - self.mpc_parameters['horizon_fine_grained']))) shooting_nodes = np.zeros((self.horizon + 1,)) for i in range(len(time_steps)): shooting_nodes[i + 1] = shooting_nodes[i] + time_steps[i] @@ -449,8 +462,8 @@ def create_friction_cone_constraints(self, ) -> None: t = np.array([1, 0, 0]) b = np.array([0, 1, 0]) mu = self.centroidal_model.mu_friction - f_max = config.mpc_params['grf_max'] - f_min = config.mpc_params['grf_min'] + f_max = self.mpc_parameters['grf_max'] + f_min = self.mpc_parameters['grf_min'] # Derivation can be found in the paper # "High-slope terrain locomotion for torque-controlled quadruped robots", @@ -523,7 +536,7 @@ def set_weight(self, nx, nu): Q_pitch_integral_integral = np.array([10]) # integral of pitch R_foot_vel = np.array([0.0001, 0.0001, 0.00001]) # v_x, v_y, v_z (should be 4 times this, once per foot) - if (config.robot == "hyqreal"): + if (self.robot_name == "hyqreal"): R_foot_force = np.array( [0.00001, 0.00001, 0.00001]) # f_x, f_y, f_z (should be 4 times this, once per foot) else: @@ -884,7 +897,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif (np.array_equal(FL_contact_sequence, RR_contact_sequence) and np.array_equal(FR_contact_sequence, RL_contact_sequence)): # TROT - stability_margin = config.mpc_params['trot_stability_margin'] + stability_margin = self.mpc_parameters['trot_stability_margin'] if (FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_FL_RR = 0 + stability_margin lb_support_FL_RR = 0 - stability_margin @@ -896,7 +909,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h elif (np.array_equal(FL_contact_sequence, RL_contact_sequence) and np.array_equal(FR_contact_sequence, RR_contact_sequence)): # PACE - stability_margin = config.mpc_params['pace_stability_margin'] + stability_margin = self.mpc_parameters['pace_stability_margin'] if (FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0): ub_support_RL_FL = 0 + stability_margin lb_support_RL_FL = 0 - stability_margin @@ -907,7 +920,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h else: # CRAWL BACKDIAGONALCRAWL ONLY - stability_margin = config.mpc_params['crawl_stability_margin'] + stability_margin = self.mpc_parameters['crawl_stability_margin'] if (FL_contact_sequence[j] == 1): if (FR_contact_sequence[j] == 1): @@ -1084,8 +1097,9 @@ def perform_scaling(self, state, reference, constraint=None): # Main loop for computing the control def compute_control(self, state, reference, contact_sequence, constraint=None, external_wrenches=np.zeros((6,)), - inertia=config.inertia.reshape((9,)), mass=config.mass): - + inertia=None, mass=None): + + # Take the array of the contact sequence and split it in 4 arrays, # one for each leg FL_contact_sequence = contact_sequence[0] @@ -1179,7 +1193,7 @@ def compute_control(self, state, reference, contact_sequence, constraint=None, e # Fill stance param, friction and stance proximity # (stance proximity will disable foothold optimization near a stance!!) - mu = config.mpc_params['mu'] + mu = self.mpc_parameters['mu'] yaw = state["orientation"][2] # Stance Proximity ugly routine. Basically we disable foothold optimization @@ -1227,9 +1241,9 @@ def compute_control(self, state, reference, contact_sequence, constraint=None, e for j in range(self.horizon): # If we have estimated an external wrench, we can compensate it for all steps # or less (maybe the disturbance is not costant along the horizon!) - if (config.mpc_params['external_wrenches_compensation'] and - config.mpc_params['external_wrenches_compensation_num_step'] and - j < config.mpc_params['external_wrenches_compensation_num_step']): + if (self.mpc_parameters['external_wrenches_compensation'] and + self.mpc_parameters['external_wrenches_compensation_num_step'] and + j < self.mpc_parameters['external_wrenches_compensation_num_step']): external_wrenches_estimated_param = copy.deepcopy(external_wrenches) external_wrenches_estimated_param = external_wrenches_estimated_param.reshape((6,)) else: @@ -1268,7 +1282,7 @@ def compute_control(self, state, reference, contact_sequence, constraint=None, e if (self.use_integrators): # Compute error for integral action - alpha_integrator = config.mpc_params["alpha_integrator"] + alpha_integrator = self.mpc_parameters["alpha_integrator"] self.integral_errors[0] += (state["position"][2] - reference["ref_position"][2]) * alpha_integrator self.integral_errors[1] += (state["linear_velocity"][0] - reference["ref_linear_velocity"][ 0]) * alpha_integrator @@ -1279,12 +1293,12 @@ def compute_control(self, state, reference, contact_sequence, constraint=None, e self.integral_errors[4] += (state["orientation"][0] - reference["ref_orientation"][0]) * (alpha_integrator) self.integral_errors[5] += (state["orientation"][1] - reference["ref_orientation"][1]) * alpha_integrator - cap_integrator_z = config.mpc_params["integrator_cap"][0] - cap_integrator_x_dot = config.mpc_params["integrator_cap"][1] - cap_integrator_y_dot = config.mpc_params["integrator_cap"][2] - cap_integrator_z_dot = config.mpc_params["integrator_cap"][3] - cap_integrator_roll = config.mpc_params["integrator_cap"][4] - cap_integrator_pitch = config.mpc_params["integrator_cap"][5] + cap_integrator_z = self.mpc_parameters["integrator_cap"][0] + cap_integrator_x_dot = self.mpc_parameters["integrator_cap"][1] + cap_integrator_y_dot = self.mpc_parameters["integrator_cap"][2] + cap_integrator_z_dot = self.mpc_parameters["integrator_cap"][3] + cap_integrator_roll = self.mpc_parameters["integrator_cap"][4] + cap_integrator_pitch = self.mpc_parameters["integrator_cap"][5] self.integral_errors[0] = np.where(np.abs(self.integral_errors[0]) > cap_integrator_z, cap_integrator_z * np.sign(self.integral_errors[0]), @@ -1478,8 +1492,8 @@ def compute_control(self, state, reference, contact_sequence, constraint=None, e if (optimal_footholds_assigned[3] == False): optimal_foothold[3] = reference["ref_foot_RR"][0] - if (config.mpc_params['dt'] <= 0.02 or ( - config.mpc_params['use_nonuniform_discretization'] and config.mpc_params['dt_fine_grained'] <= 0.02)): + if (self.mpc_parameters['dt'] <= 0.02 or ( + self.mpc_parameters['use_nonuniform_discretization'] and self.mpc_parameters['dt_fine_grained'] <= 0.02)): optimal_next_state_index = 2 else: optimal_next_state_index = 1 diff --git a/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax.py b/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax.py index 7455c56..4a1acea 100644 --- a/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax.py +++ b/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax.py @@ -13,7 +13,6 @@ sys.path.append(dir_path) sys.path.append(dir_path + '/../') -from quadruped_pympc import config from centroidal_model_jax import Centroidal_Model_JAX import time @@ -31,7 +30,7 @@ class Sampling_MPC: """This is a small class that implements a sampling based control law""" - def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, sampling_method = 'random_sampling', control_parametrization = "linear_spline_1", device="gpu"): + def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, sampling_method = 'random_sampling', control_parametrization = "linear_spline_1", device="gpu",mpc_parameters = None): """ Args: horizon (int): how much to look into the future for optimizing the gains @@ -45,13 +44,16 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, self.num_parallel_computations = num_parallel_computations self.max_sampling_forces = 30 - + self.control_parametrization = control_parametrization self.states_dim = 24 self.inputs_dim = 24 + ## parameters for the controller this are set from a dictionary + self.mpc_parameters = mpc_parameters + if(device=="gpu"): @@ -134,13 +136,13 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, if(sampling_method == 'random_sampling'): self.compute_control = self.compute_control_random_sampling - self.sigma_random_sampling = config.mpc_params['sigma_random_sampling'] + self.sigma_random_sampling = self.mpc_parameters['sigma_random_sampling'] elif(sampling_method == 'mppi'): self.compute_control = self.compute_control_mppi - self.sigma_mppi = config.mpc_params['sigma_mppi'] + self.sigma_mppi = self.mpc_parameters['sigma_mppi'] elif(sampling_method == 'cem_mppi'): self.compute_control = self.compute_control_cem_mppi - self.sigma_cem_mppi = jnp.ones(self.num_control_parameters, dtype=dtype_general) * config.mpc_params['sigma_cem_mppi'] + self.sigma_cem_mppi = jnp.ones(self.num_control_parameters, dtype=dtype_general) * self.mpc_parameters['sigma_cem_mppi'] else: # return error and stop execution print("Error: sampling method not recognized") @@ -198,11 +200,11 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, self.R = self.R.at[23,23].set(0.001) #foot_force_z_RR # mu is the friction coefficient - self.mu = config.mpc_params['mu'] + self.mu = self.mpc_parameters['mu'] # maximum allowed z contact forces - self.f_z_max = config.mpc_params['grf_max'] - self.f_z_min = config.mpc_params['grf_min'] + self.f_z_max = self.mpc_parameters['grf_max'] + self.f_z_min = self.mpc_parameters['grf_min'] self.best_control_parameters = jnp.zeros((self.num_control_parameters,), dtype=dtype_general) @@ -610,7 +612,7 @@ def prepare_state_and_reference(self, state_current, reference_state, """ # Shift the previous solution ahead - if (config.mpc_params['shift_solution']): + if (self.mpc_parameters['shift_solution']): index_shift = 1./mpc_frequency self.best_control_parameters = self.shift_solution(self.best_control_parameters, index_shift) diff --git a/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax_gait_adaptive.py b/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax_gait_adaptive.py index 6f79d5e..2f7fcac 100644 --- a/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax_gait_adaptive.py +++ b/quadruped_pympc/controllers/sampling/centroidal_nmpc_jax_gait_adaptive.py @@ -14,7 +14,6 @@ sys.path.append(dir_path + '/../') sys.path.append(dir_path + '/../helpers/') -from quadruped_pympc import config from centroidal_model_jax import Centroidal_Model_JAX from quadruped_pympc.helpers.periodic_gait_generator_jax import PeriodicGaitGeneratorJax @@ -33,7 +32,7 @@ class Sampling_MPC: """This is a small class that implements a sampling based control law""" - def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, sampling_method = 'random_sampling', control_parametrization = "linear_spline_1", device="gpu"): + def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, sampling_method = 'random_sampling', control_parametrization = "linear_spline_1", device="gpu",mpc_parameters = None): """ Args: horizon (int): how much to look into the future for optimizing the gains @@ -55,7 +54,9 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, self.inputs_dim = 24 - + ## parameters for the controller this are set from a dictionary + self.mpc_parameters = mpc_parameters + if(device=="gpu"): self.device = gpu_device else: @@ -136,13 +137,13 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, if(sampling_method == 'random_sampling'): self.compute_control = self.compute_control_random_sampling - self.sigma_random_sampling = config.mpc_params['sigma_random_sampling'] + self.sigma_random_sampling = self.mpc_parameters['sigma_random_sampling'] elif(sampling_method == 'mppi'): self.compute_control = self.compute_control_mppi - self.sigma_mppi = config.mpc_params['sigma_mppi'] + self.sigma_mppi = self.mpc_parameters['sigma_mppi'] elif(sampling_method == 'cem_mppi'): self.compute_control = self.compute_control_cem_mppi - self.sigma_cem_mppi = jnp.ones(self.num_control_parameters, dtype=dtype_general) * config.mpc_params['sigma_cem_mppi'] + self.sigma_cem_mppi = jnp.ones(self.num_control_parameters, dtype=dtype_general) * self.mpc_parameters['sigma_cem_mppi'] else: # return error and stop execution print("Error: sampling method not recognized") @@ -200,11 +201,11 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, self.R = self.R.at[23,23].set(0.001) #foot_force_z_RR # mu is the friction coefficient - self.mu = config.mpc_params['mu'] + self.mu = self.mpc_parameters['mu'] # maximum allowed z contact forces - self.f_z_max = config.mpc_params['grf_max'] - self.f_z_min = config.mpc_params['grf_min'] + self.f_z_max = self.mpc_parameters['grf_max'] + self.f_z_min = self.mpc_parameters['grf_min'] @@ -242,7 +243,7 @@ def __init__(self, horizon = 200, dt = 0.01, num_parallel_computations = 10000, temp3) - self.step_freq_delta = jnp.array(config.mpc_params['step_freq_available']) + self.step_freq_delta = jnp.array(self.mpc_parameters['step_freq_available']) @@ -628,7 +629,7 @@ def prepare_state_and_reference(self, state_current, reference_state, """ # Shift the previous solution ahead - if (config.mpc_params['shift_solution']): + if (self.mpc_parameters['shift_solution']): index_shift = 1./mpc_frequency self.best_control_parameters = self.shift_solution(self.best_control_parameters, index_shift) diff --git a/quadruped_pympc/controllers/srb_controller.py b/quadruped_pympc/controllers/srb_controller.py new file mode 100644 index 0000000..d4915c3 --- /dev/null +++ b/quadruped_pympc/controllers/srb_controller.py @@ -0,0 +1,558 @@ + +import numpy as np + +# Parameters for both MPC and simulation +from quadruped_pympc.helpers.foothold_reference_generator import FootholdReferenceGenerator +from quadruped_pympc.helpers.periodic_gait_generator import PeriodicGaitGenerator +from quadruped_pympc.helpers.swing_trajectory_controller import SwingTrajectoryController +from quadruped_pympc.helpers.terrain_estimator import TerrainEstimator + +#### + +#------------ +class SingleRigidBodyController: + #constructor + def __init__(self, + hip_height=None, + legs_order=None, + inertia=None, + mass=None, + lift_off_positions=None, + simulation_parameters:dict=None, + mpc_parameters:dict=None, + jac_feet_dot=None, + jac_feet_prev=None, + tau=None, + robot_name=None, + ): + # + + #Robot dependent parameters + self.hip_height = hip_height + self.inertia=inertia + self.mass=mass + self.lift_off_positions=lift_off_positions + self.legs_order = legs_order + self.jac_feet_prev = jac_feet_prev + self.jac_feet_dot = jac_feet_dot + self.tau = tau + self.robot_name = robot_name + # MPC parameters + self.mpc_parameters = mpc_parameters + ### All this parameter are for the WBC in simulation come from the dicotionary + self.swing_generator = simulation_parameters['swing_generator'] + self.position_gain_fb = simulation_parameters['swing_position_gain_fb'] + self.velocity_gain_fb = simulation_parameters['swing_velocity_gain_fb'] + self.swing_integral_gain_fb=simulation_parameters['swing_integral_gain_fb'] + self.step_height =simulation_parameters['step_height'] + self.simulation_dt=simulation_parameters['dt'] #remove from here + gait_name=simulation_parameters['gait'] + gait_params=simulation_parameters['gait_params'][gait_name] + self.gait_type, self.duty_factor, self.step_frequency = gait_params['type'], gait_params['duty_factor'], gait_params['step_freq'] + self.use_inertia_recomputation=simulation_parameters['use_inertia_recomputation'] + self.mpc_frequency = simulation_parameters['mpc_frequency'] + self.ref_z=simulation_parameters['ref_z'] + + ## Common MPC parameters + self.controller_type=mpc_parameters['type'] + self.horizon = self.mpc_parameters['horizon'] + self.mpc_dt = self.mpc_parameters['dt'] + self.optimize_step_freq=self.mpc_parameters['optimize_step_freq'] + self.step_freq_available=self.mpc_parameters['step_freq_available'] + ## all of this go in gradient based or collaborative one + self.use_nonuniform_discretization=mpc_parameters['use_nonuniform_discretization'] #this is a common parameter? + + + + + # compute stance and swing tme based on gate selection! + self.stance_time = (1 / self.step_frequency) * self.duty_factor + self.swing_period = (1 - self.duty_factor) * (1 / self.step_frequency) # + 0.07 + ### initialized controller function + self.set_controller() + ## Initialize the foothold reference generator + self.frg = FootholdReferenceGenerator(stance_time=self.stance_time, hip_height=self.hip_height, + lift_off_positions= self.lift_off_positions) + # Create swing trajectory generator + + self.stc = SwingTrajectoryController(step_height=self.step_height, swing_period=self.swing_period, + position_gain_fb=self.position_gain_fb, velocity_gain_fb=self.velocity_gain_fb, + generator=self.swing_generator) + # periodic gate generator + + self.pgg = PeriodicGaitGenerator(duty_factor=self.duty_factor, step_freq=self.step_frequency, gait_type=self.gait_type, + horizon=self.horizon * 2, contact_sequence_dt=self.mpc_dt/2.) + # full stance + if self.gait_type== 7: + self.contact_sequence = np.ones((4, self.horizon*2)) + else: + self.contact_sequence = self.pgg.compute_contact_sequence() + self.nominal_sample_freq = self.step_frequency + + + # Online computation of the inertia parameter + # Terrain estimator + self.terrain_computation = TerrainEstimator() + + #### + self.ref_orientation = np.array([0.0, 0.0, 0.0]) + # # SET REFERENCE AS DICTIONARY + # TODO: I would suggest to create a DataClass for "BaseConfig" used in the PotatoModel controllers. + self.reference_state = {} + # Starting contact sequence + self.previous_contact = np.array([1, 1, 1, 1]) + self.previous_contact_mpc = np.array([1, 1, 1, 1]) + self.current_contact = np.array([1, 1, 1, 1]) + + self.nmpc_GRFs = np.zeros((12,)) + self.nmpc_wrenches = np.zeros((6,)) + self.nmpc_footholds = np.zeros((12,)) + # State + self.state_current, self.state_prev = {}, {} + self.feet_pos = None + + def set_controller(self): + ############################################## + ############ SET THE CONTROLLER TYPE ######## + ############################################## + + if(self.controller_type == 'nominal'): + from quadruped_pympc.controllers.gradient.nominal.centroidal_nmpc_nominal import Acados_NMPC_Nominal + + self.controller = Acados_NMPC_Nominal(mpc_parameters=self.mpc_parameters,inertia=self.inertia,mass=self.mass,robot_name=self.robot_name) + + if self.optimize_step_freq==True: + from quadruped_pympc.controllers.gradient.nominal.centroidal_nmpc_gait_adaptive import Acados_NMPC_GaitAdaptive + self.batched_controller = Acados_NMPC_GaitAdaptive(mpc_parameters=self.mpc_parameters,inertia=self.inertia,mass=self.mass,robot_name=self.robot_name) + + elif(self.controller_type == 'collaborative'): + + from quadruped_pympc.controllers.gradient.collaborative.centroidal_nmpc_collaborative import Acados_NMPC_Collaborative + self.controller_collaborative = Acados_NMPC_Collaborative(mpc_parameters=self.mpc_parameters,robot_name=self.robot_name) + + elif(self.controller_type == 'input_rates'): + from quadruped_pympc.controllers.gradient.input_rates.centroidal_nmpc_input_rates import Acados_NMPC_InputRates + + self.controller = Acados_NMPC_InputRates(mpc_parameters=self.mpc_parameters,inertia=self.inertia,mass=self.mass,robot_name=self.robot_name) + + if self.optimize_step_freq==True: + from quadruped_pympc.controllers.gradient.nominal.centroidal_nmpc_gait_adaptive import Acados_NMPC_GaitAdaptive + self.batched_controller = Acados_NMPC_GaitAdaptive(mpc_parameters=self.mpc_parameters,inertia=self.inertia,mass=self.mass,robot_name=self.robot_name) + + elif(self.controller_type == 'sampling'): + self.sampling_method = self.mpc_parameters['sampling_method'] + self.num_sampling_iterations = self.mpc_parameters['num_sampling_iterations'] + self.sigma_cem_mppi=self.mpc_parameters['sigma_cem_mppi'] + self.shift_solution = self.mpc_parameters['shift_solution'] + + if self.optimize_step_freq==True: + from quadruped_pympc.controllers.sampling.centroidal_nmpc_jax_gait_adaptive import Sampling_MPC + + else: + from quadruped_pympc.controllers.sampling.centroidal_nmpc_jax import Sampling_MPC + + import jax + import jax.numpy as jnp + self.controller = Sampling_MPC(horizon = self.horizon, + dt = self.mpc_dt, + num_parallel_computations = self.mpc_parameters['num_parallel_computations'], + sampling_method = self.sampling_method, + control_parametrization = self.mpc_parameters['control_parametrization'], + device="gpu", + mpc_parameters = self.mpc_parameters) + + self.best_control_parameters = jnp.zeros((self.controller.num_control_parameters, )) + self.jitted_compute_control = jax.jit(self.controller.compute_control, device=self.controller.device) + # jitted_get_key = jax.jit(controller.get_key, device=controller.device) + self.jitted_prepare_state_and_reference = self.controller.prepare_state_and_reference + + self.index_shift = 0 + + # Tracking mpc error (z, x_dot, y_dot, z_dot, roll, pitch, roll_dot, pitch_dot, yaw_dot) + self.mean_tracking = np.zeros((9,)) + self.mean_num_sample = 1 + self.mean_optimizer_cost = 0 + self.optimizer_cost = 0 + + def update_mpc_reference_and_state(self,nv:float,feet_pos:list,hip_pos:list,base_pos:list,base_ori_euler_xyz:list,base_lin_vel:list, + base_ang_vel:list,ref_base_lin_vel:list, ref_base_ang_vel:list): + """ + Update the MPC reference. + + Parameters: + nv (int): mujoco number of variables + feet_pos (list or np.ndarray): Feet positions; + hip_pos (list or np.ndarray): Hip positions; + base_pos (list or np.ndarray): Base position; + base_ori_euler_xyz (list or np.ndarray): Base orientation in Euler angles (XYZ); + base_lin_vel (list or np.ndarray): Base linear velocity; + base_ang_vel (list or np.ndarray): Base angular velocity; + ref_base_lin_vel (list or np.ndarray): Reference base linear velocity; + ref_base_ang_vel (list or np.ndarray): Reference base angular velocity; + """ + # Initialization of variables used in the main control loop + # ____________________________________________________________ + # Set the reference for the state + #if first robot + # self.feet_pos=feet_pos + + # self.feet_pos = [remap_keys[leg] for leg in feet_pos] + self.feet_pos=feet_pos + self.hip_pos = hip_pos + + # Update the robot state -------------------------------- + + + # Update target base velocity + # ------------------------------------------------------- + + # Update the desired contact sequence --------------------------- + self.pgg.run(self.simulation_dt, self.pgg.step_freq) + # full stance + if self.gait_type== 7: + self.contact_sequence = np.ones((4, self.horizon)) + else: + self.contact_sequence = self.pgg.compute_contact_sequence() + + # in the case of nonuniform discretization, we need to subsample the contact sequence + if self.use_nonuniform_discretization==True: + self.contact_sequence = self.pgg.sample_contact_sequence(self.contact_sequence, self.mpc_dt, self.mpc_parameters['dt_fine_grained'], self.mpc_parameters['horizon_fine_grained']) + + + previous_contact = self.current_contact + self.current_contact = np.array([self.contact_sequence[0][0], + self.contact_sequence[1][0], + self.contact_sequence[2][0], + self.contact_sequence[3][0]]) + + + + + # Compute the reference for the footholds --------------------------------------------------- + self.frg.update_lift_off_positions(previous_contact, self.current_contact, self.feet_pos, self.legs_order[0:4]) + ref_feet_pos = self.frg.compute_footholds_reference( + com_position=base_pos, #this comes as input from env + base_ori_euler_xyz=base_ori_euler_xyz, #this comes as input from env + base_xy_lin_vel=base_lin_vel[0:2], #this comes as input from env + ref_base_xy_lin_vel=ref_base_lin_vel[0:2], + hips_position=self.hip_pos, + com_height_nominal=self.ref_z,) + + + # Estimate the terrain slope and elevation ------------------------------------------------------- + terrain_roll, \ + terrain_pitch, \ + terrain_height = self.terrain_computation.compute_terrain_estimation( + base_position=base_pos, + yaw=base_ori_euler_xyz[2], + feet_pos=self.frg.lift_off_positions, + current_contact=self.current_contact) + + self.ref_pos = np.array([0, 0, self.hip_height]) + self.ref_pos[2] = self.ref_z+ terrain_height + # REFERENCE STATE + # Update state reference ------------------------------------------------------------------------ + + # Reference state collaborative formulation + if(self.controller_type == 'collaborative'): + self.reference_state |=dict(ref_position=np.array([0, 0, self.hip_height]), + ref_linear_velocity=ref_base_lin_vel, + ref_angular_velocity=ref_base_ang_vel, + ref_orientation=np.array([terrain_roll, terrain_pitch, 0.0]), + ref_foot_FL=ref_feet_pos.FL.reshape((1, 3)), + ref_foot_FR=ref_feet_pos.FR.reshape((1, 3)), + ref_foot_RL=ref_feet_pos.RL.reshape((1, 3)), + ref_foot_RR=ref_feet_pos.RR.reshape((1, 3)), + ref_end_effector_position_z=np.array([0.50]), + ref_linear_end_effector_velocity_z=np.array([0.0]) + ) + else: + self.reference_state |= dict(ref_foot_FL=ref_feet_pos.FL.reshape((1, 3)), + ref_foot_FR=ref_feet_pos.FR.reshape((1, 3)), + ref_foot_RL=ref_feet_pos.RL.reshape((1, 3)), + ref_foot_RR=ref_feet_pos.RR.reshape((1, 3)), + # Also update the reference base linear velocity and + ref_linear_velocity=ref_base_lin_vel, + ref_angular_velocity=ref_base_ang_vel, + ref_orientation=np.array([terrain_roll, terrain_pitch, 0.0]), + ref_position=self.ref_pos + ) + contact_sequence = self.contact_sequence + reference_state = self.reference_state + state_current = self.state_current + + # CURRENT STATE + #collaborative formulation current state + if(self.controller_type == 'collaborative'): + self.state_current = dict( + position= base_pos, + linear_velocity= base_lin_vel, + orientation= base_ori_euler_xyz, + angular_velocity= base_ang_vel, + foot_FL=self.feet_pos.FL, + foot_FR=self.feet_pos.FR, + foot_RL=self.feet_pos.RL, + foot_RR=self.feet_pos.RR, + passive_arm_force=np.zeros((6,1)) + ) + # State current gradient formulation + else: + self.state_current = dict( + position= base_pos, + linear_velocity= base_lin_vel, + orientation= base_ori_euler_xyz, + angular_velocity= base_ang_vel, + foot_FL=self.feet_pos.FL, + foot_FR=self.feet_pos.FR, + foot_RL=self.feet_pos.RL, + foot_RR=self.feet_pos.RR + ) + + return reference_state,state_current,ref_feet_pos,contact_sequence + + # ------------------------------------------------------------------------------------------------- + + def solve_MPC_main_loop(self,inertia_env: list,ref_feet_pos: list,spring_gain:list,eef_position:list,eef_jacobian:list,ext_wrenches:list): + """ + Main loop to solve the Model Predictive Control (MPC). + + Parameters: + alpha (float): A numeric parameter representing simulation time for solving the MPC; + inertia_env (list or np.ndarray): Inertia values from QuadrupedEnv, + + """ + # Solve OCP --------------------------------------------------------------------------------------- + # if alpha % round(1 / (self.mpc_frequency * self.simulation_dt)) == 0: + + # We can recompute the inertia of the single rigid body model + # or use the fixed one in cfg.py + if self.use_inertia_recomputation==True: + # TODO: d.qpos is not defined + #inertia = srb_inertia_computation.compute_inertia(d.qpos) + inertia = inertia_env # Reflected inertia of base at qpos, in world frame + #this comes as inout from env + else: + inertia = self.inertia.flatten() + if (self.optimize_step_freq == True): + # we can always optimize the step freq, or just at the apex of the swing + # to avoid possible jittering in the solution + optimize_swing = 0 # 1 for always, 0 for apex + for leg_id in range(4): + # Swing time check + if (self.current_contact[leg_id] == 0): + if ((self.stc.swing_time[leg_id] > (self.swing_period / 2.) - 0.02) and \ + (self.stc.swing_time[leg_id] < (self.swing_period / 2.) + 0.02)): + optimize_swing = 1 + self.nominal_sample_freq = self.step_frequency + else: + optimize_swing = 0 + # If we use sampling + if (self.controller_type == 'sampling'): + + # Convert data to jax and shift previous solution + state_current_jax, \ + reference_state_jax, = self.controller.prepare_state_and_reference(self.state_current, + self.reference_state, + self.current_contact, + self.previous_contact_mpc) + + self.previous_contact_mpc = self.current_contact + + + for iter_sampling in range(self.num_sampling_iterations): + + self.controller = self.controller.with_newkey() + + if (self.sampling_method == 'cem_mppi'): + if (iter_sampling == 0): + self.controller = self.controller.with_newsigma(self.sigma_cem_mppi) + + nmpc_GRFs, \ + nmpc_footholds, \ + self.controller.best_control_parameters, \ + best_cost, \ + best_sample_freq, \ + costs, \ + sigma_cem_mppi = self.jitted_compute_control(state_current_jax, reference_state_jax, + self.contact_sequence, self.best_control_parameters, + self.controller.master_key, self.controller.sigma_cem_mppi) + self.controller = self.controller.with_newsigma(sigma_cem_mppi) + else: + nmpc_GRFs, \ + nmpc_footholds, \ + self.best_control_parameters, \ + best_cost, \ + best_sample_freq, \ + costs = self.jitted_compute_control(state_current_jax, reference_state_jax, self.contact_sequence, + self.best_control_parameters, self.controller.master_key, self.pgg.phase_signal, + self.nominal_sample_freq, optimize_swing) + + nmpc_footholds = np.array([ref_feet_pos['FL'], ref_feet_pos['FR'], ref_feet_pos['RL'], ref_feet_pos['RR']]) + nmpc_GRFs = np.array(nmpc_GRFs) + + # if ((self.optimize_step_freq==True) and (optimize_swing == 1)): + # self.pgg.step_freq = np.array([best_sample_freq])[0] + # nominal_sample_freq = self.pgg.step_freq + # self.stance_time = (1 / self.pgg.step_freq) * self.duty_factor + # self.frg.stance_time = self.stance_time + # self.swing_period = (1 - self.duty_factor) * (1 / self.pgg.step_freq) # + 0.07 + # self.stc.regenerate_swing_trajectory_generator(step_height=self.step_height, swing_period=self.swing_period) + # nmpc_footholds = ref_feet_pos #todo return ref feet pos from the controller + # nmpc_GRFs = np.array(nmpc_GRFs) + # self.previous_contact_mpc = self.current_contact + # index_shift = 0 + + # optimizer_cost = best_cost + # cOLLABORATIVE CONTROLLER + elif(self.controller_type == 'collaborative'): + self.state_current['passive_arm_force'] = ext_wrenches + nmpc_GRFs, nmpc_footholds, _, status = self.controller_collaborative.compute_control( + self.state_current, + self.reference_state, + self.contact_sequence, + external_wrenches=ext_wrenches, + end_effector_position=eef_position, + end_effector_jacobian=eef_jacobian, + end_effector_gain=spring_gain) + + # If we use Gradient-Based MPC + else: + nmpc_GRFs, nmpc_footholds, _, status = self.controller.compute_control( + self.state_current, + self.reference_state, + self.contact_sequence, + inertia=inertia, + mass=self.mass) + # optimizer_cost = controller.acados_ocp_solver.get_cost() + if (self.optimize_step_freq==True) and optimize_swing == 1: + contact_sequence_temp = np.zeros((len(self.step_freq_available), 4, self.horizon * 2)) + for j in range(len(self.step_freq_available)): + self.pgg_temp = PeriodicGaitGenerator(duty_factor=self.duty_factor, + step_freq=self.step_freq_available[j], + gait_type=self.gait_type, + horizon=self.horizon * 2, + contact_sequence_dt=self.mpc_dt/2.) + self.pgg_temp.set_phase_signal(self.pgg.phase_signal, init=self.pgg._init) # TODO: Need init? + contact_sequence_temp[j] = self.pgg_temp.compute_contact_sequence() + + # in the case of nonuniform discretization, we need to subsample the contact sequence + if (self.use_nonuniform_discretization==True): + contact_sequence_temp[j] = self.pgg.sample_contact_sequence(self.contact_sequence, self.mpc_dt,self.mpc_parameters['dt_fine_grained'], self.mpc_parameters['horizon_fine_grained']) + + costs, best_sample_freq = self.batched_controller.compute_batch_control( + self.state_current, + self.reference_state, + contact_sequence_temp, + inertia=inertia,) + self.pgg.step_freq = best_sample_freq + self.stance_time = (1 / self.pgg.step_freq) * self.duty_factor + self.frg.stance_time = self.stance_time + self.swing_period = (1 - self.duty_factor) * (1 / self.pgg.step_freq) # + 0.07 + self.stc.regenerate_swing_trajectory_generator(step_height=self.step_height, swing_period=self.swing_period) + # If the controller is using RTI, we need to linearize the mpc after its computation + # this helps to minize the delay between new state->control, but only in a real case. + # Here we are in simulation and does not make any difference for now + if (self.mpc_parameters['use_RTI']==True): + # preparation phase + self.controller.acados_ocp_solver.options_set('rti_phase', 1) + status = self.controller.acados_ocp_solver.solve() + # print("preparation phase time: ", controller.acados_ocp_solver.get_stats('time_tot')) + # If we have optimized the gait, we set all the timing parameters + if (self.optimize_step_freq==True) and optimize_swing == 1: + self.pgg.step_freq = np.array([best_sample_freq])[0] + self.nominal_sample_freq = self.pgg.step_freq + self.frg.stance_time = (1 / self.pgg.step_freq) * self.pgg.duty_factor + swing_period = (1 - self.pgg.duty_factor) * (1 / self.pgg.step_freq) + self.stc.regenerate_swing_trajectory_generator(step_height=self.step_height, swing_period=swing_period) + # current_contact=self.current_contact + # print(nmpc_footholds,"\n", nmpc_GRFs,"\n",current_contact) + assert self.nmpc_footholds is not None,f"nmpc is Nonve" + assert nmpc_GRFs is not None,f"grf is Nonve" + assert self.current_contact is not None,f"grf is Nonve" + current_contact=self.current_contact + self.nmpc_footholds = nmpc_footholds + + self.nmpc_GRFs=nmpc_GRFs + print("nmpc_footholds",nmpc_footholds) + print("nmpc_GRFs",nmpc_GRFs) + print("current_contact",current_contact) + ### + return nmpc_footholds, nmpc_GRFs,current_contact + + def compute_stance_and_swing_torque(self,simulation: bool,feet_jac: dict,feet_vel: dict,legs_qvel_idx: dict,qpos:list, qvel:list,legs_qfrc_bias:dict,legs_mass_matrix:dict, + nmpc_GRFs,nmpc_footholds,tau=None,current_contact=None): + """ + Compute the forward action for the robot based on the provided simulation state and parameters. + + Parameters: + simulation (bool): Indicates if the simulation is active. + feet_jac (dict): Jacobian matrices of the feet. + feet_vel (dict): Feet velocities. + legs_qvel_idx (dict): Index of the joint velocities. + qpos (np.ndarray): Generalized positions vectors. + qvel (np.ndarray): Generalized velocities vector. + legs_qfrc_bias (dict): Bias forces for the legs. + legs_mass_matrix (dict): Mass matrix for the legs. + Returns: + tau (dict): Torque vector or forward action for the robot. + """ + + + + if simulation == True: + nmpc_GRFs = nmpc_GRFs + else: + ##suff + breakpoint() + + # Compute Stance Torque --------------------------------------------------------------------------- + # feet_jac = self.env.feet_jacobians(frame='world', return_rot_jac=False) #come from outside + # Compute feet velocities + # Compute jacobian derivatives of the contact points + self.jac_feet_dot = (feet_jac - self.jac_feet_prev) / self.simulation_dt # Finite difference approximation + self.jac_feet_prev = feet_jac # Update previous Jacobianscomputed + # Compute the torque with the contact jacobian (-J.T @ f) J: R^nv -> R^3, f: R^3 + tau.FL = -np.matmul(feet_jac.FL[:, legs_qvel_idx.FL].T, nmpc_GRFs.FL) #env + tau.FR = -np.matmul(feet_jac.FR[:, legs_qvel_idx.FR].T, nmpc_GRFs.FR) #env + tau.RL = -np.matmul(feet_jac.RL[:, legs_qvel_idx.RL].T, nmpc_GRFs.RL) #env + tau.RR = -np.matmul(feet_jac.RR[:, legs_qvel_idx.RR].T, nmpc_GRFs.RR) #env + + # --------------------------------------------------------------------------------------------------- + + # Compute Swing Torque ------------------------------------------------------------------------------ + # TODO: Move contact sequence to labels FL1, FR1, RL1, RR1 instead of a fixed indexing. + # The swing controller is in the end-effector space. For its computation, + # we save for simplicity joints position and velocities + + # centrifugal, coriolis, gravity + + self.stc.update_swing_time(current_contact, self.legs_order, self.simulation_dt) + + + for leg_id, leg_name in enumerate(self.legs_order): + if self.current_contact[leg_id] == 0: # If in swing phase, compute the swing trajectory tracking control. + tau[leg_name], _, _ = self.stc.compute_swing_control( + leg_id=leg_id, + q_dot=qvel[legs_qvel_idx[leg_name]], + J=feet_jac[leg_name][:, legs_qvel_idx[leg_name]], + J_dot=self.jac_feet_dot[leg_name][:,legs_qvel_idx[leg_name]], + lift_off=self.frg.lift_off_positions[leg_name], + touch_down=nmpc_footholds[leg_name], + foot_pos=self.feet_pos[leg_name], + foot_vel=feet_vel[leg_name], + h=legs_qfrc_bias[leg_name], + mass_matrix=legs_mass_matrix[leg_name] + ) + lift_off_position=self.frg.lift_off_positions + return tau, lift_off_position + + def reset(self): + self.pgg.reset() + self.frg.lift_off_positions = self.lift_off_positions + self.current_contact = np.array([0, 0, 0, 0]) + self.previous_contact = np.asarray(self.current_contact) + + + + + diff --git a/quadruped_pympc/helpers/periodic_gait_generator.py b/quadruped_pympc/helpers/periodic_gait_generator.py index 4b539df..d7717d0 100644 --- a/quadruped_pympc/helpers/periodic_gait_generator.py +++ b/quadruped_pympc/helpers/periodic_gait_generator.py @@ -32,6 +32,8 @@ def reset(self): self.phase_offset = [0.0, 0.5, 0.75, 0.25] elif self.gait_type == GaitType.FRONTDIAGONALCRAWL.value: self.phase_offset = [0.5, 1.0, 0.75, 1.25] + elif self.gait_type == GaitType.FULL_STANCE.value: + self.phase_offset = [0.0, 0.5, 0.5, 0.0] else: self.phase_offset = [0.0, 0.5, 0.5, 0.0] diff --git a/simulation/simulation_mujoco.py b/simulation/simulation_mujoco.py index 066e348..4247baa 100644 --- a/simulation/simulation_mujoco.py +++ b/simulation/simulation_mujoco.py @@ -13,11 +13,11 @@ from gym_quadruped.utils.quadruped_utils import LegsAttr # Control imports from quadruped_pympc import config as cfg -from quadruped_pympc.helpers.foothold_reference_generator import FootholdReferenceGenerator -from quadruped_pympc.helpers.periodic_gait_generator import PeriodicGaitGenerator -from quadruped_pympc.helpers.swing_trajectory_controller import SwingTrajectoryController -from quadruped_pympc.helpers.terrain_estimator import TerrainEstimator + from quadruped_pympc.helpers.quadruped_utils import plot_swing_mujoco +from quadruped_pympc.controllers.srb_controller import SingleRigidBodyController as Controller +## +import copy as cp if __name__ == '__main__': np.set_printoptions(precision=3, suppress=True) @@ -28,9 +28,12 @@ robot_feet_geom_names = cfg.robot_feet_geom_names scene_name = cfg.simulation_params['scene'] simulation_dt = cfg.simulation_params['dt'] + feet_traj_geom_ids, feet_GRF_geom_ids = None, LegsAttr(FL=-1, FR=-1, RL=-1, RR=-1) + legs_order = ["FL", "FR", "RL", "RR"] + mpc_frequency = cfg.simulation_params['mpc_frequency'] - state_observables_names = ('base_pos', 'base_lin_vel', 'base_ori_euler_xyz', 'base_ori_quat_wxyz', 'base_ang_vel', - 'qpos_js', 'qvel_js', 'tau_ctrl_setpoint', + state_observables_names = ('base_pos', 'base_lin_vel', 'base_ori_euler_xyz', 'base_ori_quat_wxyz', 'base_ang_vel' + , 'qvel_js', 'tau_ctrl_setpoint', 'feet_pos_base', 'feet_vel_base', 'contact_state', 'contact_forces_base',) # Create the quadruped robot environment. _______________________________________________________________________ @@ -45,139 +48,46 @@ base_vel_command_type="human", # "forward", "random", "forward+rotate", "human" state_obs_names=state_observables_names, # Desired quantities in the 'state' vec ) - # env = QuadrupedEnv(robot=robot_name, - # hip_height=hip_height, - # legs_joint_names=LegsAttr(**robot_leg_joints), # Joint names of the legs DoF - # feet_geom_name=LegsAttr(**robot_feet_geom_names), # Geom/Frame id of feet - # scene=scene_name, - # sim_dt=simulation_dt, - # ref_base_lin_vel=(-4.0 * hip_height, 4.0 * hip_height), # pass a float for a fixed value - # ref_base_ang_vel=(-np.pi * 3 / 4, np.pi * 3 / 4), # pass a float for a fixed value - # ground_friction_coeff=(0.3, 1.5), # pass a float for a fixed value - # base_vel_command_type="random", # "forward", "random", "forward+rotate", "human" - # state_obs_names=state_observables_names, # Desired quantities in the 'state' vec - # ) # Some robots require a change in the zero joint-space configuration. If provided apply it if cfg.qpos0_js is not None: env.mjModel.qpos0 = np.concatenate((env.mjModel.qpos0[:7], cfg.qpos0_js)) env.reset(random=False) env.render() # Pass in the first render call any mujoco.viewer.KeyCallbackType - - # _______________________________________________________________________________________________________________ - - # TODO: CONTROLLER INITIALIZATION CODE THAT SHOULD BE REMOVED FROM HERE. - mpc_frequency = cfg.simulation_params['mpc_frequency'] - mpc_dt = cfg.mpc_params['dt'] - horizon = cfg.mpc_params['horizon'] - - # input_rates optimize the delta_GRF (smoooth!) - # nominal optimize directly the GRF (not smooth) - # sampling use GPU - if cfg.mpc_params['type'] == 'nominal': - from quadruped_pympc.controllers.gradient.nominal.centroidal_nmpc_nominal import Acados_NMPC_Nominal - - controller = Acados_NMPC_Nominal() - - if cfg.mpc_params['optimize_step_freq']: - from quadruped_pympc.controllers.gradient.nominal.centroidal_nmpc_gait_adaptive import \ - Acados_NMPC_GaitAdaptive - - batched_controller = Acados_NMPC_GaitAdaptive() - - elif cfg.mpc_params['type'] == 'input_rates': - from quadruped_pympc.controllers.gradient.input_rates.centroidal_nmpc_input_rates import Acados_NMPC_InputRates - - controller = Acados_NMPC_InputRates() - - if cfg.mpc_params['optimize_step_freq']: - from quadruped_pympc.controllers.gradient.nominal.centroidal_nmpc_gait_adaptive import \ - Acados_NMPC_GaitAdaptive - - batched_controller = Acados_NMPC_GaitAdaptive() - - elif cfg.mpc_params['type'] == 'sampling': - if cfg.mpc_params['optimize_step_freq']: - from quadruped_pympc.controllers.sampling.centroidal_nmpc_jax_gait_adaptive import Sampling_MPC - else: - from quadruped_pympc.controllers.sampling.centroidal_nmpc_jax import Sampling_MPC - - controller = Sampling_MPC(horizon=horizon, - dt=mpc_dt, - num_parallel_computations=cfg.mpc_params['num_parallel_computations'], - sampling_method=cfg.mpc_params['sampling_method'], - control_parametrization=cfg.mpc_params['control_parametrization'], - device="gpu") - - - - - # Periodic gait generator _________________________________________________________________________ - gait_name = cfg.simulation_params['gait'] - gait_params = cfg.simulation_params['gait_params'][gait_name] - gait_type, duty_factor, step_frequency = gait_params['type'], gait_params['duty_factor'], gait_params['step_freq'] - # Given the possibility to use nonuniform discretization, - # we generate a contact sequence two times longer and with a dt half of the one of the mpc - pgg = PeriodicGaitGenerator(duty_factor=duty_factor, - step_freq=step_frequency, - gait_type=gait_type, - horizon=horizon * 2, contact_sequence_dt=mpc_dt / 2.) - - contact_sequence = pgg.compute_contact_sequence() - nominal_sample_freq = pgg.step_freq - - # Create the foothold reference generator - stance_time = (1 / pgg.step_freq) * pgg.duty_factor - frg = FootholdReferenceGenerator(stance_time=stance_time, hip_height=cfg.hip_height, lift_off_positions=env.feet_pos(frame='world')) - - # Create swing trajectory generator - step_height = cfg.simulation_params['step_height'] - swing_period = (1 - pgg.duty_factor) * (1 / pgg.step_freq) - position_gain_fb = cfg.simulation_params['swing_position_gain_fb'] - velocity_gain_fb = cfg.simulation_params['swing_velocity_gain_fb'] - swing_generator = cfg.simulation_params['swing_generator'] - stc = SwingTrajectoryController(step_height=step_height, swing_period=swing_period, - position_gain_fb=position_gain_fb, velocity_gain_fb=velocity_gain_fb, - generator=swing_generator) - - # Terrain estimator - terrain_computation = TerrainEstimator() - - - # Initialization of variables used in the main control loop - # ____________________________________________________________ - # Set the reference for the state - ref_pose = np.array([0, 0, cfg.hip_height]) - ref_base_lin_vel, ref_base_ang_vel = env.target_base_vel() - ref_orientation = np.array([0.0, 0.0, 0.0]) - # # SET REFERENCE AS DICTIONARY - # TODO: I would suggest to create a DataClass for "BaseConfig" used in the PotatoModel controllers. - ref_state = {} - - # Starting contact sequence - previous_contact = np.array([1, 1, 1, 1]) - previous_contact_mpc = np.array([1, 1, 1, 1]) - current_contact = np.array([1, 1, 1, 1]) - - nmpc_GRFs = np.zeros((12,)) - nmpc_wrenches = np.zeros((6,)) - nmpc_footholds = np.zeros((12,)) - # Jacobian matrices - jac_feet_prev = LegsAttr(*[np.zeros((3, env.mjModel.nv)) for _ in range(4)]) - jac_feet_dot = LegsAttr(*[np.zeros((3, env.mjModel.nv)) for _ in range(4)]) + nv = env.mjModel.nv + jac_feet_prev = LegsAttr(*[np.zeros((3, nv)) for _ in range(4)]) + jac_feet_prev_follower = LegsAttr(*[np.zeros((3, nv)) for _ in range(4)]) + jac_feet_dot = LegsAttr(*[np.zeros((3, nv)) for _ in range(4)]) + jac_feet_dot_follower = LegsAttr(*[np.zeros((3, nv)) for _ in range(4)]) # Torque vector - tau = LegsAttr(*[np.zeros((env.mjModel.nv, 1)) for _ in range(4)]) - # State - state_current, state_prev = {}, {} - feet_pos = None - feet_traj_geom_ids, feet_GRF_geom_ids = None, LegsAttr(FL=-1, FR=-1, RL=-1, RR=-1) - legs_order = ["FL", "FR", "RL", "RR"] + tau = LegsAttr(*[np.zeros((nv, 1)) for _ in range(4)]) + lift_off_positions = env.feet_pos(frame='world') + # _______________________________________________________________________________________________________________ + # Controller initialization ______________________________________________________________________________________ + # firest create or select the controller type and the dictionary with the entries needed in this case we use the + # config file + if cfg.mpc_params['type'] == 'nominal' or cfg.mpc_params['type'] == 'input_rates': + mpc_params = cfg.mpc_params.update(cfg.mpc_nominal_params) + if cfg.mpc_params['type'] == 'sampling': + mpc_params = cfg.mpc_params.update(cfg.mpc_sampling_params) + + controller = Controller(hip_height=hip_height, + legs_order=["FL", "FR", "RL", "RR"], + inertia=cfg.inertia, + mass=cfg.mass, + lift_off_positions=lift_off_positions, + simulation_parameters=cfg.simulation_params, + mpc_parameters=cfg.mpc_params, + jac_feet_dot=jac_feet_dot, + jac_feet_prev=jac_feet_prev, + tau=LegsAttr(*[np.zeros((nv, 1)) for _ in range(4)]), + ) # _____________________________________________________________ RENDER_FREQ = 20 # Hz N_EPISODES = 500 - N_STEPS_PER_EPISODE = 2000 if env.base_vel_command_type != "human" else 20000 + N_STEPS_PER_EPISODE = 20000 if env.base_vel_command_type != "human" else 20000 last_render_time = time.time() state_obs_history, ctrl_state_history = [], [] @@ -187,288 +97,105 @@ for _ in range(N_STEPS_PER_EPISODE): step_start = time.time() - # Update the robot state -------------------------------- - feet_pos = env.feet_pos(frame='world') - hip_pos = env.hip_positions(frame='world') + # update reference state + ref_base_lin_vel, ref_base_ang_vel = env.target_base_vel() + # Get current Rigidbody state base_lin_vel = env.base_lin_vel(frame='world') base_ang_vel = env.base_ang_vel(frame='world') - - state_current = dict( - position=env.base_pos, - linear_velocity=base_lin_vel, - orientation=env.base_ori_euler_xyz, - angular_velocity=base_ang_vel, - foot_FL=feet_pos.FL, - foot_FR=feet_pos.FR, - foot_RL=feet_pos.RL, - foot_RR=feet_pos.RR - ) - - # Update the desired contact sequence --------------------------- - pgg.run(simulation_dt, pgg.step_freq) - contact_sequence = pgg.compute_contact_sequence() - - # in the case of nonuniform discretization, we need to subsample the contact sequence - if (cfg.mpc_params['use_nonuniform_discretization']): - dt_fine_grained = cfg.mpc_params['dt_fine_grained'] - horizon_fine_grained = cfg.mpc_params['horizon_fine_grained'] - contact_sequence = pgg.sample_contact_sequence(contact_sequence, mpc_dt, dt_fine_grained, - horizon_fine_grained) - - previous_contact = current_contact - current_contact = np.array([contact_sequence[0][0], - contact_sequence[1][0], - contact_sequence[2][0], - contact_sequence[3][0]]) - - # Compute the reference for the footholds --------------------------------------------------- - frg.update_lift_off_positions(previous_contact, current_contact, feet_pos, legs_order) - ref_feet_pos = frg.compute_footholds_reference( - com_position=env.base_pos, - base_ori_euler_xyz=env.base_ori_euler_xyz, - base_xy_lin_vel=base_lin_vel[0:2], - ref_base_xy_lin_vel=ref_base_lin_vel[0:2], - hips_position=hip_pos, - com_height_nominal=cfg.simulation_params['ref_z']) - - # Estimate the terrain slope and elevation ------------------------------------------------------- - terrain_roll, \ - terrain_pitch, \ - terrain_height = terrain_computation.compute_terrain_estimation( - base_position=env.base_pos, - yaw=env.base_ori_euler_xyz[2], - feet_pos=frg.lift_off_positions, - current_contact=current_contact) - - ref_pos = np.array([0, 0, cfg.hip_height]) - ref_pos[2] = cfg.simulation_params['ref_z'] + terrain_height - - # Update state reference ------------------------------------------------------------------------ - # Update target base velocity - ref_base_lin_vel, ref_base_ang_vel = env.target_base_vel() - - ref_state |= dict(ref_foot_FL=ref_feet_pos.FL.reshape((1, 3)), - ref_foot_FR=ref_feet_pos.FR.reshape((1, 3)), - ref_foot_RL=ref_feet_pos.RL.reshape((1, 3)), - ref_foot_RR=ref_feet_pos.RR.reshape((1, 3)), - # Also update the reference base linear velocity and - ref_linear_velocity=ref_base_lin_vel, - ref_angular_velocity=ref_base_ang_vel, - ref_orientation=np.array([terrain_roll, terrain_pitch, 0.0]), - ref_position=ref_pos - ) - # ------------------------------------------------------------------------------------------------- - - # TODO: this should be hidden inside the controller forward/get_action method - # Solve OCP --------------------------------------------------------------------------------------- - if env.step_num % round(1 / (mpc_frequency * simulation_dt)) == 0: - - time_start = time.time() - - # We can recompute the inertia of the single rigid body model - # or use the fixed one in cfg.py - if(cfg.simulation_params['use_inertia_recomputation']): - inertia = env.get_base_inertia().flatten() # Reflected inertia of base at qpos, in world frame - else: - inertia = cfg.inertia.flatten() - - if ((cfg.mpc_params['optimize_step_freq'])): - # we can always optimize the step freq, or just at the apex of the swing - # to avoid possible jittering in the solution - #optimize_swing = 1 - optimize_swing = stc.check_apex_condition(current_contact) - if(optimize_swing == 1): - nominal_sample_freq = pgg.step_freq - else: - optimize_swing = 0 - - - # If we use sampling - if (cfg.mpc_params['type'] == 'sampling'): - - # Convert data to jax and shift previous solution - state_current_jax, \ - reference_state_jax, = controller.prepare_state_and_reference(state_current, - ref_state, - current_contact, - previous_contact_mpc) - previous_contact_mpc = current_contact - - for iter_sampling in range(cfg.mpc_params['num_sampling_iterations']): - controller = controller.with_newkey() - if (cfg.mpc_params['sampling_method'] == 'cem_mppi'): - if (iter_sampling == 0): - controller = controller.with_newsigma(cfg.mpc_params['sigma_cem_mppi']) - - nmpc_GRFs, \ - nmpc_footholds, \ - controller.best_control_parameters, \ - best_cost, \ - best_sample_freq, \ - costs, \ - sigma_cem_mppi = controller.jitted_compute_control(state_current_jax, reference_state_jax, - contact_sequence, controller.best_control_parameters, - controller.master_key, controller.sigma_cem_mppi) - controller = controller.with_newsigma(sigma_cem_mppi) - else: - nmpc_GRFs, \ - nmpc_footholds, \ - controller.best_control_parameters, \ - best_cost, \ - best_sample_freq, \ - costs = controller.jitted_compute_control(state_current_jax, reference_state_jax, - contact_sequence, controller.best_control_parameters, - controller.master_key, pgg.phase_signal, - nominal_sample_freq, optimize_swing) - - - nmpc_footholds = ref_feet_pos - nmpc_GRFs = np.array(nmpc_GRFs) - - - # If we use Gradient-Based MPC - else: - - nmpc_GRFs, \ - nmpc_footholds, _, \ - status = controller.compute_control(state_current, - ref_state, - contact_sequence, - inertia=inertia) - - - nmpc_footholds = LegsAttr(FL=nmpc_footholds[0], - FR=nmpc_footholds[1], - RL=nmpc_footholds[2], - RR=nmpc_footholds[3]) - - - - if cfg.mpc_params['optimize_step_freq'] and optimize_swing == 1: - contact_sequence_temp = np.zeros((len(cfg.mpc_params['step_freq_available']), 4, horizon * 2)) - for j in range(len(cfg.mpc_params['step_freq_available'])): - pgg_temp = PeriodicGaitGenerator(duty_factor=pgg.duty_factor, - step_freq=cfg.mpc_params['step_freq_available'][j], - gait_type=pgg.gait_type, - horizon=horizon * 2, - contact_sequence_dt=mpc_dt/2.) - pgg_temp.set_phase_signal(pgg.phase_signal) - contact_sequence_temp[j] = pgg_temp.compute_contact_sequence() - - # in the case of nonuniform discretization, we need to subsample the contact sequence - if (cfg.mpc_params['use_nonuniform_discretization']): - dt_fine_grained = cfg.mpc_params['dt_fine_grained'] - horizon_fine_grained = cfg.mpc_params['horizon_fine_grained'] - contact_sequence_temp[j] = pgg.sample_contact_sequence(contact_sequence, mpc_dt, dt_fine_grained, horizon_fine_grained) - - - costs, \ - best_sample_freq = batched_controller.compute_batch_control(state_current, - ref_state, - contact_sequence_temp) - - # If the controller is using RTI, we need to linearize the mpc after its computation - # this helps to minize the delay between new state->control, but only in a real case. - # Here we are in simulation and does not make any difference for now - if (controller.use_RTI): - # preparation phase - controller.acados_ocp_solver.options_set('rti_phase', 1) - status = controller.acados_ocp_solver.solve() - # print("preparation phase time: ", controller.acados_ocp_solver.get_stats('time_tot')) - - - # If we have optimized the gait, we set all the timing parameters - if ((cfg.mpc_params['optimize_step_freq']) and (optimize_swing == 1)): - pgg.step_freq = np.array([best_sample_freq])[0] - nominal_sample_freq = pgg.step_freq - frg.stance_time = (1 / pgg.step_freq) * pgg.duty_factor - swing_period = (1 - pgg.duty_factor) * (1 / pgg.step_freq) - stc.regenerate_swing_trajectory_generator(step_height=step_height, swing_period=swing_period) - - - # TODO: Indexing should not be hardcoded. Env should provide indexing of leg actuator dimensions. - nmpc_GRFs = LegsAttr(FL=nmpc_GRFs[0:3] * current_contact[0], - FR=nmpc_GRFs[3:6] * current_contact[1], - RL=nmpc_GRFs[6:9] * current_contact[2], - RR=nmpc_GRFs[9:12] * current_contact[3]) - - # ------------------------------------------------------------------------------------------------- - - # Compute Stance Torque --------------------------------------------------------------------------- + base_ori_euler_xyz_leader = env.base_ori_euler_xyz + base_pos_leader = env.base_pos + feet_pos = env.feet_pos(frame='world') + hip_pos = env.hip_positions(frame='world') + qpos, qvel = env.mjData.qpos, env.mjData.qvel feet_jac = env.feet_jacobians(frame='world', return_rot_jac=False) - # Compute feet velocities feet_vel = LegsAttr(**{leg_name: feet_jac[leg_name] @ env.mjData.qvel for leg_name in legs_order}) - # Compute jacobian derivatives of the contact points - jac_feet_dot = (feet_jac - jac_feet_prev) / simulation_dt # Finite difference approximation - jac_feet_prev = feet_jac # Update previous Jacobians - # Compute the torque with the contact jacobian (-J.T @ f) J: R^nv -> R^3, f: R^3 - tau.FL = -np.matmul(feet_jac.FL[:, env.legs_qvel_idx.FL].T, nmpc_GRFs.FL) - tau.FR = -np.matmul(feet_jac.FR[:, env.legs_qvel_idx.FR].T, nmpc_GRFs.FR) - tau.RL = -np.matmul(feet_jac.RL[:, env.legs_qvel_idx.RL].T, nmpc_GRFs.RL) - tau.RR = -np.matmul(feet_jac.RR[:, env.legs_qvel_idx.RR].T, nmpc_GRFs.RR) - # --------------------------------------------------------------------------------------------------- - - # Compute Swing Torque ------------------------------------------------------------------------------ - # TODO: Move contact sequence to labels FL, FR, RL, RR instead of a fixed indexing. - # The swing controller is in the end-effector space. For its computation, - # we save for simplicity joints position and velocities - qpos, qvel = env.mjData.qpos, env.mjData.qvel - # centrifugal, coriolis, gravity - legs_mass_matrix = env.legs_mass_matrix legs_qfrc_bias = env.legs_qfrc_bias + legs_mass_matrix = env.legs_mass_matrix + leg_qvel_idx = env.legs_qvel_idx + + ###### Controller single aliengo model + ref_state, state_current, ref_feet_pos, contact_sequence = controller.update_mpc_reference_and_state( + nv=nv, + feet_pos=feet_pos, # two robots edit, + hip_pos=hip_pos, # two robots edit, + base_pos=env.base_pos, + base_ori_euler_xyz=env.base_ori_euler_xyz, + base_lin_vel=base_lin_vel, + base_ang_vel=base_ang_vel, + ref_base_lin_vel=ref_base_lin_vel, + ref_base_ang_vel=ref_base_ang_vel) + if env.step_num % round(1 / (mpc_frequency * simulation_dt)) == 0: + nmpc_footholds_mpc, nmpc_GRFs, current_contact = controller.solve_MPC_main_loop( + inertia_env=env.get_base_inertia().flatten(), + ref_feet_pos=ref_feet_pos, + spring_gain=None, + eef_position=None, + eef_jacobian=None, + ext_wrenches=None) + + nmpc_footholds = LegsAttr(FL=nmpc_footholds_mpc[0], + FR=nmpc_footholds_mpc[1], + RL=nmpc_footholds_mpc[2], + RR=nmpc_footholds_mpc[3], ) + + nmpc_GRFs_computed = LegsAttr(FL=nmpc_GRFs[0:3] * current_contact[0], + FR=nmpc_GRFs[3:6] * current_contact[1], + RL=nmpc_GRFs[6:9] * current_contact[2], + RR=nmpc_GRFs[9:12] * current_contact[3]) + + tau, lift_off_position_ctrl = controller.compute_stance_and_swing_torque(simulation=True, + feet_jac=feet_jac, + feet_vel=feet_vel, + legs_qvel_idx=leg_qvel_idx, + qpos=qpos, + qvel=qvel, + legs_qfrc_bias=legs_qfrc_bias, + legs_mass_matrix=legs_mass_matrix, + nmpc_GRFs=nmpc_GRFs_computed, + nmpc_footholds=nmpc_footholds, + tau=tau, + current_contact=current_contact, ) + + time_start = time.time() - stc.update_swing_time(current_contact, legs_order, simulation_dt) - - for leg_id, leg_name in enumerate(legs_order): - if current_contact[leg_id] == 0: # If in swing phase, compute the swing trajectory tracking control. - tau[leg_name], _, _ = stc.compute_swing_control( - leg_id=leg_id, - q_dot=qvel[env.legs_qvel_idx[leg_name]], - J=feet_jac[leg_name][:, env.legs_qvel_idx[leg_name]], - J_dot=jac_feet_dot[leg_name][:, env.legs_qvel_idx[leg_name]], - lift_off=frg.lift_off_positions[leg_name], - touch_down=nmpc_footholds[leg_name], - foot_pos=feet_pos[leg_name], - foot_vel=feet_vel[leg_name], - h=legs_qfrc_bias[leg_name], - mass_matrix=legs_mass_matrix[leg_name] - ) # Set control and mujoco step ---------------------------------------------------------------------- + action = np.zeros(env.mjModel.nu) action[env.legs_tau_idx.FL] = tau.FL action[env.legs_tau_idx.FR] = tau.FR action[env.legs_tau_idx.RL] = tau.RL action[env.legs_tau_idx.RR] = tau.RR - action_noise = np.random.normal(0, 2, size=env.mjModel.nu) - action += action_noise - state, reward, is_terminated, is_truncated, info = env.step(action=action) # Store the history of observations and control ___________________________________________________________ ep_state_obs_history.append(state) base_lin_vel_err = ref_base_lin_vel - base_lin_vel base_ang_vel_err = ref_base_ang_vel - base_ang_vel - base_poz_z_err = ref_pos[2] - env.base_pos[2] - ctrl_state = np.concatenate((base_lin_vel_err, base_ang_vel_err, [base_poz_z_err], pgg._phase_signal)) + base_poz_z_err = controller.ref_pos[2] - env.base_pos[2] + ctrl_state = np.concatenate((base_lin_vel_err, + base_ang_vel_err, + [base_poz_z_err], + controller.pgg.phase_signal)) ep_ctrl_state_history.append(ctrl_state) # ___________________________________________________________________________________________________________ + # # Render only at a certain frequency - # Render only at a certain frequency if time.time() - last_render_time > 1.0 / RENDER_FREQ or env.step_num == 1: _, _, feet_GRF = env.feet_contact_state(ground_reaction_forces=True) feet_traj_geom_ids = plot_swing_mujoco(viewer=env.viewer, - swing_traj_controller=stc, - swing_period=stc.swing_period, - swing_time=LegsAttr(FL=stc.swing_time[0], - FR=stc.swing_time[1], - RL=stc.swing_time[2], - RR=stc.swing_time[3]), - lift_off_positions=frg.lift_off_positions, + swing_traj_controller=controller.stc, + swing_period=controller.stc.swing_period, + swing_time=LegsAttr(FL=controller.stc.swing_time[0], + FR=controller.stc.swing_time[1], + RL=controller.stc.swing_time[2], + RR=controller.stc.swing_time[3], + ), + lift_off_positions=lift_off_positions, nmpc_footholds=nmpc_footholds, ref_feet_pos=ref_feet_pos, geom_ids=feet_traj_geom_ids) - for leg_id, leg_name in enumerate(legs_order): + for leg_id, leg_name in enumerate(controller.legs_order): feet_GRF_geom_ids[leg_name] = render_vector(env.viewer, vector=feet_GRF[leg_name], pos=feet_pos[leg_name], @@ -476,9 +203,8 @@ color=np.array([0, 1, 0, .5]), geom_id=feet_GRF_geom_ids[leg_name]) - env.render() - last_render_time = time.time() - + env.render() + last_render_time = time.time() if env.step_num > N_STEPS_PER_EPISODE or is_terminated or is_truncated: if is_terminated: print("Environment terminated") @@ -486,12 +212,9 @@ state_obs_history.append(ep_state_obs_history) ctrl_state_history.append(ep_ctrl_state_history) env.reset(random=False) - pgg.reset() - frg.lift_off_positions = env.feet_pos(frame='world') + controller.reset() current_contact = np.array([0, 0, 0, 0]) previous_contact = np.asarray(current_contact) z_foot_mean = 0.0 env.close() - -