stepper.py
from omni.isaac.core.simulation_context import SimulationContext
from omni.isaac.core.articulations import ArticulationView
import numpy as np
from trainer import Trainer
from sars_buffer import SizeLimmitedSarsPushBuffer
from agent_over_manager import AgentOverManager


class Stepper:
    def __init__(self, world: SimulationContext, cartpoles: ArticulationView, render: bool):
        self.world = world
        self.cartpoles = cartpoles
        self.render = render

    def step(self, a: np.ndarray):
        # Scale each agent's action into a force, apply it to joint 0 (the cart
        # slider), then advance the simulation by one physics step.
        self.cartpoles.set_joint_efforts(
            efforts=np.reshape(a, (-1, 1)) * 100,
            joint_indices=np.repeat(0, a.shape[0]),
        )
        self.world.step(render=self.render)


class StepForTrain(Stepper):
    def __init__(self, cartpoles: ArticulationView, stepper: Stepper, trainer: Trainer,
                 agent_over_manager: AgentOverManager,
                 sars_buffer: SizeLimmitedSarsPushBuffer, agent_num: int):
        # Physics stepping is delegated to the wrapped Stepper rather than to super().
        self.cartpoles = cartpoles
        self.stepper = stepper
        self.trainer = trainer
        self.sars_buffer = sars_buffer
        self.agent_num = agent_num
        self.agent_over_manager = agent_over_manager

    def step(self, a: np.ndarray):
        # Observation before the step: joint 0 is the cart, joint 1 is the pole.
        prev_joint_positions = self.cartpoles.get_joint_positions(joint_indices=[0, 1])
        prev_joint_velocities = self.cartpoles.get_joint_velocities(joint_indices=[0, 1])
        self.stepper.step(a)
        # Observation after the step.
        cur_joint_positions = self.cartpoles.get_joint_positions(joint_indices=[0, 1])
        cur_joint_velocities = self.cartpoles.get_joint_velocities(joint_indices=[0, 1])
        # Remember which agents were still alive before the step, then refresh the
        # over-table from the new joint positions.
        prev_over_table = np.array(self.agent_over_manager.over_table)
        prev_live_table = np.logical_not(prev_over_table)
        self.agent_over_manager.update(cur_joint_positions)
        cur_over_table = np.array(self.agent_over_manager.over_table)
        # Reward: +0.1 per step survived, minus a small penalty for cart offset
        # from the origin, and a flat -1.0 for agents that are now over.
        reward = np.zeros((self.agent_num, 1), dtype=np.float32)
        reward += 0.1
        reward -= np.abs(cur_joint_positions[:, [0]]) * 0.01
        reward[np.expand_dims(cur_over_table, axis=1)] = -1.0
        # One SARS row per agent that was alive before the step. Column layout:
        # [cart_pos, cart_vel, pole_pos, pole_vel, action, reward,
        #  cart_pos', cart_vel', pole_pos', pole_vel', done]
        new_sars = np.concatenate([
            prev_joint_positions[prev_live_table][:, [0]],
            prev_joint_velocities[prev_live_table][:, [0]],
            prev_joint_positions[prev_live_table][:, [1]],
            prev_joint_velocities[prev_live_table][:, [1]],
            a[prev_live_table][:, [0]],
            reward[prev_live_table][:, [0]],
            cur_joint_positions[prev_live_table][:, [0]],
            cur_joint_velocities[prev_live_table][:, [0]],
            cur_joint_positions[prev_live_table][:, [1]],
            cur_joint_velocities[prev_live_table][:, [1]],
            np.expand_dims(np.array(cur_over_table, dtype=np.float32), axis=1)[prev_live_table][:, [0]],
        ], axis=1)
        self.sars_buffer.push(new_sars)
        self.trainer.train()
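
For orientation, here is a minimal sketch of how these two classes might be wired into a training loop. Only the Stepper and StepForTrain signatures come from this file; the World setup, the prim path, the loop length, and the random policy are assumptions, and the trainer, sars_buffer, and agent_over_manager objects are presumed to be constructed elsewhere in the repo (their constructor signatures are not visible here).

import numpy as np
from omni.isaac.core import World
from omni.isaac.core.articulations import ArticulationView

from stepper import Stepper, StepForTrain

AGENT_NUM = 16

world = World()
# Assumed prim path; the actual cartpole prims are created elsewhere in the repo.
cartpoles = ArticulationView(prim_paths_expr="/World/Cartpole_*")
world.scene.add(cartpoles)
world.reset()

# trainer, sars_buffer, and agent_over_manager: assumed to be built elsewhere.
stepper = Stepper(world, cartpoles, render=True)
train_stepper = StepForTrain(cartpoles, stepper, trainer,
                             agent_over_manager, sars_buffer, AGENT_NUM)

for _ in range(1000):
    # Random per-agent actions in [-1, 1); a real run would query the policy.
    a = np.random.uniform(-1.0, 1.0, size=(AGENT_NUM, 1)).astype(np.float32)
    train_stepper.step(a)  # physics step + SARS logging + one training update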