-
Notifications
You must be signed in to change notification settings - Fork 168
/
03_eval_finetuned.py
114 lines (91 loc) · 4.08 KB
/
03_eval_finetuned.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
This script demonstrates how to load and rollout a finetuned Octo model.
We use the Octo model finetuned on ALOHA sim data from the examples/02_finetune_new_observation_action.py script.
For installing the ALOHA sim environment, clone: https://github.com/tonyzhaozh/act
Then run:
pip3 install opencv-python modern_robotics pyrealsense2 h5py_cache pyquaternion pyyaml rospkg pexpect mujoco==2.3.3 dm_control==1.0.9 einops packaging h5py
Finally, modify the `sys.path.append` statement below to add the ACT repo to your path.
If you are running this on a head-less server, start a virtual display:
Xvfb :1 -screen 0 1024x768x16 &
export DISPLAY=:1
To run this script, run:
cd examples
python3 03_eval_finetuned.py --finetuned_path=<path_to_finetuned_aloha_checkpoint>
"""
from functools import partial
import sys
from absl import app, flags, logging
import gym
import jax
import numpy as np
import wandb
sys.path.append("path/to/your/act")
# keep this to register ALOHA sim env
from envs.aloha_sim_env import AlohaGymEnv # noqa
from octo.model.octo_model import OctoModel
from octo.utils.gym_wrappers import HistoryWrapper, NormalizeProprio, RHCWrapper
from octo.utils.train_callbacks import supply_rng
FLAGS = flags.FLAGS
flags.DEFINE_string(
"finetuned_path", None, "Path to finetuned Octo checkpoint directory."
)
def main(_):
# setup wandb for logging
wandb.init(name="eval_aloha", project="octo")
# load finetuned model
logging.info("Loading finetuned model...")
model = OctoModel.load_pretrained(FLAGS.finetuned_path)
# make gym environment
##################################################################################################################
# environment needs to implement standard gym interface + return observations of the following form:
# obs = {
# "image_primary": ...
# }
# it should also implement an env.get_task() function that returns a task dict with goal and/or language instruct.
# task = {
# "language_instruction": "some string"
# "goal": {
# "image_primary": ...
# }
# }
##################################################################################################################
env = gym.make("aloha-sim-cube-v0")
# wrap env to normalize proprio
env = NormalizeProprio(env, model.dataset_statistics)
# add wrappers for history and "receding horizon control", i.e. action chunking
env = HistoryWrapper(env, horizon=1)
env = RHCWrapper(env, exec_horizon=50)
# the supply_rng wrapper supplies a new random key to sample_actions every time it's called
policy_fn = supply_rng(
partial(
model.sample_actions,
unnormalization_statistics=model.dataset_statistics["action"],
),
)
# running rollouts
for _ in range(3):
obs, info = env.reset()
# create task specification --> use model utility to create task dict with correct entries
language_instruction = env.get_task()["language_instruction"]
task = model.create_tasks(texts=language_instruction)
# run rollout for 400 steps
images = [obs["image_primary"][0]]
episode_return = 0.0
while len(images) < 400:
# model returns actions of shape [batch, pred_horizon, action_dim] -- remove batch
actions = policy_fn(jax.tree_map(lambda x: x[None], obs), task)
actions = actions[0]
# step env -- info contains full "chunk" of observations for logging
# obs only contains observation for final step of chunk
obs, reward, done, trunc, info = env.step(actions)
images.extend([o["image_primary"][0] for o in info["observations"]])
episode_return += reward
if done or trunc:
break
print(f"Episode return: {episode_return}")
# log rollout video to wandb -- subsample temporally 2x for faster logging
wandb.log(
{"rollout_video": wandb.Video(np.array(images).transpose(0, 3, 1, 2)[::2])}
)
if __name__ == "__main__":
app.run(main)