diff --git a/com.unity.ml-agents/CHANGELOG.md b/com.unity.ml-agents/CHANGELOG.md
index 733dcba37b..55aa465de6 100755
--- a/com.unity.ml-agents/CHANGELOG.md
+++ b/com.unity.ml-agents/CHANGELOG.md
@@ -11,7 +11,10 @@ and this project adheres to
 ### Major Changes
 #### com.unity.ml-agents (C#)
 #### ml-agents / ml-agents-envs / gym-unity (Python)
-The minimum supported python version for ml-agents-envs was changed to 3.6.1. (#4244)
+- The minimum supported python version for ml-agents-envs was changed to 3.6.1. (#4244)
+- The interaction between EnvManager and TrainerController was changed; EnvManager.advance() was split into two stages,
+and TrainerController now uses the results from the first stage to handle new behavior names. This change speeds up
+Python training by approximately 5-10%. (#4259)

 ### Minor Changes
 #### com.unity.ml-agents (C#)
diff --git a/ml-agents/mlagents/trainers/env_manager.py b/ml-agents/mlagents/trainers/env_manager.py
index e50fd1ce24..9d03db3dc7 100644
--- a/ml-agents/mlagents/trainers/env_manager.py
+++ b/ml-agents/mlagents/trainers/env_manager.py
@@ -38,7 +38,7 @@ class EnvManager(ABC):
     def __init__(self):
         self.policies: Dict[BehaviorName, TFPolicy] = {}
         self.agent_managers: Dict[BehaviorName, AgentManager] = {}
-        self.first_step_infos: List[EnvironmentStep] = None
+        self.first_step_infos: List[EnvironmentStep] = []

     def set_policy(self, brain_name: BehaviorName, policy: TFPolicy) -> None:
         self.policies[brain_name] = policy
@@ -84,15 +84,20 @@ def training_behaviors(self) -> Dict[BehaviorName, BehaviorSpec]:
     def close(self):
         pass

-    def advance(self):
+    def get_steps(self) -> List[EnvironmentStep]:
+        """
+        Updates the policies, steps the environments, and returns the step information from the environments.
+        Calling code should pass the returned EnvironmentSteps to process_steps() after calling this.
+        :return: The list of EnvironmentSteps
+        """
         # If we had just reset, process the first EnvironmentSteps.
         # Note that we do it here instead of in reset() so that on the very first reset(),
         # we can create the needed AgentManagers before calling advance() and processing the EnvironmentSteps.
-        if self.first_step_infos is not None:
+        if self.first_step_infos:
             self._process_step_infos(self.first_step_infos)
-            self.first_step_infos = None
+            self.first_step_infos = []
         # Get new policies if found.  Always get the latest policy.
-        for brain_name in self.training_behaviors:
+        for brain_name in self.agent_managers.keys():
             _policy = None
             try:
                 # We make sure to empty the policy queue before continuing to produce steps.
@@ -101,9 +106,13 @@ def advance(self):
                     _policy = self.agent_managers[brain_name].policy_queue.get_nowait()
             except AgentManagerQueue.Empty:
                 if _policy is not None:
-                    self.set_policy(brain_name, _policy)
-        # Step the environment
+                    # policy_queue contains Policy, but we need a TFPolicy here
+                    self.set_policy(brain_name, _policy)  # type: ignore
+        # Step the environments
         new_step_infos = self._step()
+        return new_step_infos
+
+    def process_steps(self, new_step_infos: List[EnvironmentStep]) -> int:
         # Add to AgentProcessor
         num_step_infos = self._process_step_infos(new_step_infos)
         return num_step_infos
diff --git a/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py b/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py
index f8463f1703..818e0747c7 100644
--- a/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py
+++ b/ml-agents/mlagents/trainers/tests/test_subprocess_env_manager.py
@@ -166,7 +166,7 @@ def test_advance(self, mock_create_worker, training_behaviors_mock, step_mock):
         }
         step_info = EnvironmentStep(step_info_dict, 0, action_info_dict, env_stats)
         step_mock.return_value = [step_info]
-        env_manager.advance()
+        env_manager.process_steps(env_manager.get_steps())

         # Test add_experiences
         env_manager._step.assert_called_once()
diff --git a/ml-agents/mlagents/trainers/tests/test_trainer_controller.py b/ml-agents/mlagents/trainers/tests/test_trainer_controller.py
index cd3b5533a4..8461bd5c31 100644
--- a/ml-agents/mlagents/trainers/tests/test_trainer_controller.py
+++ b/ml-agents/mlagents/trainers/tests/test_trainer_controller.py
@@ -141,6 +141,7 @@ def test_advance_adds_experiences_to_trainer_and_trains(
     tc.advance(env_mock)

     env_mock.reset.assert_not_called()
-    env_mock.advance.assert_called_once()
+    env_mock.get_steps.assert_called_once()
+    env_mock.process_steps.assert_called_once()
     # May have been called many times due to thread
     trainer_mock.advance.call_count > 0
diff --git a/ml-agents/mlagents/trainers/trainer_controller.py b/ml-agents/mlagents/trainers/trainer_controller.py
index 201a761f88..e38f81b9d7 100644
--- a/ml-agents/mlagents/trainers/trainer_controller.py
+++ b/ml-agents/mlagents/trainers/trainer_controller.py
@@ -11,7 +11,7 @@
 from mlagents.tf_utils import tf

 from mlagents_envs.logging_util import get_logger
-from mlagents.trainers.env_manager import EnvManager
+from mlagents.trainers.env_manager import EnvManager, EnvironmentStep
 from mlagents_envs.exception import (
     UnityEnvironmentException,
     UnityCommunicationException,
@@ -59,6 +59,7 @@ def __init__(
         self.train_model = train
         self.param_manager = param_manager
         self.ghost_controller = self.trainer_factory.ghost_controller
+        self.registered_behavior_ids: Set[str] = set()

         self.trainer_threads: List[threading.Thread] = []
         self.kill_trainers = False
@@ -101,7 +102,7 @@ def _create_output_path(output_path):
         )

     @timed
-    def _reset_env(self, env: EnvManager) -> None:
+    def _reset_env(self, env_manager: EnvManager) -> None:
         """Resets the environment.

         Returns:
@@ -109,7 +110,9 @@ def _reset_env(self, env: EnvManager) -> None:
             environment.
         """
         new_config = self.param_manager.get_current_samplers()
-        env.reset(config=new_config)
+        env_manager.reset(config=new_config)
+        # Register any new behavior ids that were generated on the reset.
+        self._register_new_behaviors(env_manager, env_manager.first_step_infos)

     def _not_done_training(self) -> bool:
         return (
@@ -169,15 +172,10 @@ def _create_trainers_and_managers(
     def start_learning(self, env_manager: EnvManager) -> None:
         self._create_output_path(self.output_path)
         tf.reset_default_graph()
-        last_brain_behavior_ids: Set[str] = set()
         try:
             # Initial reset
             self._reset_env(env_manager)
             while self._not_done_training():
-                external_brain_behavior_ids = set(env_manager.training_behaviors.keys())
-                new_behavior_ids = external_brain_behavior_ids - last_brain_behavior_ids
-                self._create_trainers_and_managers(env_manager, new_behavior_ids)
-                last_brain_behavior_ids = external_brain_behavior_ids
                 n_steps = self.advance(env_manager)
                 for _ in range(n_steps):
                     self.reset_env_if_ready(env_manager)
@@ -233,10 +231,12 @@ def reset_env_if_ready(self, env: EnvManager) -> None:
             env.set_env_parameters(self.param_manager.get_current_samplers())

     @timed
-    def advance(self, env: EnvManager) -> int:
+    def advance(self, env_manager: EnvManager) -> int:
         # Get steps
         with hierarchical_timer("env_step"):
-            num_steps = env.advance()
+            new_step_infos = env_manager.get_steps()
+            self._register_new_behaviors(env_manager, new_step_infos)
+            num_steps = env_manager.process_steps(new_step_infos)

         # Report current lesson for each environment parameter
         for (
@@ -255,6 +255,22 @@ def advance(self, env: EnvManager) -> int:

         return num_steps

+    def _register_new_behaviors(
+        self, env_manager: EnvManager, step_infos: List[EnvironmentStep]
+    ) -> None:
+        """
+        Handle registration (adding trainers and managers) of new behavior ids.
+        :param env_manager: The EnvManager to create the trainers and AgentManagers for.
+        :param step_infos: The EnvironmentSteps to check for new behavior ids.
+        :return: None
+        """
+        step_behavior_ids: Set[str] = set()
+        for s in step_infos:
+            step_behavior_ids |= set(s.name_behavior_ids)
+        new_behavior_ids = step_behavior_ids - self.registered_behavior_ids
+        self._create_trainers_and_managers(env_manager, new_behavior_ids)
+        self.registered_behavior_ids |= step_behavior_ids
+
     def join_threads(self, timeout_seconds: float = 1.0) -> None:
         """
         Wait for threads to finish, and merge their timer information into the main thread.
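
As a reading aid for this patch, below is a minimal, self-contained sketch of the split-advance pattern it introduces. The class and method names (EnvManager.get_steps, EnvManager.process_steps, TrainerController._register_new_behaviors) mirror the diff, but the bodies are simplified stand-ins rather than the real implementations in env_manager.py and trainer_controller.py.

from typing import List, Set


class EnvironmentStep:
    """Simplified stand-in for mlagents.trainers.env_manager.EnvironmentStep."""

    def __init__(self, name_behavior_ids: List[str]) -> None:
        self.name_behavior_ids = name_behavior_ids


class EnvManager:
    """Stand-in showing only the two stages that used to be a single advance()."""

    def get_steps(self) -> List[EnvironmentStep]:
        # Stage 1: update policies, step the environments, return the raw step results.
        return [EnvironmentStep(["SomeBehavior?team=0"])]

    def process_steps(self, step_infos: List[EnvironmentStep]) -> int:
        # Stage 2: hand the results to the AgentProcessors; returns the number of steps.
        return len(step_infos)


class TrainerController:
    """Stand-in showing how the controller consumes the two stages."""

    def __init__(self) -> None:
        self.registered_behavior_ids: Set[str] = set()

    def _register_new_behaviors(
        self, env_manager: EnvManager, step_infos: List[EnvironmentStep]
    ) -> None:
        # New behaviors are discovered from the steps themselves, between the two
        # stages, instead of by polling env_manager.training_behaviors each loop.
        step_ids: Set[str] = set()
        for s in step_infos:
            step_ids |= set(s.name_behavior_ids)
        new_ids = step_ids - self.registered_behavior_ids
        if new_ids:
            pass  # the real controller creates trainers and AgentManagers here
        self.registered_behavior_ids |= step_ids

    def advance(self, env_manager: EnvManager) -> int:
        new_step_infos = env_manager.get_steps()           # stage 1
        self._register_new_behaviors(env_manager, new_step_infos)
        return env_manager.process_steps(new_step_infos)   # stage 2


if __name__ == "__main__":
    controller = TrainerController()
    print(controller.advance(EnvManager()))  # prints 1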