diff --git a/iris/normalizer.py b/iris/normalizer.py
index 92fb584..33aeabd 100644
--- a/iris/normalizer.py
+++ b/iris/normalizer.py
@@ -15,8 +15,8 @@
 """Normalizer class for observation and action normalization in RL."""

 import abc
+import copy
 from typing import Any, Dict, Optional, Sequence, Union
-
 from absl import logging
 import gin
 import gym
@@ -32,54 +32,80 @@
 UNNORM_VAR = "unnorm_var"


-@gin.configurable
 class Buffer(abc.ABC):
   """Buffer class for collecting online statistics from data."""

-  def __init__(self, shape: Sequence[int] = (0,)) -> None:
-    self._shape = shape
-    self._data = {}
-    self.reset()
+  @abc.abstractmethod
+  def reset(self) -> None:
+    """Reset buffer."""

+  @abc.abstractmethod
   def push(self, x: np.ndarray) -> None:
     """Push new data point."""

-  def merge(self, data: Dict[str, np.ndarray]) -> None:
+  @abc.abstractmethod
+  def merge(self, data: Dict[str, Any]) -> None:
     """Merge data from another buffer."""

-  def reset(self) -> None:
-    self._data[N] = 0
+  @property
+  @abc.abstractmethod
+  def data(self) -> Dict[str, Any]:
+    """Returns copy of current data in buffer."""

   @property
+  @abc.abstractmethod
   def shape(self) -> Sequence[int]:
-    return self._shape
+    """Shape of data point."""

   @property
-  def data(self) -> Dict[str, Any]:
-    return self._data.copy()
+  @abc.abstractmethod
+  def state(self) -> Dict[str, Any]:
+    """Returns copy of current state of buffer."""
+
+  @state.setter
+  @abc.abstractmethod
+  def state(self, new_state: Dict[str, Any]) -> None:
+    """Sets state of buffer."""

-  @data.setter
-  def data(self, data: Dict[str, Any]) -> None:
-    self._data = data.copy()
+
+class NoOpBuffer(Buffer):
+  """No-op buffer."""
+
+  def reset(self) -> None:
+    pass
+
+  def push(self, x: np.ndarray) -> None:
+    pass
+
+  def merge(self, data: Dict[str, Any]) -> None:
+    pass

   @property
-  def n(self) -> Any:
-    return self._data[N]
+  def data(self) -> Dict[str, Any]:
+    return {}
+
+  @property
+  def shape(self) -> Sequence[int]:
+    return ()

   @property
   def state(self) -> Dict[str, Any]:
-    state = {N: self.n}
-    return state
+    return {}

   @state.setter
   def state(self, new_state: Dict[str, Any]) -> None:
-    self._data[N] = new_state[N]
+    pass


 @gin.configurable
 class MeanStdBuffer(Buffer):
   """Collect stats for calculating mean and std online."""

+  def __init__(self, shape: Sequence[int] = (0,)) -> None:
+    self._shape = shape
+    self._data = {}
+    self.reset()
+
   def reset(self) -> None:
     self._data[N] = 0
     self._data[MEAN] = np.zeros(self._shape, dtype=np.float64)
@@ -95,7 +121,7 @@ def push(self, x: np.ndarray) -> None:
     self._data[MEAN] += delta / self._data[N]
     self._data[UNNORM_VAR] += delta * delta * n1 / self._data[N]

-  def merge(self, data: Dict[str, np.ndarray]) -> None:
+  def merge(self, data: Dict[str, Any]) -> None:
     """Merge data from another buffer."""
     n1 = self._data[N]
     n2 = data[N]
@@ -131,50 +157,55 @@ def merge(self, data: Dict[str, np.ndarray]) -> None:
     self._data[UNNORM_VAR] = unnorm_var

   @property
-  def mean(self) -> np.ndarray:
-    return self._data[MEAN]
-
-  @property
-  def unnorm_var(self) -> np.ndarray:
-    return self._data[UNNORM_VAR]
-
-  @property
-  def var(self) -> np.ndarray:
-    return (
-        self.unnorm_var / (self.n - 1)
-        if self.n > 1
-        else np.ones_like(self.mean)
-    )
+  def data(self) -> Dict[str, Any]:
+    return copy.deepcopy(self._data)

   @property
-  def std(self) -> np.ndarray:
-    # asarray is needed for boolean indexing to work when shape = (1)
-    std = np.asarray(np.sqrt(self.var))
-    std[std < 1e-7] = float("inf")
-    return std
+  def shape(self) -> Sequence[int]:
+    return self._shape

   @property
   def state(self) -> Dict[str, Any]:
-    state = {MEAN: self.mean, STD: self.std, N: self.n}
-    return state
+    return {MEAN: self._data[MEAN], STD: self._std, N: self._data[N]}

   @state.setter
   def state(self, new_state: Dict[str, Any]) -> None:
-    self._data[MEAN] = new_state[MEAN].copy()
+    new_state = copy.deepcopy(new_state)
+    self._data[MEAN] = new_state[MEAN]
     self._data[N] = new_state[N]
-    std = new_state[STD].copy()
+
+    std = new_state[STD]
     std[std == float("inf")] = 0
     var = np.square(std)
-    unnorm_var = var * (self.n - 1) if self.n > 1 else np.zeros_like(self.mean)
+    unnorm_var = (
+        var * (self._data[N] - 1)
+        if self._data[N] > 1
+        else np.zeros_like(self._data[MEAN])
+    )
     self._data[UNNORM_VAR] = unnorm_var

+  @property
+  def _var(self) -> np.ndarray:
+    return (
+        self._data[UNNORM_VAR] / (self._data[N] - 1)
+        if self._data[N] > 1
+        else np.ones_like(self._data[MEAN])
+    )
+
+  @property
+  def _std(self) -> np.ndarray:
+    # asarray is needed for boolean indexing to work when shape = (1)
+    std = np.asarray(np.sqrt(self._var))
+    std[std < 1e-7] = float("inf")
+    return std
+

 class Normalizer(abc.ABC):
   """Base Normalizer class."""

-  def __init__(self,
-               space: gym.Space,
-               ignored_keys: Optional[Sequence[str]] = None) -> None:
+  def __init__(
+      self, space: gym.Space, ignored_keys: Optional[Sequence[str]] = None
+  ) -> None:
     """Initializes Normalizer.

     Args:
@@ -185,9 +216,7 @@ def __init__(self,
     """
     self._space = space
     self._space_ignored = None
-    self._ignored_keys = ignored_keys
-    if ignored_keys is None:
-      self._ignored_keys = []
+    self._ignored_keys = ignored_keys or []
     if self._ignored_keys and isinstance(space, spaces.Dict):
       self._space = {}
       self._space_ignored = {}
@@ -200,15 +229,23 @@ def __init__(self,
       self._space_ignored = spaces.Dict(self._space_ignored)
     self._flat_space = utils.flatten_space(self._space)
     self._state = {}
-    self._buffer = Buffer((0,))

   @property
+  @abc.abstractmethod
   def buffer(self) -> Buffer:
     """Buffer for collecting normalization statistics."""
-    return self._buffer
+
+  @abc.abstractmethod
+  def __call__(
+      self,
+      value: Union[np.ndarray, Dict[str, np.ndarray]],
+      update_buffer: bool = True,
+  ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+    """Apply normalization."""

   def _filter_ignored_input(
-      self, input_dict: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
+      self, input_dict: Dict[str, np.ndarray]
+  ) -> Dict[str, np.ndarray]:
     ignored_dict = {}
     for key in self._ignored_keys:
       ignored_dict[key] = input_dict[key]
@@ -216,21 +253,14 @@ def _filter_ignored_input(
     return ignored_dict

   def _add_ignored_input(
-      self, input_dict: Dict[str, np.ndarray],
-      ignored_input: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
+      self,
+      input_dict: Dict[str, np.ndarray],
+      ignored_input: Dict[str, np.ndarray],
+  ) -> Dict[str, np.ndarray]:
     if isinstance(input_dict, dict):
       input_dict.update(ignored_input)
     return input_dict

-  @abc.abstractmethod
-  def __call__(
-      self,
-      value: Union[np.ndarray, Dict[str, np.ndarray]],
-      update_buffer: bool = True) -> Union[np.ndarray, Dict[str, np.ndarray]]:
-    """Apply normalization."""
-    raise NotImplementedError(
-        "Should be implemented in derived classes for specific filters.")
-
   @property
   def state(self) -> Dict[str, np.ndarray]:
     return self._state.copy()
@@ -244,10 +274,15 @@ def state(self, state: Dict[str, np.ndarray]) -> None:
 class NoNormalizer(Normalizer):
   """No Normalization applied to input."""

+  @property
+  def buffer(self) -> Buffer:
+    return NoOpBuffer()
+
   def __call__(
       self,
       value: Union[np.ndarray, Dict[str, np.ndarray]],
-      update_buffer: bool = True) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+      update_buffer: bool = True,
+  ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
     del update_buffer
     return value

@@ -256,19 +291,24 @@ def __call__(
 class ActionRangeDenormalizer(Normalizer):
   """Actions mapped to given range from [-1, 1]."""

-  def __init__(self,
-               space: gym.Space,
-               ignored_keys: Optional[Sequence[str]] = None) -> None:
+  def __init__(
+      self, space: gym.Space, ignored_keys: Optional[Sequence[str]] = None
+  ) -> None:
     super().__init__(space, ignored_keys)
     low = self._flat_space.low
     high = self._flat_space.high
     self._state["mid"] = (low + high) / 2.0
     self._state["half_range"] = (high - low) / 2.0

+  @property
+  def buffer(self) -> Buffer:
+    return NoOpBuffer()
+
   def __call__(
       self,
       action: Union[np.ndarray, Dict[str, np.ndarray]],
-      update_buffer: bool = True) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+      update_buffer: bool = True,
+  ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
     """Maps actions from range [-1, 1] to the range in given action space.

     Args:
@@ -292,19 +332,24 @@ def __call__(
 class ObservationRangeNormalizer(Normalizer):
   """Observations mapped from given range to [-1, 1]."""

-  def __init__(self,
-               space: gym.Space,
-               ignored_keys: Optional[Sequence[str]] = None) -> None:
+  def __init__(
+      self, space: gym.Space, ignored_keys: Optional[Sequence[str]] = None
+  ) -> None:
     super().__init__(space, ignored_keys)
     low = self._flat_space.low
     high = self._flat_space.high
     self._state["mid"] = (low + high) / 2.0
     self._state["half_range"] = (high - low) / 2.0

+  @property
+  def buffer(self) -> Buffer:
+    return NoOpBuffer()
+
   def __call__(
       self,
       observation: Union[np.ndarray, Dict[str, np.ndarray]],
-      update_buffer: bool = True) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+      update_buffer: bool = True,
+  ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
     """Maps observations from range in given observation space to [-1, 1].

     Args:
@@ -329,26 +374,31 @@ def __call__(
 class RunningMeanStdNormalizer(Normalizer):
   """Standardize observations with mean and std calculated online."""

-  def __init__(self,
-               space: gym.Space,
-               ignored_keys: Optional[Sequence[str]] = None) -> None:
+  def __init__(
+      self, space: gym.Space, ignored_keys: Optional[Sequence[str]] = None
+  ) -> None:
     super().__init__(space, ignored_keys)
     shape = self._flat_space.shape
     self._state[MEAN] = np.zeros(shape, dtype=np.float64)
     self._state[STD] = np.ones(shape, dtype=np.float64)
     self._buffer = MeanStdBuffer(shape)

+  @property
+  def buffer(self) -> MeanStdBuffer:
+    return self._buffer
+
   def __call__(
       self,
       observation: Union[np.ndarray, Dict[str, np.ndarray]],
-      update_buffer: bool = True) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+      update_buffer: bool = True,
+  ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
     observation = observation.copy()
     ignored_observation = self._filter_ignored_input(observation)
     observation = utils.flatten(self._space, observation)
     if update_buffer:
       self._buffer.push(observation)
     observation -= self._state[MEAN]
-    observation /= (self._state[STD] + _EPSILON)
+    observation /= self._state[STD] + _EPSILON
     observation = utils.unflatten(self._space, observation)
     observation = self._add_ignored_input(observation, ignored_observation)
     return observation
@@ -360,11 +410,17 @@ class RunningMeanStdAgentVsAgentNormalizer(RunningMeanStdNormalizer):
   def __init__(self, space: gym.Space) -> None:
     # We use the "ignored_keys" to split the agent obs to process individually.
     super().__init__(space, ignored_keys=["opp"])
+    self._buffer = MeanStdBuffer(shape=self._flat_space.shape)
+
+  @property
+  def buffer(self) -> MeanStdBuffer:
+    return self._buffer

   def __call__(
       self,
       observation: Union[np.ndarray, Dict[str, np.ndarray]],
-      update_buffer: bool = True) -> Union[np.ndarray, Dict[str, np.ndarray]]:
+      update_buffer: bool = True,
+  ) -> Union[np.ndarray, Dict[str, np.ndarray]]:
     observation = observation.copy()
     opp_observation = self._filter_ignored_input(observation)

@@ -379,7 +435,7 @@ def __call__(

     def _normalized(obs, unflatten_space):
       obs -= self._state[MEAN]
-      obs /= (self._state[STD] + _EPSILON)
+      obs /= self._state[STD] + _EPSILON
       obs = utils.unflatten(unflatten_space, obs)
       return obs

diff --git a/iris/normalizer_test.py b/iris/normalizer_test.py
index 59727b0..52bb869 100644
--- a/iris/normalizer_test.py
+++ b/iris/normalizer_test.py
@@ -22,65 +22,74 @@ class NormalizerTest(absltest.TestCase):

   def test_no_normalizer(self):
     norm = normalizer.NoNormalizer(
-        gym.spaces.Box(low=np.zeros(5), high=np.ones(5)))
+        gym.spaces.Box(low=np.zeros(5), high=np.ones(5))
+    )
     value = np.ones(5)
     norm_value = norm(value)
     np.testing.assert_array_equal(norm_value, value)

   def test_action_range_denormalizer(self):
-    space = gym.spaces.Box(low=np.zeros(5), high=5*np.ones(5))
+    space = gym.spaces.Box(low=np.zeros(5), high=5 * np.ones(5))
     norm = normalizer.ActionRangeDenormalizer(space)
     value = np.array([1, 1, -1, -1, 1])
     norm_value = norm(value)
-    np.testing.assert_array_equal(
-        norm_value, np.array([5, 5, 0, 0, 5]))
+    np.testing.assert_array_equal(norm_value, np.array([5, 5, 0, 0, 5]))

     space = gym.spaces.Dict({
-        'sensor1': gym.spaces.Box(low=np.zeros(5), high=5*np.ones(5)),
-        'sensor2': gym.spaces.Box(low=np.zeros(3), high=5*np.ones(3)),
+        'sensor1': gym.spaces.Box(low=np.zeros(5), high=5 * np.ones(5)),
+        'sensor2': gym.spaces.Box(low=np.zeros(3), high=5 * np.ones(3)),
     })
     norm = normalizer.ActionRangeDenormalizer(space, ignored_keys=['sensor2'])
-    value = {'sensor1': np.array([1, 1, -1, -1, 1]),
-             'sensor2': np.array([1, 1, -1])}
+    value = {
+        'sensor1': np.array([1, 1, -1, -1, 1]),
+        'sensor2': np.array([1, 1, -1]),
+    }
     norm_value = norm(value)
     np.testing.assert_array_equal(
-        norm_value['sensor1'], np.array([5, 5, 0, 0, 5]))
+        norm_value['sensor1'], np.array([5, 5, 0, 0, 5])
+    )
     np.testing.assert_array_equal(norm_value['sensor2'], np.array([1, 1, -1]))

   def test_observation_range_normalizer(self):
-    space = gym.spaces.Box(low=np.zeros(5), high=5*np.ones(5))
+    space = gym.spaces.Box(low=np.zeros(5), high=5 * np.ones(5))
     norm = normalizer.ObservationRangeNormalizer(space)
     value = np.array([5, 5, 0, 0, 5])
     norm_value = norm(value)
     np.testing.assert_array_equal(norm_value, np.array([1, 1, -1, -1, 1]))

     space = gym.spaces.Dict({
-        'sensor1': gym.spaces.Box(low=np.zeros(5), high=5*np.ones(5)),
-        'sensor2': gym.spaces.Box(low=np.zeros(3), high=5*np.ones(3)),
+        'sensor1': gym.spaces.Box(low=np.zeros(5), high=5 * np.ones(5)),
+        'sensor2': gym.spaces.Box(low=np.zeros(3), high=5 * np.ones(3)),
     })
-    norm = normalizer.ObservationRangeNormalizer(space,
-                                                 ignored_keys=['sensor2'])
-    value = {'sensor1': np.array([5, 5, 0, 0, 5]),
-             'sensor2': np.array([5, 5, 0])}
+    norm = normalizer.ObservationRangeNormalizer(
+        space, ignored_keys=['sensor2']
+    )
+    value = {
+        'sensor1': np.array([5, 5, 0, 0, 5]),
+        'sensor2': np.array([5, 5, 0]),
+    }
     norm_value = norm(value)
     np.testing.assert_array_equal(
-        norm_value['sensor1'], np.array([1, 1, -1, -1, 1]))
+        norm_value['sensor1'], np.array([1, 1, -1, -1, 1])
+    )
     np.testing.assert_array_equal(norm_value['sensor2'], np.array([5, 5, 0]))

   def test_running_mean_std_normalizer(self):
-    space = gym.spaces.Box(low=np.zeros(5), high=5*np.ones(5))
+    space = gym.spaces.Box(low=np.zeros(5), high=5 * np.ones(5))
     norm = normalizer.RunningMeanStdNormalizer(space)
     value = np.array([5, 5, 0, 0, 5])
     norm_value = norm(value)
     np.testing.assert_array_equal(norm_value, value)

     space = gym.spaces.Dict({
-        'sensor1': gym.spaces.Box(low=np.zeros(5), high=5*np.ones(5)),
-        'sensor2': gym.spaces.Box(low=np.zeros(3), high=5*np.ones(3)),
+        'sensor1': gym.spaces.Box(low=np.zeros(5), high=5 * np.ones(5)),
+        'sensor2': gym.spaces.Box(low=np.zeros(3), high=5 * np.ones(3)),
     })
     norm = normalizer.RunningMeanStdNormalizer(space, ignored_keys=['sensor2'])
-    value = {'sensor1': np.array([5, 5, 0, 0, 5]),
-             'sensor2': np.array([5, 5, 0])}
+    value = {
+        'sensor1': np.array([5, 5, 0, 0, 5]),
+        'sensor2': np.array([5, 5, 0]),
+    }
     norm_value = norm(value)
     np.testing.assert_array_equal(norm_value['sensor1'], value['sensor1'])
     np.testing.assert_array_equal(norm_value['sensor2'], np.array([5, 5, 0]))
@@ -88,29 +97,32 @@ def test_running_mean_std_normalizer(self):
     norm.state = norm.buffer.state
     norm_value = norm(value)
     np.testing.assert_array_equal(
-        norm_value['sensor1'], np.zeros_like(value['sensor1']))
+        norm_value['sensor1'], np.zeros_like(value['sensor1'])
+    )
     np.testing.assert_array_equal(norm_value['sensor2'], np.array([5, 5, 0]))

     state = {
         'mean': value['sensor1'] / 2.0,
-        'std': np.ones_like(value['sensor1'])
+        'std': np.ones_like(value['sensor1']),
     }
     norm.state = state
     norm_value = norm(value)
-    np.testing.assert_array_equal(norm_value['sensor1'], value['sensor1']/2.0)
+    np.testing.assert_array_equal(norm_value['sensor1'], value['sensor1'] / 2.0)
     np.testing.assert_array_equal(norm_value['sensor2'], np.array([5, 5, 0]))

-    np.testing.assert_array_equal(norm.buffer.mean, value['sensor1'])
-    self.assertEqual(norm.buffer.n, 3)
+    np.testing.assert_array_equal(norm.buffer._data['mean'], value['sensor1'])
+    self.assertEqual(norm.buffer._data['n'], 3)

     data = {
         'n': 1,
         'mean': value['sensor1'] / 2.0,
-        'unnorm_var': np.zeros_like(value['sensor1'])
+        'unnorm_var': np.zeros_like(value['sensor1']),
     }
     norm.buffer.merge(data)
-    np.testing.assert_array_equal(norm.buffer.mean, value['sensor1']*(7/8))
-    self.assertEqual(norm.buffer.n, 4)
+    np.testing.assert_array_equal(
+        norm.buffer._data['mean'], value['sensor1'] * (7 / 8)
+    )
+    self.assertEqual(norm.buffer._data['n'], 4)

   def test_mean_std_buffer_empty_merge(self):
     mean_std_buffer = normalizer.MeanStdBuffer()
@@ -121,11 +133,11 @@ def test_mean_std_buffer_empty_merge(self):
   def test_mean_std_buffer_scalar(self):
     mean_std_buffer = normalizer.MeanStdBuffer((1))
     mean_std_buffer.push(np.asarray(10.0))
-    self.assertEqual(mean_std_buffer.std, 1.0)  # First value is always 1.0.
+    self.assertEqual(mean_std_buffer._std, 1.0)  # First value is always 1.0.

     mean_std_buffer.push(np.asarray(11.0))
     # sqrt(11.0-10.0 / 2.0)
-    np.testing.assert_almost_equal(mean_std_buffer.std, np.sqrt(0.5))
+    np.testing.assert_almost_equal(mean_std_buffer._std, np.sqrt(0.5))

   def test_mean_std_buffer_reject_infinity_on_merge(self):
     mean_std_buffer = normalizer.MeanStdBuffer((1))
@@ -135,7 +147,8 @@ def test_mean_std_buffer_reject_infinity_on_merge(self):
     infinty_buffer.push(np.asarray(np.inf))

     mean_std_buffer.merge(infinty_buffer.data)
-    self.assertEqual(mean_std_buffer.n, 1)  # Still only 1 value.
+    self.assertEqual(mean_std_buffer._data['n'], 1)  # Still only 1 value.
+

 if __name__ == '__main__':
   absltest.main()
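A minimal usage sketch (not part of the diff) of how the refactored pieces compose: per-worker RunningMeanStdNormalizer instances expose their MeanStdBuffer through the new abstract buffer property, per-worker statistics are combined with merge, and the aggregated state is copied back into each normalizer. Class and method names come from the change above; the two-worker aggregation loop itself is illustrative.

# Illustrative sketch: aggregating normalization statistics across workers
# using the Buffer/Normalizer interfaces introduced in this change.
import gym
import numpy as np
from iris import normalizer

space = gym.spaces.Box(low=np.zeros(3), high=np.ones(3))
workers = [normalizer.RunningMeanStdNormalizer(space) for _ in range(2)]

# Each call normalizes an observation and pushes it into that worker's buffer.
for norm in workers:
  for _ in range(5):
    norm(np.random.uniform(size=3))

# Merge the per-worker statistics into one central MeanStdBuffer ...
central = normalizer.MeanStdBuffer(space.shape)
for norm in workers:
  central.merge(norm.buffer.data)

# ... then broadcast the aggregated mean/std/n back to every worker.
for norm in workers:
  norm.state = central.state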