From 41a1fab2b95d82147e541d9b69d5643a9bd66c49 Mon Sep 17 00:00:00 2001 From: William Yue Date: Mon, 2 Feb 2026 16:09:55 -0800 Subject: [PATCH 01/12] added training time RTC --- .../policies/pi05/configuration_pi05.py | 4 ++ src/opentau/policies/pi05/modeling_pi05.py | 51 ++++++++++++++----- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/opentau/policies/pi05/configuration_pi05.py b/src/opentau/policies/pi05/configuration_pi05.py index 97a4327..e3bb7ed 100644 --- a/src/opentau/policies/pi05/configuration_pi05.py +++ b/src/opentau/policies/pi05/configuration_pi05.py @@ -116,6 +116,10 @@ class PI05Config(PreTrainedConfig): # Decoding num_steps: int = 10 + # Real Time Inference + # maximum number of frozen actions + max_delay: int = 0 + # Initialization strategy init_strategy: Literal["no_init", "full_he_init", "expert_only_he_init"] = "full_he_init" diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 169e7e4..3b6b507 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -29,7 +29,7 @@ import numpy as np import torch import torch.nn.functional as F # noqa: N812 -from einops import rearrange +from einops import rearrange, repeat from torch import Tensor, nn from transformers import AutoProcessor, AutoTokenizer @@ -622,6 +622,7 @@ def forward( lang_tokens, lang_masks, actions, + actions_is_pad, response_tokens, response_masks, noise, @@ -632,17 +633,8 @@ def forward( mse_loss = losses["MSE"] ce_loss = losses["CE"] - if actions_is_pad is not None: - in_episode_bound = ~actions_is_pad - mse_loss = mse_loss * in_episode_bound.unsqueeze(-1) - - # Remove padding - mse_loss = mse_loss[:, :, : self.config.max_action_dim] - # For backward pass - loss = mse_loss.mean() - - return {"MSE": loss, "CE": ce_loss} + return {"MSE": mse_loss, "CE": ce_loss} def prepare_discrete_state(self, batch: dict[str, Tensor]) -> list[str]: """Discretizes the state into bins and converts it to a string representation. @@ -1107,6 +1099,7 @@ def forward( lang_tokens: Tensor, lang_masks: Tensor, actions: Tensor, + actions_is_pad: Tensor | None = None, response_tokens: Tensor | None = None, response_masks: Tensor | None = None, noise: Tensor | None = None, @@ -1124,6 +1117,7 @@ def forward( response_tokens: Response language token tensor. response_masks: Response language mask tensor. actions: Action tensor. + actions_is_pad: Optional action is padded mask tensor. noise: Optional noise tensor. time: Optional time tensor. discrete_actions: Optional discrete action tensor. @@ -1161,13 +1155,21 @@ def forward( ) # Now run action expert + batch_size = actions.shape[0] if noise is None: noise = self.sample_noise(actions.shape, actions.device) if time is None: - time = self.sample_time(actions.shape[0], actions.device) + time = self.sample_time(batch_size, actions.device) + + # handle real time inference delay + delay = torch.randint(0, self.config.max_delay + 1, (batch_size,)) + prefix_mask = rearrange(torch.arange(self.config.chunk_size), "c -> 1 c") < rearrange( + delay, "b -> b 1" + ) + time = torch.where(prefix_mask, 1, rearrange(time, "b -> b 1")) - time_expanded = time[:, None, None] + time_expanded = rearrange(time, "b c -> b c 1") x_t = time_expanded * noise + (1 - time_expanded) * actions u_t = noise - actions @@ -1206,7 +1208,28 @@ def forward( v_t = self.action_out_proj(suffix_out) v_t = v_t.to(dtype=torch.float32) - losses = F.mse_loss(u_t, v_t, reduction="none") + mse_loss = F.mse_loss(u_t, v_t, reduction="none") + + # mask out frozen actions and padded actions + postfix_mask = rearrange( + torch.logical_not(prefix_mask), "b c -> b c 1" + ) # 0 for frozen actions, 1 for non-frozen actions + + if actions_is_pad is not None: + in_episode_bound = ~actions_is_pad + in_episode_bound = rearrange( + in_episode_bound, "b c -> b c 1" + ) # 0 for padded actions, 1 for non-padded actions + postfix_mask = torch.logical_and(postfix_mask, in_episode_bound) + + mse_loss = mse_loss * postfix_mask + + # Remove padding + mse_loss = mse_loss[:, :, : self.config.max_action_dim] + + # Do not include frozen actions and padded actions in the mean loss calculation + postfix_mask_expanded = repeat(postfix_mask, "b c 1 -> b c d", d=mse_loss.shape[-1]) + mse_loss = mse_loss.sum() / (postfix_mask_expanded.sum() + 1e-8) # compute cross entropy loss for discrete actions batch_size, seq_len = discrete_actions.shape From 63162fe051725dca29ab9db8ff8dfe190b5c1432 Mon Sep 17 00:00:00 2001 From: William Yue Date: Tue, 3 Feb 2026 14:26:50 -0800 Subject: [PATCH 02/12] fixed time embedding shapes --- configs/examples/pi05_training_config.json | 4 +-- src/opentau/policies/pi05/modeling_pi05.py | 39 +++++++++++++--------- src/opentau/utils/transformers_patch.py | 3 -- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/configs/examples/pi05_training_config.json b/configs/examples/pi05_training_config.json index 47f3cbd..b120f7b 100644 --- a/configs/examples/pi05_training_config.json +++ b/configs/examples/pi05_training_config.json @@ -36,8 +36,8 @@ "attention_implementation": "eager", "freeze_vision_encoder": true, "train_expert_only": true, - "prompt_max_length": 256, - "discrete_action_max_length": 60, + "prompt_max_length": 10, + "discrete_action_max_length": 10, "optimizer_lr": 2.5e-05, "optimizer_betas": [ 0.9, diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 3b6b507..2f8a53b 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -52,23 +52,23 @@ def create_sinusoidal_pos_embedding( """Computes sine-cosine positional embedding vectors for scalar positions. Args: - time: A 1-D tensor of shape (batch_size,). + time: A 2-D tensor of shape (batch_size, action_chunk_length). dimension: The dimension of the embedding vectors. Must be divisible by 2. min_period: The minimum period of the sinusoidal functions. max_period: The maximum period of the sinusoidal functions. device: The device to create the tensors on. Defaults to "cpu". Returns: - A tensor of shape (batch_size, dimension) containing the positional embeddings. + A tensor of shape (batch_size, action_chunk_length, dimension) containing the positional embeddings. Raises: - ValueError: If dimension is not divisible by 2 or if time tensor is not 1-D. + ValueError: If dimension is not divisible by 2 or if time tensor is not 2-D with shape (batch_size, action_chunk_length). """ if dimension % 2 != 0: raise ValueError(f"dimension ({dimension}) must be divisible by 2") - if time.ndim != 1: - raise ValueError("The time tensor is expected to be of shape `(batch_size, )`.") + if time.ndim != 2: + raise ValueError("The time tensor is expected to be of shape `(batch_size, action_chunk_length)`.") dtype = ( get_safe_dtype(torch.float64, device.type) @@ -80,8 +80,8 @@ def create_sinusoidal_pos_embedding( # Compute the outer product scaling_factor = 1.0 / period * 2 * math.pi - sin_input = scaling_factor[None, :] * time[:, None] - pos_emb = torch.cat([torch.sin(sin_input), torch.cos(sin_input)], dim=1) + sin_input = rearrange(scaling_factor, "d -> 1 1 d") * rearrange(time, "b c -> b c 1") + pos_emb = torch.cat([torch.sin(sin_input), torch.cos(sin_input)], dim=2) return pos_emb @@ -1040,7 +1040,7 @@ def embed_suffix(self, noisy_actions: Tensor, timestep: Tensor) -> tuple[Tensor, Args: noisy_actions: Tensor containing noisy actions. - timestep: Tensor containing timesteps. + timestep: Tensor containing timesteps of shape (batch_size, action_chunk_length). Returns: A tuple containing: @@ -1167,7 +1167,10 @@ def forward( prefix_mask = rearrange(torch.arange(self.config.chunk_size), "c -> 1 c") < rearrange( delay, "b -> b 1" ) - time = torch.where(prefix_mask, 1, rearrange(time, "b -> b 1")) + prefix_mask = prefix_mask.to(device=actions.device) + time = torch.where( + prefix_mask, 0, rearrange(time, "b -> b 1") + ) # using diffusion time 0 instead of flow matching time 1 time_expanded = rearrange(time, "b c -> b c 1") x_t = time_expanded * noise + (1 - time_expanded) * actions @@ -1296,6 +1299,8 @@ def sample_actions( img_masks: list[Tensor], lang_tokens: Tensor, lang_masks: Tensor, + action_prefix: Tensor | None = None, + delay: int = 0, noise: Tensor | None = None, ) -> Tensor: """Do a full inference forward and compute the action. @@ -1306,7 +1311,8 @@ def sample_actions( lang_tokens: Language token tensor. lang_masks: Language mask tensor. noise: Optional noise tensor. - + action_prefix: Optional action prefix tensor. + delay: number of delay actions. Returns: The sampled action tensor. """ @@ -1370,13 +1376,15 @@ def sample_actions( x_t = noise time = torch.tensor(1.0, dtype=torch.float32, device=device) + prefix_mask = rearrange(torch.arange(self.config.chunk_size), "c -> 1 c") < delay while time >= -dt / 2: - expanded_time = time.expand(bsize) + x_t = torch.where(rearrange(prefix_mask, "b c -> b c 1"), action_prefix, x_t) + time_masked = torch.where(prefix_mask, 0, time) v_t = self.denoise_step( prefix_pad_masks, past_key_values, x_t, - expanded_time, + time_masked, ) # Euler step @@ -1389,7 +1397,7 @@ def denoise_step( prefix_pad_masks: Tensor, past_key_values: list[dict[str, Tensor]], x_t: Tensor, - timestep: Tensor, + time: Tensor, ) -> Tensor: """Apply one denoising step of the noise `x_t` at a given timestep. @@ -1397,12 +1405,11 @@ def denoise_step( prefix_pad_masks: Prefix padding masks. past_key_values: Past key values from the VLM. x_t: Current noise tensor. - timestep: Current timestep. - + time: Time tensor of shape (batch_size, action_chunk_length). Returns: The predicted velocity tensor (v_t). """ - suffix_embs, suffix_pad_masks, suffix_att_masks, adarms_cond = self.embed_suffix(x_t, timestep) + suffix_embs, suffix_pad_masks, suffix_att_masks, adarms_cond = self.embed_suffix(x_t, time) num_cross_att_tokens = prefix_pad_masks.shape[1] action_expert_2d_attention_mask = make_att_2d_masks( diff --git a/src/opentau/utils/transformers_patch.py b/src/opentau/utils/transformers_patch.py index 8913ecc..08ccbf4 100644 --- a/src/opentau/utils/transformers_patch.py +++ b/src/opentau/utils/transformers_patch.py @@ -155,9 +155,6 @@ def forward( raise ValueError(f"Expected cond dimension {self.cond_dim}, got {cond.shape[-1]}") modulation = self.dense(cond) - # Reshape modulation to broadcast properly: [batch, 1, features] for [batch, seq, features] - if len(x.shape) == 3: # [batch, seq, features] - modulation = modulation.unsqueeze(1) scale, shift, gate = torch.chunk(modulation, 3, dim=-1) From dc72f28baaa616487a11195b1b68295f32e01049 Mon Sep 17 00:00:00 2001 From: William Yue Date: Tue, 3 Feb 2026 15:33:24 -0800 Subject: [PATCH 03/12] fixed pytests --- src/opentau/policies/pi05/modeling_pi05.py | 23 ++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 2f8a53b..5354676 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -546,13 +546,20 @@ def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) - return self._action_queue.popleft() @torch.no_grad() - def sample_actions(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor: + def sample_actions( + self, + batch: dict[str, Tensor], + noise: Tensor | None = None, + action_prefix: Tensor | None = None, + delay: int = 0, + ) -> Tensor: """Sample actions from the policy given environment observations. Args: batch: Batch of data containing environment observations. noise: Optional noise tensor. - + action_prefix: Optional action prefix tensor. + delay: number of delay actions. Returns: The sampled actions tensor of shape (batch_size, action_dim). """ @@ -566,6 +573,8 @@ def sample_actions(self, batch: dict[str, Tensor], noise: Tensor | None = None) img_masks, lang_tokens, lang_masks, + action_prefix=action_prefix, + delay=delay, noise=noise, ) @@ -1376,15 +1385,17 @@ def sample_actions( x_t = noise time = torch.tensor(1.0, dtype=torch.float32, device=device) - prefix_mask = rearrange(torch.arange(self.config.chunk_size), "c -> 1 c") < delay + prefix_mask = rearrange(torch.arange(self.config.chunk_size, device=device), "c -> 1 c") < delay while time >= -dt / 2: - x_t = torch.where(rearrange(prefix_mask, "b c -> b c 1"), action_prefix, x_t) - time_masked = torch.where(prefix_mask, 0, time) + # if delay is greater than 0, then freeze the action prefix at the beginning of action chunk + if delay > 0: + x_t = torch.where(rearrange(prefix_mask, "b c -> b c 1"), action_prefix, x_t) + masked_time = torch.where(prefix_mask, 0, time) v_t = self.denoise_step( prefix_pad_masks, past_key_values, x_t, - time_masked, + masked_time, ) # Euler step From cd255d3b8c856a8d7c1ecd0816273069dd4f6502 Mon Sep 17 00:00:00 2001 From: William Yue Date: Tue, 3 Feb 2026 15:35:14 -0800 Subject: [PATCH 04/12] revert changes to train config --- configs/examples/pi05_training_config.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/configs/examples/pi05_training_config.json b/configs/examples/pi05_training_config.json index b120f7b..47f3cbd 100644 --- a/configs/examples/pi05_training_config.json +++ b/configs/examples/pi05_training_config.json @@ -36,8 +36,8 @@ "attention_implementation": "eager", "freeze_vision_encoder": true, "train_expert_only": true, - "prompt_max_length": 10, - "discrete_action_max_length": 10, + "prompt_max_length": 256, + "discrete_action_max_length": 60, "optimizer_lr": 2.5e-05, "optimizer_betas": [ 0.9, From 4db734be9c469a8571a7a06db098e02e98e15705 Mon Sep 17 00:00:00 2001 From: William Yue Date: Tue, 3 Feb 2026 15:37:50 -0800 Subject: [PATCH 05/12] added check that delay must be valid --- src/opentau/policies/pi05/modeling_pi05.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 5354676..b29f6f5 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -563,6 +563,8 @@ def sample_actions( Returns: The sampled actions tensor of shape (batch_size, action_dim). """ + assert 0 <= delay <= self.config.max_delay, f"Delay must be between 0 and {self.config.max_delay}" + batch = self.normalize_inputs(batch) images, img_masks = self.prepare_images(batch) From 83267e2b11092f748d765986609e916d7c3d6319 Mon Sep 17 00:00:00 2001 From: William Yue Date: Tue, 3 Feb 2026 15:39:00 -0800 Subject: [PATCH 06/12] added comment for shape --- src/opentau/policies/pi05/modeling_pi05.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index b29f6f5..38e9e1e 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -558,7 +558,7 @@ def sample_actions( Args: batch: Batch of data containing environment observations. noise: Optional noise tensor. - action_prefix: Optional action prefix tensor. + action_prefix: Optional action prefix tensor of shape (batch_size, action_chunk_length, action_dim). delay: number of delay actions. Returns: The sampled actions tensor of shape (batch_size, action_dim). From 77c1d3c891c903bd1bee3ab19206f7c6cd8421d7 Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 4 Feb 2026 12:43:18 -0800 Subject: [PATCH 07/12] added real time inference for libero eval --- src/opentau/policies/pi05/modeling_pi05.py | 49 +++++++++++++++------- src/opentau/scripts/grpc/server.py | 4 +- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 38e9e1e..3466d6c 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -292,6 +292,7 @@ def __init__( def reset(self) -> None: """This should be called whenever the environment is reset.""" self._action_queue = deque([], maxlen=self.config.n_action_steps) + self._executed_actions: deque[Tensor] = deque([], maxlen=self.config.max_delay) @classmethod def from_pretrained( @@ -525,9 +526,13 @@ def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor: def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor: """Select a single action given environment observations. - This method wraps `select_actions` in order to return one action at a time for execution in the - environment. It works by managing the actions in a queue and only calling `select_actions` when the - queue is empty. + This method calls sample_actions every step and returns one action at a time from the new chunk. + The queue is replaced with the new chunk each time. The last config.max_delay executed actions + are passed to sample_actions as action_prefix; at episode start (no previous actions), delay + is 0. + + Note: This method should only be called when running a policy in simulation. For real world inference, + this method should be written in the ROS client node. Args: batch: Batch of data containing environment observations. @@ -538,12 +543,31 @@ def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) - """ self.eval() - # Action queue logic for n_action_steps > 1. When the action_queue is depleted, populate it by - # querying the policy. - if len(self._action_queue) == 0: - actions = self.sample_actions(batch, noise=noise) - self._action_queue.extend(actions) - return self._action_queue.popleft() + action_prefix = None + delay = 0 + if self.config.max_delay > 0 and len(self._executed_actions) > 0: + action_prefix = torch.stack(list(self._executed_actions), dim=1) + delay = action_prefix.shape[1] + action_prefix = self.normalize_actions({"actions": action_prefix})["actions"] + original_action_dim = self.config.action_feature.shape[0] + if original_action_dim < self.config.max_action_dim: + action_prefix = F.pad( + action_prefix, + (0, self.config.max_action_dim - original_action_dim), + ) + if delay < self.config.chunk_size: + action_prefix = F.pad( + action_prefix, + (0, 0, 0, self.config.chunk_size - delay), + ) + actions = self.sample_actions(batch, noise=noise, action_prefix=action_prefix, delay=delay) + actions = rearrange(actions, "b c d -> c b d") + self._action_queue.clear() + self._action_queue.extend(actions[delay:]) + action = self._action_queue.popleft() + if self.config.max_delay > 0: + self._executed_actions.append(action) + return action @torch.no_grad() def sample_actions( @@ -559,9 +583,9 @@ def sample_actions( batch: Batch of data containing environment observations. noise: Optional noise tensor. action_prefix: Optional action prefix tensor of shape (batch_size, action_chunk_length, action_dim). - delay: number of delay actions. + delay: number of frozen delay actions from action_prefix. Returns: - The sampled actions tensor of shape (batch_size, action_dim). + The sampled actions tensor of shape (batch_size, action_chunk_length, action_dim). """ assert 0 <= delay <= self.config.max_delay, f"Delay must be between 0 and {self.config.max_delay}" @@ -586,9 +610,6 @@ def sample_actions( actions = self.unnormalize_outputs({"actions": actions})["actions"] - # `self.model.forward` returns a (batch_size, n_action_steps, action_dim) tensor, but the queue - # effectively has shape (n_action_steps, batch_size, *), hence the transpose. - actions = actions.transpose(0, 1) return actions def forward( diff --git a/src/opentau/scripts/grpc/server.py b/src/opentau/scripts/grpc/server.py index c1e4832..3637254 100644 --- a/src/opentau/scripts/grpc/server.py +++ b/src/opentau/scripts/grpc/server.py @@ -189,9 +189,9 @@ def GetActionChunk( # Run inference with torch.inference_mode(): action_chunk = self.policy.sample_actions(batch) - # action_chunk shape: (n_action_steps, batch_size=1, action_dim) + # action_chunk shape: (batch_size=1, n_action_steps, action_dim) # Remove batch dimension and convert to numpy - action_chunk = action_chunk.squeeze(1).to("cpu", torch.float32).numpy() + action_chunk = action_chunk.squeeze(0).to("cpu", torch.float32).numpy() # Populate 2D action chunk structure for action_vector in action_chunk: From 5acf09d8e301afd644ebebcddc360d009fb95290 Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 4 Feb 2026 16:46:34 -0800 Subject: [PATCH 08/12] added fix to ensure the frozen actions stay frozen --- configs/examples/pi05_training_config.json | 4 ++-- src/opentau/policies/pi05/modeling_pi05.py | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/configs/examples/pi05_training_config.json b/configs/examples/pi05_training_config.json index 47f3cbd..b120f7b 100644 --- a/configs/examples/pi05_training_config.json +++ b/configs/examples/pi05_training_config.json @@ -36,8 +36,8 @@ "attention_implementation": "eager", "freeze_vision_encoder": true, "train_expert_only": true, - "prompt_max_length": 256, - "discrete_action_max_length": 60, + "prompt_max_length": 10, + "discrete_action_max_length": 10, "optimizer_lr": 2.5e-05, "optimizer_betas": [ 0.9, diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 3466d6c..2715a34 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -1424,6 +1424,10 @@ def sample_actions( # Euler step x_t += dt * v_t time += dt + + # we need to ensure the frozen actions are not modified before returning the denoised actions + if delay > 0: + x_t = torch.where(rearrange(prefix_mask, "b c -> b c 1"), action_prefix, x_t) return x_t def denoise_step( From 6959fed8882183d4f7806ce5835dd5fca3f8017d Mon Sep 17 00:00:00 2001 From: William Yue Date: Wed, 4 Feb 2026 16:47:52 -0800 Subject: [PATCH 09/12] undo --- configs/examples/pi05_training_config.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/configs/examples/pi05_training_config.json b/configs/examples/pi05_training_config.json index b120f7b..47f3cbd 100644 --- a/configs/examples/pi05_training_config.json +++ b/configs/examples/pi05_training_config.json @@ -36,8 +36,8 @@ "attention_implementation": "eager", "freeze_vision_encoder": true, "train_expert_only": true, - "prompt_max_length": 10, - "discrete_action_max_length": 10, + "prompt_max_length": 256, + "discrete_action_max_length": 60, "optimizer_lr": 2.5e-05, "optimizer_betas": [ 0.9, From dde19402007d2e05917b835c30eb61ca4f47bf86 Mon Sep 17 00:00:00 2001 From: William Yue Date: Thu, 5 Feb 2026 12:03:17 -0800 Subject: [PATCH 10/12] fixed select action --- src/opentau/policies/pi05/modeling_pi05.py | 60 ++++++++++++---------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 2715a34..832b6ff 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -292,7 +292,6 @@ def __init__( def reset(self) -> None: """This should be called whenever the environment is reset.""" self._action_queue = deque([], maxlen=self.config.n_action_steps) - self._executed_actions: deque[Tensor] = deque([], maxlen=self.config.max_delay) @classmethod def from_pretrained( @@ -526,10 +525,9 @@ def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor: def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) -> Tensor: """Select a single action given environment observations. - This method calls sample_actions every step and returns one action at a time from the new chunk. - The queue is replaced with the new chunk each time. The last config.max_delay executed actions - are passed to sample_actions as action_prefix; at episode start (no previous actions), delay - is 0. + This method uses an action queue that is replenished when it has config.max_delay or fewer actions (or is empty). + When replenishing, the current queue contents are used as action_prefix for sample_actions, + then the queue is refilled with the new chunk. Note: This method should only be called when running a policy in simulation. For real world inference, this method should be written in the ROS client node. @@ -543,30 +541,36 @@ def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) - """ self.eval() - action_prefix = None - delay = 0 - if self.config.max_delay > 0 and len(self._executed_actions) > 0: - action_prefix = torch.stack(list(self._executed_actions), dim=1) - delay = action_prefix.shape[1] - action_prefix = self.normalize_actions({"actions": action_prefix})["actions"] - original_action_dim = self.config.action_feature.shape[0] - if original_action_dim < self.config.max_action_dim: - action_prefix = F.pad( - action_prefix, - (0, self.config.max_action_dim - original_action_dim), - ) - if delay < self.config.chunk_size: - action_prefix = F.pad( - action_prefix, - (0, 0, 0, self.config.chunk_size - delay), - ) - actions = self.sample_actions(batch, noise=noise, action_prefix=action_prefix, delay=delay) - actions = rearrange(actions, "b c d -> c b d") - self._action_queue.clear() - self._action_queue.extend(actions[delay:]) + if len(self._action_queue) == 0 or len(self._action_queue) <= self.config.max_delay: + # Use current queue as action prefix to replenish + action_prefix = None + delay = 0 + if len(self._action_queue) > 0: + prefix_actions = list(self._action_queue) + delay = min(len(prefix_actions), self.config.max_delay) + assert delay == self.config.max_delay, f"Delay must be equal to {self.config.max_delay}" + prefix_actions = prefix_actions[-delay:] + action_prefix = torch.stack(prefix_actions, dim=1) + action_prefix = self.normalize_actions({"actions": action_prefix})["actions"] + original_action_dim = self.config.action_feature.shape[0] + if original_action_dim < self.config.max_action_dim: + action_prefix = F.pad( + action_prefix, + (0, self.config.max_action_dim - original_action_dim), + ) + if delay < self.config.chunk_size: + action_prefix = F.pad( + action_prefix, + (0, 0, 0, self.config.chunk_size - delay), + ) + actions = self.sample_actions(batch, noise=noise, action_prefix=action_prefix, delay=delay) + actions = rearrange(actions, "b c d -> c b d") + self._action_queue.extend(actions[delay:]) + assert len(self._action_queue) == self.config.n_action_steps, ( + f"Action queue must have {self.config.n_action_steps} actions" + ) + action = self._action_queue.popleft() - if self.config.max_delay > 0: - self._executed_actions.append(action) return action @torch.no_grad() From 38e63839101ceb179a814422429c9232c45e6d0e Mon Sep 17 00:00:00 2001 From: William Yue Date: Thu, 5 Feb 2026 15:35:35 -0800 Subject: [PATCH 11/12] merge --- src/opentau/policies/pi05/modeling_pi05.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index 832b6ff..ad3d673 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -1325,9 +1325,9 @@ def forward( # compute mean response_ce_loss = response_ce_loss.mean() else: - response_ce_loss = torch.tensor(0.0, device=losses.device) + response_ce_loss = torch.tensor(0.0, device=mse_loss.device) - return {"MSE": losses, "CE": discrete_action_ce_loss + response_ce_loss} + return {"MSE": mse_loss, "CE": discrete_action_ce_loss + response_ce_loss} def sample_actions( self, From 761f8bf85c20ee812fd1b77e28ddd06e7cbf55f1 Mon Sep 17 00:00:00 2001 From: William Yue Date: Fri, 6 Feb 2026 11:25:43 -0800 Subject: [PATCH 12/12] fix action normalization --- src/opentau/policies/pi05/modeling_pi05.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/opentau/policies/pi05/modeling_pi05.py b/src/opentau/policies/pi05/modeling_pi05.py index ad3d673..5dc5b85 100644 --- a/src/opentau/policies/pi05/modeling_pi05.py +++ b/src/opentau/policies/pi05/modeling_pi05.py @@ -271,7 +271,7 @@ def __init__( self.normalize_targets = Normalize( config.output_features, config.normalization_mapping, dataset_stats ) - self.normalize_actions = Normalize( + self.normalize_discrete_actions = Normalize( config.output_features, {"ACTION": NormalizationMode.MIN_MAX}, dataset_stats ) self.unnormalize_outputs = Unnormalize( @@ -551,7 +551,7 @@ def select_action(self, batch: dict[str, Tensor], noise: Tensor | None = None) - assert delay == self.config.max_delay, f"Delay must be equal to {self.config.max_delay}" prefix_actions = prefix_actions[-delay:] action_prefix = torch.stack(prefix_actions, dim=1) - action_prefix = self.normalize_actions({"actions": action_prefix})["actions"] + action_prefix = self.normalize_targets({"actions": action_prefix})["actions"] original_action_dim = self.config.action_feature.shape[0] if original_action_dim < self.config.max_action_dim: action_prefix = F.pad( @@ -630,7 +630,7 @@ def forward( A dictionary containing the loss components ("MSE" and "CE"). """ batch = self.normalize_inputs(batch) - batch["discrete_actions"] = self.normalize_actions(dict(batch))["actions"] + batch["discrete_actions"] = self.normalize_discrete_actions(dict(batch))["actions"] batch = self.normalize_targets(batch) images, img_masks = self.prepare_images(