5 changes: 4 additions & 1 deletion com.unity.ml-agents/CHANGELOG.md
@@ -11,7 +11,10 @@ and this project adheres to
 ### Major Changes
 #### com.unity.ml-agents (C#)
 #### ml-agents / ml-agents-envs / gym-unity (Python)
-The minimum supported python version for ml-agents-envs was changed to 3.6.1. (#4244)
+- The minimum supported python version for ml-agents-envs was changed to 3.6.1. (#4244)
+- The interaction between EnvManager and TrainerController was changed; EnvManager.advance() was split into two stages,
+  and TrainerController now uses the results from the first stage to handle new behavior names. This change speeds up
+  Python training by approximately 5-10%. (#4259)

 ### Minor Changes
 #### com.unity.ml-agents (C#)
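The changelog entry above summarizes the core of this PR. Below is a minimal, runnable sketch of the new two-stage calling pattern; the class here is a stand-in written for illustration, and only the method names `get_steps`/`process_steps` come from this diff:

```python
from typing import List

class FakeEnvManager:
    """Stand-in for EnvManager, mimicking the two-stage API introduced here."""

    def get_steps(self) -> List[str]:
        # Stage 1: update policies, step the environments, return step infos.
        return ["step_info_for_BehaviorA", "step_info_for_BehaviorB"]

    def process_steps(self, step_infos: List[str]) -> int:
        # Stage 2: hand the step infos to the AgentProcessors.
        return len(step_infos)

env_manager = FakeEnvManager()
new_step_infos = env_manager.get_steps()
# Between the two stages, TrainerController can now create trainers and
# managers for any behavior names it has not seen before.
num_steps = env_manager.process_steps(new_step_infos)
print(num_steps)  # 2
```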
23 changes: 16 additions & 7 deletions ml-agents/mlagents/trainers/env_manager.py
@@ -38,7 +38,7 @@ class EnvManager(ABC):
     def __init__(self):
         self.policies: Dict[BehaviorName, TFPolicy] = {}
         self.agent_managers: Dict[BehaviorName, AgentManager] = {}
-        self.first_step_infos: List[EnvironmentStep] = None
+        self.first_step_infos: List[EnvironmentStep] = []

     def set_policy(self, brain_name: BehaviorName, policy: TFPolicy) -> None:
         self.policies[brain_name] = policy
@@ -84,15 +84,20 @@ def training_behaviors(self) -> Dict[BehaviorName, BehaviorSpec]:
     def close(self):
         pass

-    def advance(self):
+    def get_steps(self) -> List[EnvironmentStep]:
+        """
+        Updates the policies, steps the environments, and returns the step information from the environments.
+        Calling code should pass the returned EnvironmentSteps to process_steps() after calling this.
+        :return: The list of EnvironmentSteps
+        """
         # If we had just reset, process the first EnvironmentSteps.
         # Note that we do it here instead of in reset() so that on the very first reset(),
         # we can create the needed AgentManagers before calling advance() and processing the EnvironmentSteps.
-        if self.first_step_infos is not None:
+        if self.first_step_infos:
             self._process_step_infos(self.first_step_infos)
-            self.first_step_infos = None
+            self.first_step_infos = []
         # Get new policies if found. Always get the latest policy.
-        for brain_name in self.training_behaviors:
+        for brain_name in self.agent_managers.keys():
             _policy = None
             try:
                 # We make sure to empty the policy queue before continuing to produce steps.
@@ -101,9 +106,13 @@ def advance(self):
                     _policy = self.agent_managers[brain_name].policy_queue.get_nowait()
             except AgentManagerQueue.Empty:
                 if _policy is not None:
-                    self.set_policy(brain_name, _policy)
-        # Step the environment
+                    # policy_queue contains Policy, but we need a TFPolicy here
+                    self.set_policy(brain_name, _policy)  # type: ignore
Contributor Author:
We weren't previously typechecking this method. This was an old "error" but is surfaced now. Not sure if we should fix it here or separately. cc @ervteng @andrewcoh

@ervteng (Contributor, Jul 22, 2020):
I think there was some subtlety about types here given that Policy didn't contain some of the methods used by the AgentProcessor, e.g. get/set memories. We can fix this separately after Arthur's PR #4254 is merged.

Contributor Author:
Sounds good, I'll leave the # type: ignore for now.
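For context on the thread above, here is a minimal sketch of the typing mismatch being discussed; all classes below are illustrative stubs, and only the names mirror ml-agents:

```python
from typing import Generic, TypeVar

T = TypeVar("T")

class Policy:
    """Stub for the base Policy class, which lacks the memory accessors."""

class TFPolicy(Policy):
    """Stub for TFPolicy, which adds methods the AgentProcessor relies on."""

    def get_memories(self, agent_ids):
        ...

class AgentManagerQueue(Generic[T]):
    """Stub queue; the real policy_queue is typed over the base Policy."""

    def get_nowait(self) -> T:
        raise NotImplementedError

policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue()
# get_nowait() is annotated as returning Policy, while set_policy() expects
# a TFPolicy, so mypy flags the call -- hence the "# type: ignore" above.
```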

+        # Step the environments
         new_step_infos = self._step()
+        return new_step_infos
+
+    def process_steps(self, new_step_infos: List[EnvironmentStep]) -> int:
         # Add to AgentProcessor
         num_step_infos = self._process_step_infos(new_step_infos)
         return num_step_infos
@@ -166,7 +166,7 @@ def test_advance(self, mock_create_worker, training_behaviors_mock, step_mock):
         }
         step_info = EnvironmentStep(step_info_dict, 0, action_info_dict, env_stats)
         step_mock.return_value = [step_info]
-        env_manager.advance()
+        env_manager.process_steps(env_manager.get_steps())

         # Test add_experiences
         env_manager._step.assert_called_once()
3 changes: 2 additions & 1 deletion ml-agents/mlagents/trainers/tests/test_trainer_controller.py
@@ -141,6 +141,7 @@ def test_advance_adds_experiences_to_trainer_and_trains(
     tc.advance(env_mock)

     env_mock.reset.assert_not_called()
-    env_mock.advance.assert_called_once()
+    env_mock.get_steps.assert_called_once()
+    env_mock.process_steps.assert_called_once()
     # May have been called many times due to thread
     trainer_mock.advance.call_count > 0
36 changes: 26 additions & 10 deletions ml-agents/mlagents/trainers/trainer_controller.py
@@ -11,7 +11,7 @@
 from mlagents.tf_utils import tf

 from mlagents_envs.logging_util import get_logger
-from mlagents.trainers.env_manager import EnvManager
+from mlagents.trainers.env_manager import EnvManager, EnvironmentStep
 from mlagents_envs.exception import (
     UnityEnvironmentException,
     UnityCommunicationException,
@@ -59,6 +59,7 @@ def __init__(
         self.train_model = train
         self.param_manager = param_manager
         self.ghost_controller = self.trainer_factory.ghost_controller
+        self.registered_behavior_ids: Set[str] = set()

         self.trainer_threads: List[threading.Thread] = []
         self.kill_trainers = False
@@ -101,15 +102,17 @@ def _create_output_path(output_path):
         )

     @timed
-    def _reset_env(self, env: EnvManager) -> None:
+    def _reset_env(self, env_manager: EnvManager) -> None:
         """Resets the environment.

         Returns:
             A Data structure corresponding to the initial reset state of the
             environment.
         """
         new_config = self.param_manager.get_current_samplers()
-        env.reset(config=new_config)
+        env_manager.reset(config=new_config)
+        # Register any new behavior ids that were generated on the reset.
+        self._register_new_behaviors(env_manager, env_manager.first_step_infos)

     def _not_done_training(self) -> bool:
         return (
@@ -169,15 +172,10 @@ def _create_trainers_and_managers(
     def start_learning(self, env_manager: EnvManager) -> None:
         self._create_output_path(self.output_path)
         tf.reset_default_graph()
-        last_brain_behavior_ids: Set[str] = set()
         try:
             # Initial reset
             self._reset_env(env_manager)
             while self._not_done_training():
-                external_brain_behavior_ids = set(env_manager.training_behaviors.keys())
-                new_behavior_ids = external_brain_behavior_ids - last_brain_behavior_ids
-                self._create_trainers_and_managers(env_manager, new_behavior_ids)
-                last_brain_behavior_ids = external_brain_behavior_ids
                 n_steps = self.advance(env_manager)
                 for _ in range(n_steps):
                     self.reset_env_if_ready(env_manager)
@@ -233,10 +231,12 @@ def reset_env_if_ready(self, env: EnvManager) -> None:
             env.set_env_parameters(self.param_manager.get_current_samplers())

     @timed
-    def advance(self, env: EnvManager) -> int:
+    def advance(self, env_manager: EnvManager) -> int:
         # Get steps
         with hierarchical_timer("env_step"):
-            num_steps = env.advance()
+            new_step_infos = env_manager.get_steps()
+            self._register_new_behaviors(env_manager, new_step_infos)
+            num_steps = env_manager.process_steps(new_step_infos)

         # Report current lesson for each environment parameter
         for (
@@ -255,6 +255,22 @@

         return num_steps

+    def _register_new_behaviors(
+        self, env_manager: EnvManager, step_infos: List[EnvironmentStep]
+    ) -> None:
+        """
+        Handle registration (adding trainers and managers) of new behavior ids.
+        :param env_manager:
+        :param step_infos:
+        :return:
+        """
+        step_behavior_ids: Set[str] = set()
+        for s in step_infos:
+            step_behavior_ids |= set(s.name_behavior_ids)
+        new_behavior_ids = step_behavior_ids - self.registered_behavior_ids
+        self._create_trainers_and_managers(env_manager, new_behavior_ids)
+        self.registered_behavior_ids |= step_behavior_ids

     def join_threads(self, timeout_seconds: float = 1.0) -> None:
         """
         Wait for threads to finish, and merge their timer information into the main thread.
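To make the bookkeeping in `_register_new_behaviors` concrete, here is a small worked example of its set arithmetic; the behavior names are invented for illustration:

```python
# State carried on the TrainerController:
registered_behavior_ids = {"3DBall?team=0"}

# Behavior names collected from the latest batch of EnvironmentSteps:
step_behavior_ids = {"3DBall?team=0", "Striker?team=0"}

# Only never-before-seen behaviors get new trainers and agent managers:
new_behavior_ids = step_behavior_ids - registered_behavior_ids
print(new_behavior_ids)  # {'Striker?team=0'}

# The registry then remembers every behavior seen so far:
registered_behavior_ids |= step_behavior_ids
print(registered_behavior_ids)  # {'3DBall?team=0', 'Striker?team=0'}
```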