Merged

37 commits
9b7b607
Add failing autoresume world size 2 test
Oct 11, 2022
f936ce5
Add hanlin's log statements
Oct 11, 2022
6124366
typo
Oct 11, 2022
11ddc8a
Add error for deepspeed + autoresume
Oct 11, 2022
861e7f2
Attempt to support autoresume for multigpu/multinode/deepspeed
Oct 12, 2022
ff478ff
Merge branch 'dev' into auto_resumption
Oct 12, 2022
3f0c477
Switch to warning for autoresume checkpoint not found so that True wo…
Oct 12, 2022
dc11c88
Add a barrier
Oct 13, 2022
1a8c6aa
Reorganize code
Oct 13, 2022
d2e74ec
Merge branch 'dev' into auto_resumption
Oct 13, 2022
1285503
Clean up final file exists check
Oct 13, 2022
a32462a
Fix doctests due to new errors raised
Oct 13, 2022
a808a3a
rerun tests
Oct 13, 2022
361727d
Rename _attempt_checkpoint_download
Oct 13, 2022
fc8b370
Move makedirs out of function
Oct 13, 2022
833cc5f
return None for first autoresume run
Oct 13, 2022
4af7de8
Nits to remove else
Oct 13, 2022
f89d2c8
Merge branch 'dev' into auto_resumption
Oct 13, 2022
db9e24b
rerun tests
Oct 13, 2022
978ca89
Add debug statement
Oct 14, 2022
7e5153a
Broadcast the remote path to all ranks
Oct 14, 2022
1e72431
Add debug statement
Oct 14, 2022
8ad80ee
Adjust debug statement
Oct 14, 2022
cda6d2f
more logs
Oct 14, 2022
5c80159
not abspath for remote file name
Oct 14, 2022
50fb3f0
format debug
Oct 14, 2022
f91dda0
Change order of debug
Oct 14, 2022
ef7219d
Add barrier
Oct 14, 2022
3214e9a
Merge branch 'dev' into auto_resumption
Oct 14, 2022
3b20561
Add another log
Oct 14, 2022
7f09290
Merge branch 'dev' into auto_resumption
Oct 14, 2022
d0b5257
merge
Oct 14, 2022
0a56763
PR comments
Oct 14, 2022
552c737
Change concurrent uploads default to 1
Oct 15, 2022
986b60d
Undo, meant to push to a different branch
Oct 15, 2022
39fbcc0
Merge dev
Oct 15, 2022
29fcc6d
Change default concurrent uploads to 1
Oct 15, 2022
4 changes: 2 additions & 2 deletions composer/loggers/remote_uploader_downloader.py
@@ -177,7 +177,7 @@ class RemoteUploaderDownloader(LoggerDestination):

Default: ``'{remote_file_name}'``

num_concurrent_uploads (int, optional): Maximum number of concurrent uploads. Defaults to 4.
num_concurrent_uploads (int, optional): Maximum number of concurrent uploads. Defaults to 1.
upload_staging_folder (str, optional): A folder to use for staging uploads.
If not specified, defaults to using a :func:`~tempfile.TemporaryDirectory`.
use_procs (bool, optional): Whether to perform file uploads in background processes (as opposed to threads).
@@ -190,7 +190,7 @@ def __init__(self,
bucket_uri: str,
backend_kwargs: Optional[Dict[str, Any]] = None,
file_path_format_string: str = '{remote_file_name}',
num_concurrent_uploads: int = 4,
num_concurrent_uploads: int = 1,
upload_staging_folder: Optional[str] = None,
use_procs: bool = True,
num_attempts: int = 3) -> None:
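The default for ``num_concurrent_uploads`` drops from 4 to 1 in this file. A minimal sketch of opting back into parallel uploads by passing the argument explicitly (the bucket URI below is a hypothetical placeholder, not from this PR):

    from composer.loggers import RemoteUploaderDownloader

    # With this change the logger uploads one file at a time by default.
    # Passing num_concurrent_uploads explicitly restores the previous behavior.
    remote_ud = RemoteUploaderDownloader(
        bucket_uri='s3://my-checkpoint-bucket',  # hypothetical bucket
        num_concurrent_uploads=4,
    )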
99 changes: 76 additions & 23 deletions composer/trainer/trainer.py
@@ -1160,14 +1160,36 @@ def saved_checkpoints(self) -> List[str]:
return []
return self._checkpoint_saver.saved_checkpoints

def _try_checkpoint_download(self, latest_checkpoint_path: str, save_latest_remote_file_name: str,
loggers: Sequence[LoggerDestination], load_progress_bar: bool) -> None:
"""Attempts to download the checkpoint from the logger destinations."""
log.debug(
f'Trying to download {save_latest_remote_file_name} to {latest_checkpoint_path} on rank {dist.get_global_rank()}'
)
for logger in loggers:
try:
# Fetch from logger. If it succeeds, stop trying the rest of the loggers
get_file(
path=save_latest_remote_file_name,
destination=latest_checkpoint_path,
object_store=logger,
overwrite=True,
progress_bar=load_progress_bar,
)
break
except (NotImplementedError, FileNotFoundError):
log.info(f'Checkpoint not found in: {logger}')
# Ignore errors caused by no checkpoint saved with logger
pass

def _get_autoresume_checkpoint(
self,
save_folder: str,
save_latest_filename: str,
save_latest_remote_file_name: str,
loggers: Sequence[LoggerDestination],
load_progress_bar: bool,
):
) -> Optional[str]:
"""Determines the load path when using autoresume.

First, check the ``save_folder`` for the latest checkpoint.
@@ -1182,30 +1204,61 @@ def _get_autoresume_checkpoint(
save_latest_remote_file_name = format_name_with_dist(save_latest_remote_file_name, self.state.run_name)
latest_checkpoint_path = os.path.join(save_folder, save_latest_filename)

log.info(
f'Looking for autoresume checkpoint: {save_latest_remote_file_name} (remote), {latest_checkpoint_path} (local)'
)

# If latest checkpoint is not saved locally, try to fetch from loggers
if not os.path.exists(latest_checkpoint_path):
# Make save folder in case it doesn't exist so latest checkpoint can be downloaded
if not os.path.exists(latest_checkpoint_path) and (dist.get_global_rank() == 0 or self.deepspeed_enabled):
log.debug(f'Attempting to download the checkpoint on to rank {dist.get_global_rank()}')
os.makedirs(save_folder, exist_ok=True)
for logger in loggers:
try:
# Fetch from logger. If it succeeds, stop trying the rest of the loggers
get_file(
path=save_latest_remote_file_name,
destination=latest_checkpoint_path,
object_store=logger,
overwrite=True,
progress_bar=load_progress_bar,
)
break
except (NotImplementedError, FileNotFoundError):
# Ignore errors caused by no checkpoint saved with logger
pass
# Require all ranks to have local checkpoint if we wish to restore from it
latest_checkpoint_exists = self._device.tensor_to_device(
torch.tensor([os.path.exists(latest_checkpoint_path)], dtype=torch.uint8))
dist.all_reduce(latest_checkpoint_exists, reduce_operation='MIN')
# If latest checkpoint is saved locally, change load_path to it
if int(latest_checkpoint_exists.item()) == 1:
self._try_checkpoint_download(latest_checkpoint_path, save_latest_remote_file_name, loggers,
load_progress_bar)

# list of whether the checkpoint exists on each rank
latest_checkpoint_exists = dist.all_gather_object(os.path.exists(latest_checkpoint_path))

if self.deepspeed_enabled:
# Require all ranks to have their own local checkpoint if we wish to restore from it for deepspeed
if not all(latest_checkpoint_exists):
missing_ranks = [n for (n, exist) in enumerate(latest_checkpoint_exists) if not exist]
raise RuntimeError(f'Deepspeed was enabled, but checkpoints missing on ranks: {missing_ranks}')

return latest_checkpoint_path
else:
# The checkpoint must at least exist for rank zero
if not latest_checkpoint_exists[0]:
return None

# broadcast the local checkpoint path to all ranks
latest_checkpoint_path_list = [os.path.abspath(latest_checkpoint_path)]
dist.broadcast_object_list(latest_checkpoint_path_list, src=0)
latest_checkpoint_path = latest_checkpoint_path_list[0]

# broadcast the remote checkpoint path to all ranks
save_latest_remote_file_name_list = [save_latest_remote_file_name]
dist.broadcast_object_list(save_latest_remote_file_name_list, src=0)
save_latest_remote_file_name = save_latest_remote_file_name_list[0]

# download the checkpoint on local rank 0 of all nodes
if dist.get_local_rank() == 0 and not os.path.exists(latest_checkpoint_path):
log.debug(f'Attempting to download the checkpoint {save_latest_remote_file_name} on to all nodes')
os.makedirs(save_folder, exist_ok=True)
self._try_checkpoint_download(latest_checkpoint_path, save_latest_remote_file_name, loggers,
load_progress_bar)
dist.barrier()
# At this point the rank 0 filepath should exist on all ranks
latest_checkpoint_exists_on_all_ranks = self._device.tensor_to_device(
torch.tensor([os.path.exists(latest_checkpoint_path)], dtype=torch.uint8))
dist.all_reduce(latest_checkpoint_exists_on_all_ranks, reduce_operation='MIN')

log.debug(
f'Checkpoint {latest_checkpoint_path} exists on rank {dist.get_global_rank()}? {os.path.exists(latest_checkpoint_path)}'
)

if int(latest_checkpoint_exists_on_all_ranks.item()) == 0:
raise RuntimeError('Downloading the checkpoint to all nodes failed')

return latest_checkpoint_path

def fit(
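The non-deepspeed branch above reduces to a small coordination pattern: ask every rank whether the checkpoint exists, trust rank 0's answer for whether there is anything to resume, then broadcast rank 0's path so all ranks resolve the same file. A standalone sketch of that pattern using only the ``composer.utils.dist`` helpers that appear in the diff (the checkpoint path is a hypothetical placeholder):

    import os

    from composer.utils import dist

    latest_checkpoint_path = '/tmp/save_folder/latest-rank0.pt'  # hypothetical path

    # One entry per rank: does that rank already have the file locally?
    exists_per_rank = dist.all_gather_object(os.path.exists(latest_checkpoint_path))

    if not exists_per_rank[0]:
        # Rank 0 has no checkpoint, so this is the first autoresume run.
        load_path = None
    else:
        # Broadcast rank 0's local path so every rank looks for the same file.
        path_list = [os.path.abspath(latest_checkpoint_path)]
        dist.broadcast_object_list(path_list, src=0)
        load_path = path_list[0]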
4 changes: 4 additions & 0 deletions composer/utils/file_helpers.py
@@ -5,6 +5,7 @@

from __future__ import annotations

import logging
import os
import pathlib
import re
@@ -23,6 +24,8 @@
if TYPE_CHECKING:
from composer.loggers import LoggerDestination

log = logging.getLogger(__name__)

__all__ = [
'get_file',
'ensure_folder_is_empty',
@@ -358,6 +361,7 @@ def get_file(
# Read object name in the symlink
with open(symlink_file_name, 'r') as f:
real_path = f.read()
log.debug(f'Read path {real_path} from symlink file.')

# Recurse
return get_file(
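For context on the new debug line: the "symlink" that ``get_file`` follows here is a small plain-text file whose entire contents are the name of the real object, which the function then fetches recursively. A rough sketch of that convention, with hypothetical file names:

    # Writing side: the "symlink" is just a text file naming the real checkpoint object.
    with open('latest-rank0.pt.symlink', 'w') as f:
        f.write('checkpoints/ep10-rank0.pt')

    # Reading side, mirroring the snippet above: read the real path, then fetch it.
    with open('latest-rank0.pt.symlink', 'r') as f:
        real_path = f.read()
    print(f'Read path {real_path} from symlink file.')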
55 changes: 53 additions & 2 deletions docs/source/notes/resumption.rst
@@ -47,13 +47,33 @@ However, recovering from a failure here would still require manual intervention

Instead, our trainer supports the ``autoresume=True`` feature. With autoresume, the trainer will automatically check the ``save_folder`` for the latest checkpoints and resume training.

.. testsetup::

import os
import shutil

from composer import Trainer

trainer = Trainer(
model=model,
train_dataloader=train_dataloader,
max_duration="1ep",
save_filename="ep{epoch}.pt",
save_folder="./path/to/folder",
save_overwrite=True,
save_interval="1ep", # Save checkpoints every epoch
)
trainer.fit()
trainer.close()

.. testcode::

trainer = Trainer(
...,
autoresume=True,
save_folder='./path/to/folder',
run_name='my_cool_run',
max_duration='90ep'
)

With autoresume, users can re-submit the _same_ code to the training run, and the trainer will handle finding and resuming from the latest checkpoints. This works well with systems like Kubernetes that automatically resubmit the same job when there is a node failure (due to spot instances as well). For ``autoresume=True`` to work, we require that both a ``save_folder`` and a ``run_name`` be provided. These are used to search for existing checkpoints.
@@ -76,6 +96,34 @@ Example: Object Store

A typical use case is saving checkpoints to object store (e.g. S3) when there is no local file storage shared across runs. For example, a setup such as this:

.. testsetup::
:skipif: not _LIBCLOUD_INSTALLED

from composer.loggers import RemoteUploaderDownloader
from composer.utils.object_store import S3ObjectStore

# this assumes credentials are already configured via boto3
remote_uploader_downloader = RemoteUploaderDownloader(
bucket_uri=f"s3://checkpoint-debugging",
)

import os
import shutil

from composer import Trainer

trainer = Trainer(
model=model,
train_dataloader=train_dataloader,
max_duration="1ep",
save_filename="ep{epoch}.pt",
save_folder="checkpoints",
save_overwrite=True,
save_interval="1ep", # Save checkpoints every epoch
loggers=[remote_uploader_downloader],
)
trainer.fit()
trainer.close()

.. testcode::
:skipif: not _LIBCLOUD_INSTALLED
@@ -96,6 +144,7 @@ A typical use case is saving checkpoints to object store (e.g. S3) when there is
run_name='my_cool_run',
save_remote_file_name='checkpoints/ep{epoch}.pt',
loggers=[remote_uploader_downloader],
max_duration='90ep'
)

trainer.fit()
@@ -123,8 +172,9 @@ To run fine-tuning on a spot instance, ``load_path`` would be set to the origina
trainer = Trainer(
...,
save_filename='pretrained_weights/model.pt',
save_folder='.',
save_folder='checkpoints',
run_name='my_cool_run',
max_duration='1ep'
)

trainer.fit()
Expand All @@ -141,7 +191,8 @@ To run fine-tuning on a spot instance, ``load_path`` would be set to the origina
run_name='my_cool_run',
loggers=[
remote_uploader_downloader
]
],
max_duration='90ep'
)


11 changes: 4 additions & 7 deletions tests/trainer/test_checkpoint.py
@@ -26,6 +26,7 @@
from composer.utils import dist, is_tar
from composer.utils.checkpoint import glob_filter
from tests.common import RandomImageDataset, SimpleConvModel, deep_compare, device
from tests.common.markers import world_size


class DummyStatefulCallback(Callback):
@@ -255,16 +256,12 @@ def test_load_weights_object_store(self, tmp_path):
trainer_2.state.model,
)

@world_size(1, 2)
@device('cpu', 'gpu')
@pytest.mark.parametrize('use_object_store', [True, False])
@pytest.mark.parametrize('delete_local', [True, False])
def test_autoresume(
self,
device: str,
tmp_path: pathlib.Path,
use_object_store: bool,
delete_local: bool,
):
def test_autoresume(self, device: str, tmp_path: pathlib.Path, use_object_store: bool, delete_local: bool,
world_size: int):
if delete_local and not use_object_store:
pytest.skip('Invalid test setting.')

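The autoresume test now runs under both single- and multi-process settings via the ``world_size`` marker imported above. A bare-bones sketch of that parametrization pattern, based only on the decorator usage visible in this diff (the test body is a placeholder, not the real assertion):

    import pathlib

    import pytest

    from tests.common.markers import world_size


    @world_size(1, 2)
    @pytest.mark.parametrize('use_object_store', [True, False])
    @pytest.mark.parametrize('delete_local', [True, False])
    def test_autoresume_sketch(tmp_path: pathlib.Path, use_object_store: bool, delete_local: bool,
                               world_size: int):
        if delete_local and not use_object_store:
            pytest.skip('Invalid test setting.')
        # Placeholder: the real test builds a Trainer with autoresume=True and
        # verifies it resumes from the latest checkpoint under each world size.
        assert world_size in (1, 2)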