Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Controller and MPC classes to ensure modularity and avoid global config files. #12

Closed
wants to merge 9 commits into from
43 changes: 23 additions & 20 deletions quadruped_pympc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

# These are used both for a real experiment and a simulation -----------
# These are the only attributes needed per quadruped, the rest can be computed automatically ----------------------
robot = 'aliengo' # 'go2', 'aliengo', 'hyqreal', 'mini_cheetah' # TODO: Load from robot_descriptions.py
robot = 'aliengo' # 'go2', 'aliengo', 'hyqreal', 'mini_cheetah' aliengo_arm # TODO: Load from robot_descriptions.py
robot_leg_joints = dict(FL=['FL_hip_joint', 'FL_thigh_joint', 'FL_calf_joint',], # TODO: Make configs per robot.
FR=['FR_hip_joint', 'FR_thigh_joint', 'FR_calf_joint',],
RL=['RL_hip_joint', 'RL_thigh_joint', 'RL_calf_joint',],
Expand Down Expand Up @@ -52,30 +52,41 @@
hip_height = 0.225
qpos0_js = np.concatenate((np.array([0, -np.pi/2, 0] * 2), np.array([0, np.pi/2, 0] * 2)))


mpc_params = {
# 'nominal' optimized directly the GRF
# 'input_rates' optimizes the delta GRF
# 'sampling' is a gpu-based mpc that samples the GRF
# 'collaborative' optimized directly the GRF and has a passive arm model inside
'type': 'nominal',

'type': 'sampling', # 'nominal', 'input_rates', 'sampling', 'collaborative'
# horizon is the number of timesteps in the future that the mpc will optimize
# dt is the discretization time used in the mpc
'horizon': 12,
'dt': 0.02,
'mu': 0.5,

# GRF limits for each single leg
"grf_max": mass * 9.81,
"grf_min": 0,
'mu': 0.5,

# if this is true, we optimize the step frequency as well
# for the sampling controller, this is done in the rollout
# for the gradient-based controller, this is done with a batched version of the ocp
'optimize_step_freq': True,
'step_freq_available': [1.4, 2.0, 2.4],
'use_nonuniform_discretization': False,

# ----- START properties only for the gradient-based mpc -----
}
mpc_nominal_params = {

# ----- START properties only for the gradient-based mpc -----
"grf_max": mass * 9.81,
"grf_min": 0,
# this is used if you want to manually warm start the mpc
'use_warm_start': False,

# this enables integrators for height, linear velocities, roll and pitch
'use_integrators': False,
'use_integrators': True,
'alpha_integrator': 0.1,
'integrator_cap': [0.5, 0.2, 0.2, 0.0, 0.0, 1.0],

Expand Down Expand Up @@ -133,10 +144,10 @@
# compensate for the external wrench in the prediction (only collaborative)
'passive_arm_compensation': True,

# ----- END properties for the gradient-based mpc -----

# ----- END properties for the gradient-based mpc -----
}
mpc_sampling_params = {
# ----- START properties only for the sampling-based mpc -----

# this is used only in the case 'sampling'.
'sampling_method': 'random_sampling', # 'random_sampling', 'mppi', 'cem_mppi'
'control_parametrization': 'cubic_spline',
Expand All @@ -148,19 +159,11 @@
'sigma_mppi': 3,
'sigma_random_sampling': [0.2, 3, 10],
'shift_solution': False,

# ----- END properties for the sampling-based mpc -----

# if this is true, we optimize the step frequency as well
# for the sampling controller, this is done in the rollout
# for the gradient-based controller, this is done with a batched version of the ocp
'optimize_step_freq': False,
'step_freq_available': [1.4, 2.0, 2.4]

}
}
# -----------------------------------------------------------------------

simulation_params = {

'swing_generator': 'scipy', # 'scipy', 'explicit', 'ndcurves'
'swing_position_gain_fb': 5000,
'swing_velocity_gain_fb': 100,
Expand All @@ -186,7 +189,7 @@

# the MPC will be called every 1/(mpc_frequency*dt) timesteps
# this helps to evaluate more realistically the performance of the controller
'mpc_frequency': 200,
'mpc_frequency': 100,

'use_inertia_recomputation': True,

Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,33 @@
# Description: This file contains the class for the NMPC controller

import pathlib

# Authors: Giulio Turrisi -

from acados_template import AcadosOcp, AcadosOcpSolver
from centroidal_model_collaborative import Centroidal_Model_Collaborative
from .centroidal_model_collaborative import Centroidal_Model_Collaborative
import numpy as np
import scipy.linalg
import casadi as cs
import copy

import time

import os
dir_path = os.path.dirname(os.path.realpath(__file__))

import sys
sys.path.append(dir_path)
sys.path.append(dir_path + '/../../')

from quadruped_pympc import config


# Class for the Acados NMPC, the model is in another file!
class Acados_NMPC_Collaborative:
def __init__(self):
def __init__(self,mpc_parameters=None,robot_name=None):

self.horizon = config.mpc_params['horizon'] # Define the number of discretization steps
self.dt = config.mpc_params['dt']
self.robot_name = robot_name
self.horizon = self.mpc_parameters['horizon'] # Define the number of discretization steps
self.dt = self.mpc_parameters['dt']
self.T_horizon = self.horizon*self.dt
self.use_RTI = config.mpc_params['use_RTI']
self.use_integrators = config.mpc_params['use_integrators']
self.use_warm_start = config.mpc_params['use_warm_start']
self.use_foothold_constraints = config.mpc_params['use_foothold_constraints']
self.use_RTI = self.mpc_parameters['use_RTI']
self.use_integrators = self.mpc_parameters['use_integrators']
self.use_warm_start = self.mpc_parameters['use_warm_start']
self.use_foothold_constraints = self.mpc_parameters['use_foothold_constraints']


self.use_static_stability = config.mpc_params['use_static_stability']
self.use_zmp_stability = config.mpc_params['use_zmp_stability']
self.use_static_stability = self.mpc_parameters['use_static_stability']
self.use_zmp_stability = self.mpc_parameters['use_zmp_stability']
self.use_stability_constraints = self.use_static_stability or self.use_zmp_stability


Expand Down Expand Up @@ -64,9 +56,20 @@ def __init__(self):
# Create the acados ocp solver
self.ocp = self.create_ocp_solver_description(acados_model)

self.ocp.code_export_directory = dir_path + '/c_generated_code'
#if (not os.path.isdir(dir_path + "/c_generated_code") or os.listdir(dir_path + "/c_generated_code") == []):
self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + ".json")
# self.ocp.code_export_directory = dir_path + '/c_generated_code'
# #if (not os.path.isdir(dir_path + "/c_generated_code") or os.listdir(dir_path + "/c_generated_code") == []):
# self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + ".json")

code_export_dir = pathlib.Path(__file__).parent / 'c_generated_code'
self.ocp.code_export_directory = str(code_export_dir)
# # self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json")

if ((not code_export_dir) or code_export_dir == []):
self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json")

else :
self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file="centroidal_nmpc" + ".json", build = False, generate = True)


#else :
# self.acados_ocp_solver = AcadosOcpSolver(self.ocp, json_file=self.ocp.code_export_directory + "/centroidal_nmpc" + ".json", build = False, generate = True)
Expand Down Expand Up @@ -221,32 +224,32 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp:
ocp.solver_options.nlp_solver_max_iter = 1
# Set the RTI type for the advanced RTI method
# (see https://arxiv.org/pdf/2403.07101.pdf)
if(config.mpc_params['as_rti_type'] == "AS-RTI-A"):
if(self.mpc_parameters['as_rti_type'] == "AS-RTI-A"):
ocp.solver_options.as_rti_iter = 1
ocp.solver_options.as_rti_level = 0
elif(config.mpc_params['as_rti_type'] == "AS-RTI-B"):
elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-B"):
ocp.solver_options.as_rti_iter = 1
ocp.solver_options.as_rti_level = 1
elif(config.mpc_params['as_rti_type'] == "AS-RTI-C"):
elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-C"):
ocp.solver_options.as_rti_iter = 1
ocp.solver_options.as_rti_level = 2
elif(config.mpc_params['as_rti_type'] == "AS-RTI-D"):
elif(self.mpc_parameters['as_rti_type'] == "AS-RTI-D"):
ocp.solver_options.as_rti_iter = 1
ocp.solver_options.as_rti_level = 3
else:
ocp.solver_options.nlp_solver_type = "SQP"
ocp.solver_options.nlp_solver_max_iter = config.mpc_params['num_qp_iterations']
ocp.solver_options.nlp_solver_max_iter = self.mpc_parameters['num_qp_iterations']
#ocp.solver_options.globalization = "MERIT_BACKTRACKING" # FIXED_STEP, MERIT_BACKTRACKING


if(config.mpc_params['solver_mode'] == "balance"):
if(self.mpc_parameters['solver_mode'] == "balance"):
ocp.solver_options.hpipm_mode = "BALANCE"
elif(config.mpc_params['solver_mode'] == "robust"):
elif(self.mpc_parameters['solver_mode'] == "robust"):
ocp.solver_options.hpipm_mode = "ROBUST"
elif(config.mpc_params['solver_mode'] == "fast"):
elif(self.mpc_parameters['solver_mode'] == "fast"):
ocp.solver_options.qp_solver_iter_max = 10
ocp.solver_options.hpipm_mode = "SPEED"
elif(config.mpc_params['solver_mode'] == "crazy_speed"):
elif(self.mpc_parameters['solver_mode'] == "crazy_speed"):
ocp.solver_options.qp_solver_iter_max = 5
ocp.solver_options.hpipm_mode = "SPEED_ABS"

Expand All @@ -261,9 +264,9 @@ def create_ocp_solver_description(self, acados_model) -> AcadosOcp:


# Nonuniform discretization
if(config.mpc_params['use_nonuniform_discretization']):
time_steps_fine_grained = np.tile(config.mpc_params['dt_fine_grained'], config.mpc_params['horizon_fine_grained'])
time_steps = np.concatenate((time_steps_fine_grained, np.tile(self.dt, self.horizon - config.mpc_params['horizon_fine_grained'])))
if(self.mpc_parameters['use_nonuniform_discretization']):
time_steps_fine_grained = np.tile(self.mpc_parameters['dt_fine_grained'], self.mpc_parameters['horizon_fine_grained'])
time_steps = np.concatenate((time_steps_fine_grained, np.tile(self.dt, self.horizon - self.mpc_parameters['horizon_fine_grained'])))
shooting_nodes = np.zeros((self.horizon+1,))
for i in range(len(time_steps)):
shooting_nodes[i+1] = shooting_nodes[i] + time_steps[i]
Expand Down Expand Up @@ -496,8 +499,8 @@ def create_friction_cone_constraints(self,) ->None:
t = np.array([1, 0, 0])
b = np.array([0, 1, 0])
mu = self.centroidal_model.mu_friction
f_max = config.mpc_params['grf_max']
f_min = config.mpc_params['grf_min']
f_max = self.mpc_parameters['grf_max']
f_min = self.mpc_parameters['grf_min']

# Derivation can be found in the paper
# "High-slope terrain locomotion for torque-controlled quadruped robots",
Expand Down Expand Up @@ -575,7 +578,7 @@ def set_weight(self, nx, nu):
Q_passive_arm_force = np.array([0, 0, 0, 0, 0, 0]) #end_effector_position_z

R_foot_vel = np.array([0.0001, 0.0001, 0.00001]) #v_x, v_y, v_z (should be 4 times this, once per foot)
if(config.robot == "hyqreal"):
if(self.robot_name == "hyqreal"):
R_foot_force = np.array([0.00001, 0.00001, 0.00001])#f_x, f_y, f_z (should be 4 times this, once per foot)
else:
R_foot_force = np.array([0.001, 0.001, 0.001])
Expand Down Expand Up @@ -952,7 +955,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h
elif(np.array_equal(FL_contact_sequence, RR_contact_sequence)
and np.array_equal(FR_contact_sequence, RL_contact_sequence)):
#TROT
stability_margin = config.mpc_params['trot_stability_margin']
stability_margin = self.mpc_parameters['trot_stability_margin']
if(FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0):
ub_support_FL_RR = 0 + stability_margin
lb_support_FL_RR = 0 - stability_margin
Expand All @@ -964,7 +967,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h
elif(np.array_equal(FL_contact_sequence, RL_contact_sequence)
and np.array_equal(FR_contact_sequence, RR_contact_sequence)):
#PACE
stability_margin = config.mpc_params['pace_stability_margin']
stability_margin = self.mpc_parameters['pace_stability_margin']
if(FL_contact_sequence[j] == 1 and FR_contact_sequence[j] == 0):
ub_support_RL_FL = 0 + stability_margin
lb_support_RL_FL = 0 - stability_margin
Expand All @@ -976,7 +979,7 @@ def set_stage_constraint(self, constraint, state, reference, contact_sequence, h

else:
#CRAWL BACKDIAGONALCRAWL ONLY
stability_margin = config.mpc_params['crawl_stability_margin']
stability_margin = self.mpc_parameters['crawl_stability_margin']

if(FL_contact_sequence[j] == 1):
if(FR_contact_sequence[j] == 1):
Expand Down Expand Up @@ -1312,7 +1315,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,

# Fill stance param, friction and stance proximity
# (stance proximity will disable foothold optimization near a stance!!)
mu = config.mpc_params['mu']
mu = self.mpc_parameters['mu']
yaw = state["orientation"][2]

# Stance Proximity ugly routine. Basically we disable foothold optimization
Expand Down Expand Up @@ -1362,8 +1365,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,
for j in range(self.horizon):
# If we have estimated an external wrench, we can compensate it for all steps
# or less (maybe the disturbance is not costant along the horizon!)
if(config.mpc_params['external_wrenches_compensation'] and
j < config.mpc_params['external_wrenches_compensation_num_step']):
if(self.mpc_parameters['external_wrenches_compensation'] and
j < self.mpc_parameters['external_wrenches_compensation_num_step']):
external_wrenches_estimated_param = copy.deepcopy(external_wrenches)
external_wrenches_estimated_param = external_wrenches_estimated_param.reshape((6, ))
else:
Expand Down Expand Up @@ -1427,7 +1430,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,

if(self.use_integrators):
# Compute error for integral action
alpha_integrator = config.mpc_params["alpha_integrator"]
alpha_integrator = self.mpc_parameters["alpha_integrator"]
self.integral_errors[0] += (state["position"][2] - reference["ref_position"][2])*alpha_integrator
self.integral_errors[1] += (state["linear_velocity"][0] - reference["ref_linear_velocity"][0])*alpha_integrator
self.integral_errors[2] += (state["linear_velocity"][1] - reference["ref_linear_velocity"][1])*alpha_integrator
Expand All @@ -1436,12 +1439,12 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,
self.integral_errors[5] += (state["orientation"][1] - reference["ref_orientation"][1])*alpha_integrator


cap_integrator_z = config.mpc_params["integrator_cap"][0]
cap_integrator_x_dot = config.mpc_params["integrator_cap"][1]
cap_integrator_y_dot = config.mpc_params["integrator_cap"][2]
cap_integrator_z_dot = config.mpc_params["integrator_cap"][3]
cap_integrator_roll = config.mpc_params["integrator_cap"][4]
cap_integrator_pitch = config.mpc_params["integrator_cap"][5]
cap_integrator_z = self.mpc_parameters["integrator_cap"][0]
cap_integrator_x_dot = self.mpc_parameters["integrator_cap"][1]
cap_integrator_y_dot = self.mpc_parameters["integrator_cap"][2]
cap_integrator_z_dot = self.mpc_parameters["integrator_cap"][3]
cap_integrator_roll = self.mpc_parameters["integrator_cap"][4]
cap_integrator_pitch = self.mpc_parameters["integrator_cap"][5]

self.integral_errors[0] = np.where(np.abs(self.integral_errors[0]) > cap_integrator_z, cap_integrator_z*np.sign(self.integral_errors[0]), self.integral_errors[0])
self.integral_errors[1] = np.where(np.abs(self.integral_errors[1]) > cap_integrator_x_dot, cap_integrator_x_dot*np.sign(self.integral_errors[1]), self.integral_errors[1])
Expand Down Expand Up @@ -1470,6 +1473,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,


# Set stage constraint
h_R_w = np.array([np.cos(yaw), np.sin(yaw),
-np.sin(yaw), np.cos(yaw)])
if(self.use_foothold_constraints or self.use_stability_constraints):

h_R_w = np.array([np.cos(yaw), np.sin(yaw),
Expand Down Expand Up @@ -1498,7 +1503,7 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,
optimal_footholds_assigned = np.zeros((4, ), dtype='bool')

# Saturation of the GRF
optimal_GRF = np.clip(optimal_GRF, 0.0, config.mpc_params['grf_max'])
optimal_GRF = np.clip(optimal_GRF, 0.0, self.mpc_parameters['grf_max'])


# We need to provide the next touchdown foothold position.
Expand Down Expand Up @@ -1625,8 +1630,8 @@ def compute_control(self, state, reference, contact_sequence, constraint = None,
optimal_foothold[3] = reference["ref_foot_RR"][0]


if(config.mpc_params['dt'] <= 0.02 or (
config.mpc_params['use_nonuniform_discretization'] and config.mpc_params['dt_fine_grained'] <= 0.02)):
if(self.mpc_parameters['dt'] <= 0.02 or (
self.mpc_parameters['use_nonuniform_discretization'] and self.mpc_parameters['dt_fine_grained'] <= 0.02)):
optimal_next_state_index = 2
else:
optimal_next_state_index = 1
Expand Down
Loading