Source code for torchbearer.trial

import sys

if sys.version_info[0] < 3:
    import inspect
    def get_default(fcn, arg):
        a = inspect.getargspec(fcn)
        return dict(zip(a.args[-len(a.defaults):], a.defaults))[arg]
else:
    from inspect import signature
    def get_default(fcn, arg):
        return signature(fcn).parameters[arg].default

import functools
import warnings
import itertools

import torch
import torch.nn
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import Optimizer

import torchbearer
from torchbearer import cite
from torchbearer import State
from torchbearer.metrics import MetricList
from torchbearer.callbacks import Callback, CallbackList, Tqdm, AggregatePredictions
from torchbearer.bases import standard_closure, _forward_with_exceptions, _get_param_list

bibtex = """
@article{2018torchbearer,
  title={Torchbearer: A Model Fitting Library for PyTorch},
  author={Harris, Ethan and Painter, Matthew and Hare, Jonathon},
  journal={arXiv preprint arXiv:1809.03363},
  year={2018}
}
"""


class MockOptimizer(Optimizer):
    """The Mock Optimizer will be used inplace of an optimizer in the event that none is passed to the Trial class.
    """

    def __init__(self):
        super(MockOptimizer, self).__init__([torch.zeros(1)], [])

    def add_param_group(self, param_group):
        pass  # Do Nothing

    def load_state_dict(self, state_dict):
        pass  # Do Nothing

    def state_dict(self):
        return {}  # Return Empty

    def step(self, closure=None):
        if closure is not None:
            closure()

    def zero_grad(self):
        pass  # Do Nothing


class MockModel(torch.nn.Module):
    def forward(self, x, state=None):
        return None


class CallbackListInjection(CallbackList):
    """This class allows for an callback to be injected into a callback list, without masking the methods available for
    mutating the list. In this way, callbacks (such as printers) can be injected seamlessly into the methods of the
    trial class.

    Args:
        callback (Callback): The :class:`.Callback` to inject
        callback_list (CallbackList): The underlying :class:`.CallbackList`
    """

    def __init__(self, callback, callback_list):
        super(CallbackListInjection, self).__init__([])

        self.callback = callback
        self.callback_list = callback_list

    def state_dict(self):
        return self.callback_list.state_dict()

    def load_state_dict(self, state_dict):
        self.callback_list.load_state_dict(state_dict)
        return self

    def __iter__(self):
        return self.callback_list.__iter__()

    def __copy__(self):
        return self.callback_list.copy()

    def copy(self):
        return self.__copy__()

    def append(self, callback_list):
        self.callback_list.append(callback_list)

    def _for_list(self, function):
        function(self.callback)  # Call injected callback BEFORE the callback list
        function(self.callback_list)


def inject_printer(validation_label_letter='v'):
    """The inject printer decorator is used to inject the appropriate printer callback, according to the verbosity level.

    Args:
        validation_label_letter (str): The validation label letter to use

    Returns:
        A decorator
    """
    from inspect import getcallargs

    def decorator(func):
        root = func if not hasattr(func, 'root') else func.root
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            call_args = getcallargs(root, self, *args, **kwargs)
            verbose = call_args['verbose'] if 'verbose' in call_args else get_default(func, 'verbose')  # Populate default value
            verbose = self.verbose if verbose == -1 else verbose

            printer = get_printer(verbose=verbose, validation_label_letter=validation_label_letter)

            callback_list_old = self.state[torchbearer.CALLBACK_LIST]

            self.state[torchbearer.CALLBACK_LIST] = CallbackListInjection(printer, callback_list_old)

            res = func(self, *args, **kwargs)

            self.state[torchbearer.CALLBACK_LIST] = callback_list_old
            return res
        wrapper.root = root
        return wrapper
    return decorator


def get_printer(verbose, validation_label_letter):
    if verbose >= 2:
        printer = Tqdm(validation_label_letter=validation_label_letter)
    elif verbose >= 1:
        printer = Tqdm(validation_label_letter=validation_label_letter, on_epoch=True)
    else:
        printer = Callback()

    return printer


[docs] def deep_to(batch, device, dtype): """ Static method to call :func:`to` on tensors, tuples or dicts. All items will have :func:`deep_to` called Example: :: >>> import torch >>> from torchbearer import deep_to >>> example_dict = {'a': torch.ones(5)*2.1, 'b': torch.ones(1)*5.9} >>> deep_to(example_dict, device='cpu', dtype=torch.int) {'a': tensor([2, 2, 2, 2, 2], dtype=torch.int32), 'b': tensor([5], dtype=torch.int32)} Args: batch (tuple / list / torch.Tensor / dict): The mini-batch which requires a :func:`to` call device (torch.device): The desired device of the batch dtype (torch.dtype): The desired datatype of the batch Returns: tuple / list / torch.Tensor: The moved or casted batch """ is_tuple = isinstance(batch, tuple) if isinstance(batch, list) or isinstance(batch, tuple): batch = list(batch) for i in range(len(batch)): batch[i] = deep_to(batch[i], device, dtype) batch = tuple(batch) if is_tuple else batch elif isinstance(batch, dict): for key in batch: batch[key] = deep_to(batch[key], device, dtype) elif torch.is_tensor(batch): if batch.dtype.is_floating_point: batch = batch.to(device, dtype) else: batch = batch.to(device) return batch
[docs] def load_batch_infinite(loader): """ Wraps a batch loader and refreshes the iterator once it has been completed. Args: loader: batch loader to wrap """ def call(state): try: loader(state) except StopIteration: state[torchbearer.ITERATOR] = iter(state[torchbearer.GENERATOR]) loader(state) return call
[docs] def load_batch_standard(state): """ Load a standard (input data, target) tuple mini-batch from iterator into state Args: state (dict): The current state dict of the :class:`Trial`. """ state[torchbearer.X], state[torchbearer.Y_TRUE] = deep_to(next(state[torchbearer.ITERATOR]), state[torchbearer.DEVICE], state[torchbearer.DATA_TYPE])
[docs] def load_batch_none(state): """ Load a none (none, none) tuple mini-batch into state Args: state (dict): The current state dict of the :class:`Trial`. """ state[torchbearer.X], state[torchbearer.Y_TRUE] = None, None
[docs] def load_batch_predict(state): """ Load a prediction (input data, target) or (input data) mini-batch from iterator into state Args: state (dict): The current state dict of the :class:`Trial`. """ data = deep_to(next(state[torchbearer.ITERATOR]), state[torchbearer.DEVICE], state[torchbearer.DATA_TYPE]) if isinstance(data, list) or isinstance(data, tuple): try: state[torchbearer.X], state[torchbearer.Y_TRUE] = data except ValueError: state[torchbearer.X] = data[0] else: state[torchbearer.X] = data
def inject_sampler(data_key, batch_sampler): """ Decorator to inject a :class:`Sampler` into state[torchbearer.SAMPLER] along with the specified \ generator into state[torchbearer.GENERATOR] and number of steps into state[torchbearer.STEPS] Args: data_key (:class:`.StateKey`): Key for the data to inject batch_sampler (function): Batch sampler function that extracts batch from data loader, stores in state and sends data to correct device Returns: The decorator """ from inspect import getcallargs def decorator(func): root = func if not hasattr(func, 'root') else func.root def infinite_wrapper(self, key, generator, steps, sampler): if generator is not None and steps is not None: over_steps = steps > len(generator) inf_steps = steps == -1 inf_train_loader = key == torchbearer.TRAIN_DATA and self.state[torchbearer.INF_TRAIN_LOADING] if over_steps or inf_steps or inf_train_loader: # Want iterator to refresh at end not per epoch if steps == -1: warnings.warn("Trial is set to run indefinitely. " "Make sure you have some method to terminate safely.") sampler = load_batch_infinite(sampler) # Want iterator to run until end before refreshing regardless of number of train/val steps if inf_train_loader and not hasattr(generator, 'tb_iter'): generator.tb_iter = iter(generator) return generator, sampler @functools.wraps(func) def wrapper(self, *args, **kwargs): sampler = batch_sampler call_args = getcallargs(root, self, *args, **kwargs) key = call_args['data_key'] if ('data_key' in call_args and call_args['data_key'] is not None) else data_key # Populate default value generator, steps = self.state[key] if self.state[key] is not None else (None, None) if self.state[torchbearer.LOADER] is not None: sampler = self.state[torchbearer.LOADER] elif generator is None: sampler = load_batch_none generator, sampler = infinite_wrapper(self, key, generator, steps, sampler) self.state[torchbearer.DATA] = key self.state[torchbearer.SAMPLER] = sampler self.state[torchbearer.GENERATOR] = generator self.state[torchbearer.STEPS] = steps res = func(self, *args, **kwargs) return res wrapper.root = root return wrapper return decorator def inject_callback(callback): """ Decorator to inject a callback into the callback list and remove the callback after the decorated function has executed Args: callback (Callback): :class:`.Callback` to be injected Returns: The decorator """ def decorator(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): callback_list_old = self.state[torchbearer.CALLBACK_LIST] self.state[torchbearer.CALLBACK_LIST] = CallbackListInjection(callback, callback_list_old) res = func(self, *args, **kwargs) self.state[torchbearer.CALLBACK_LIST] = callback_list_old return res return wrapper return decorator
[docs] def update_device_and_dtype(state, *args, **kwargs): """Function gets data type and device values from the args / kwargs and updates state. Args: state (State): The :class:`.State` to update args: Arguments to the :func:`Trial.to` function kwargs: Keyword arguments to the :func:`Trial.to` function Returns: state """ for key, _ in kwargs.items(): if key == str(torchbearer.DATA_TYPE): state[torchbearer.DATA_TYPE] = kwargs['dtype'] elif str(torchbearer.DEVICE) in kwargs: state[torchbearer.DEVICE] = kwargs['device'] for arg in args: if isinstance(arg, torch.dtype): state[torchbearer.DATA_TYPE] = arg else: state[torchbearer.DEVICE] = arg return state
[docs] @cite(bibtex) class Trial(object): """ The trial class contains all of the required hyper-parameters for model running in torchbearer and presents an API for model fitting, evaluating and predicting. Example: :: >>> import torch >>> from torchbearer import Trial # Example trial that attempts to minimise the output of a linear layer. # Makes use of a callback to input the random data at each batch and a loss that is the absolute value of the # linear layer output. Runs for 10 iterations and a single epoch. >>> model = torch.nn.Linear(2,1) >>> optimiser = torch.optim.Adam(model.parameters(), lr=3e-4) >>> @torchbearer.callbacks.on_sample ... def initial_data(state): ... state[torchbearer.X] = torch.rand(1, 2)*10 >>> def minimise_output_loss(y_pred, y_true): ... return torch.abs(y_pred) >>> trial = Trial(model, optimiser, minimise_output_loss, ['loss'], [initial_data]).for_steps(10).run(1) Args: model (torch.nn.Module): The base pytorch model optimizer (torch.optim.Optimizer): The optimizer used for pytorch model weight updates criterion (func / None): The final loss criterion that provides a loss value to the optimizer metrics (list): The list of :class:`torchbearer.Metric <.Metric>` instances to process during fitting callbacks (list): The list of :class:`torchbearer.Callback <.Callback>` instances to call during fitting verbose (int): Global verbosity .If 2: use tqdm on batch, If 1: use tqdm on epoch, If 0: display no training progress """ def __init__(self, model, optimizer=None, criterion=None, metrics=[], callbacks=[], verbose=2): if criterion is None: def criterion(_, __): return torch.zeros(1, device=self.state[torchbearer.DEVICE], dtype=self.state[torchbearer.DATA_TYPE], requires_grad=True) self.verbose = verbose self.closure = standard_closure() self.state = State() self.state.update({ torchbearer.MODEL: model if model is not None else MockModel(), torchbearer.CRITERION: criterion, torchbearer.OPTIMIZER: optimizer if optimizer is not None else MockOptimizer(), torchbearer.METRIC_LIST: MetricList(metrics), torchbearer.CALLBACK_LIST: CallbackList(callbacks), torchbearer.DEVICE: 'cpu', torchbearer.DATA_TYPE: torch.float32, torchbearer.SELF: self, torchbearer.HISTORY: [], torchbearer.BACKWARD_ARGS: {}, torchbearer.TRAIN_GENERATOR: None, torchbearer.VALIDATION_GENERATOR: None, torchbearer.TEST_GENERATOR: None, torchbearer.TRAIN_STEPS: None, torchbearer.VALIDATION_STEPS: None, torchbearer.TEST_STEPS: None, torchbearer.TRAIN_DATA: None, torchbearer.VALIDATION_DATA: None, torchbearer.TEST_DATA: None, torchbearer.INF_TRAIN_LOADING: False, torchbearer.LOADER: None }) self.state[torchbearer.CALLBACK_LIST].on_init(self.state) def __str__(self): def state_string(name, state_key): import math N = (50-len(name))/2 res = "-" * int(math.floor(N)) + " " + name.upper() + " " + "-" * int(math.ceil(N)) res = res + "-" if len(res) < 52 else res return res + "\n" + str(self.state[state_key]) + "\n\n" optim_str = state_string('Optimzer', torchbearer.OPTIMIZER) crit_str = state_string("Criterion", torchbearer.CRITERION) metrics_str = state_string("Metrics", torchbearer.METRIC_LIST) callbacks_str = state_string("Callbacks", torchbearer.CALLBACK_LIST) model_str = state_string("Model", torchbearer.MODEL) return optim_str + crit_str + metrics_str + callbacks_str + model_str def __repr__(self): return str(self) # Data addition
[docs] def for_train_steps(self, steps): """Run this trial for the given number of training steps. Note that the generator will output (None, None) if it has not been set. Useful for differentiable programming. Returns self so that methods can be chained for convenience. If steps is larger than dataset size then loader will be refreshed like if it was a new epoch. If steps is -1 then loader will be refreshed until stopped by STOP_TRAINING flag or similar. Example: :: # Simple trial that runs for 100 training iterations, in this case optimising nothing >>> from torchbearer import Trial >>> trial = Trial(None).for_train_steps(100) Args: steps (int): The number of training steps per epoch to run. Returns: Trial: self """ if not isinstance(steps, int): warnings.warn("Number of training steps is not an int, casting to int") steps = int(steps) self.state[torchbearer.TRAIN_STEPS] = steps self.state[torchbearer.TRAIN_DATA] = (self.state[torchbearer.TRAIN_GENERATOR], self.state[torchbearer.TRAIN_STEPS]) return self
[docs] def with_train_generator(self, generator, steps=None): """Use this trial with the given train generator. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 100 training iterations on the MNIST dataset >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> dataloader = DataLoader(MNIST('./data/', train=True)) >>> trial = Trial(None).with_train_generator(dataloader).for_steps(100).run(1) Args: generator: The train data generator to use during calls to :meth:`.run` steps (int): The number of steps per epoch to take when using this generator. Returns: Trial: self """ self.state[torchbearer.TRAIN_GENERATOR] = generator steps = self.state[torchbearer.TRAIN_STEPS] if steps is None else steps steps = len(generator) if steps is None else steps self.for_train_steps(steps) return self
[docs] def with_train_data(self, x, y, batch_size=1, shuffle=True, num_workers=1, steps=None): """Use this trial with the given train data. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 10 training iterations on some random data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> targets = torch.rand(10, 1) >>> trial = Trial(None).with_val_data(data, targets).for_steps(10).run(1) Args: x (torch.Tensor): The train x data to use during calls to :meth:`.run` y (torch.Tensor): The train labels to use during calls to :meth:`.run` batch_size (int): The size of each batch to sample from the data shuffle (bool): If True, then data will be shuffled each epoch num_workers (int): Number of worker threads to use in the data loader steps (int): The number of steps per epoch to take when using this data Returns: Trial: self """ dataset = TensorDataset(x, y) dataloader = DataLoader(dataset, batch_size, shuffle=shuffle, num_workers=num_workers) self.with_train_generator(dataloader, steps=steps) return self
[docs] def for_val_steps(self, steps): """Run this trial for the given number of validation steps. Note that the generator will output (None, None) if it has not been set. Useful for differentiable programming. Returns self so that methods can be chained for convenience. If steps larger than dataset size then loader will be refreshed like if it was a new epoch. If steps -1 then loader will be refreshed until stopped by STOP_TRAINING flag or similar. Example: :: # Simple trial that runs for 10 validation iterations on no data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> trial = Trial(None).for_val_steps(10).run(1) Args: steps (int): The number of validation steps per epoch to run Returns: Trial: self """ if not isinstance(steps, int): warnings.warn("Number of validation steps is not an int, casting to int") steps = int(steps) self.state[torchbearer.VALIDATION_STEPS] = steps self.state[torchbearer.VALIDATION_DATA] = (self.state[torchbearer.VALIDATION_GENERATOR], self.state[torchbearer.VALIDATION_STEPS]) return self
[docs] def with_val_generator(self, generator, steps=None): """Use this trial with the given validation generator. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 100 validation iterations on the MNIST dataset >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> dataloader = DataLoader(MNIST('./data/', train=False)) >>> trial = Trial(None).with_val_generator(dataloader).for_steps(100).run(1) Args: generator: The validation data generator to use during calls to :meth:`.run` and :meth:`.evaluate` steps (int): The number of steps per epoch to take when using this generator Returns: Trial: self """ self.state[torchbearer.VALIDATION_GENERATOR] = generator steps = self.state[torchbearer.VALIDATION_STEPS] if steps is None else steps steps = len(generator) if steps is None else steps self.for_val_steps(steps) return self
[docs] def with_val_data(self, x, y, batch_size=1, shuffle=True, num_workers=1, steps=None): """Use this trial with the given validation data. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 10 validation iterations on some random data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> targets = torch.rand(10, 1) >>> trial = Trial(None).with_val_data(data, targets).for_steps(10).run(1) Args: x (torch.Tensor): The validation x data to use during calls to :meth:`.run` and :meth:`.evaluate` y (torch.Tensor): The validation labels to use during calls to :meth:`.run` and :meth:`.evaluate` batch_size (int): The size of each batch to sample from the data shuffle (bool): If True, then data will be shuffled each epoch num_workers (int): Number of worker threads to use in the data loader steps (int): The number of steps per epoch to take when using this data Returns: Trial: self """ dataset = TensorDataset(x, y) dataloader = DataLoader(dataset, batch_size, shuffle=shuffle, num_workers=num_workers) self.with_val_generator(dataloader, steps=steps) return self
[docs] def for_test_steps(self, steps): """Run this trial for the given number of test steps. Note that the generator will output (None, None) if it has not been set. Useful for differentiable programming. Returns self so that methods can be chained for convenience. If steps larger than dataset size then loader will be refreshed like if it was a new epoch. If steps -1 then loader will be refreshed until stopped by STOP_TRAINING flag or similar. Example: :: # Simple trial that runs for 10 test iterations on some random data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> trial = Trial(None).with_test_data(data).for_test_steps(10).run(1) Args: steps (int): The number of test steps per epoch to run (when using :meth:`.predict`) Returns: Trial: self """ if not isinstance(steps, int): warnings.warn("Number of test steps is not an int, casting to int") steps = int(steps) self.state[torchbearer.TEST_STEPS] = steps self.state[torchbearer.TEST_DATA] = (self.state[torchbearer.TEST_GENERATOR], self.state[torchbearer.TEST_STEPS]) return self
[docs] def with_test_generator(self, generator, steps=None): """Use this trial with the given test generator. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 10 test iterations on no data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> trial = Trial(None).with_test_data(data).for_test_steps(10).run(1) Args: generator: The test data generator to use during calls to :meth:`.predict` steps (int): The number of steps per epoch to take when using this generator Returns: Trial: self """ self.state[torchbearer.TEST_GENERATOR] = generator steps = self.state[torchbearer.TEST_STEPS] if steps is None else steps steps = len(generator) if steps is None else steps self.for_test_steps(steps) return self
[docs] def with_test_data(self, x, batch_size=1, num_workers=1, steps=None): """Use this trial with the given test data. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 10 test iterations on some random data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> trial = Trial(None).with_test_data(data).for_test_steps(10).run(1) Args: x (torch.Tensor): The test x data to use during calls to :meth:`.predict` batch_size (int): The size of each batch to sample from the data num_workers (int): Number of worker threads to use in the data loader steps (int): The number of steps per epoch to take when using this data Returns: Trial: self """ dataset = TensorDataset(x) dataloader = DataLoader(dataset, batch_size, num_workers=num_workers) self.with_test_generator(dataloader, steps=steps) return self
[docs] def for_steps(self, train_steps=None, val_steps=None, test_steps=None): """Use this trial for the given number of train, val and test steps. Returns self so that methods can be chained for convenience. If steps larger than dataset size then loader will be refreshed like if it was a new epoch. If steps -1 then loader will be refreshed until stopped by STOP_TRAINING flag or similar. Example: :: # Simple trial that runs for 10 training, validation and test iterations on some random data >>> from torchbearer import Trial >>> train_data = torch.rand(10, 1) >>> val_data = torch.rand(10, 1) >>> test_data = torch.rand(10, 1) >>> trial = Trial(None).with_train_data(train_data).with_val_data(val_data).with_test_data(test_data) >>> trial.for_steps(10, 10, 10).run(1) Args: train_steps (int): The number of training steps per epoch to run val_steps (int): The number of validation steps per epoch to run test_steps (int): The number of test steps per epoch to run (when using :meth:`.predict`) Returns: Trial: self """ if train_steps is not None: self.for_train_steps(train_steps) if val_steps is not None: self.for_val_steps(val_steps) if test_steps is not None: self.for_test_steps(test_steps) return self
[docs] def with_generators(self, train_generator=None, val_generator=None, test_generator=None, train_steps=None, val_steps=None, test_steps=None): """Use this trial with the given generators. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 100 steps from a training and validation data generator >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> trainloader = DataLoader(MNIST('./data/', train=True)) >>> valloader = DataLoader(MNIST('./data/', train=False)) >>> trial = Trial(None).with_generators(trainloader, valloader, train_steps=100, val_steps=100).run(1) Args: train_generator: The training data generator to use during calls to :meth:`.run` val_generator: The validation data generator to use during calls to :meth:`.run` and :meth:`.evaluate` test_generator: The testing data generator to use during calls to :meth:`.predict` train_steps (int): The number of steps per epoch to take when using the training generator val_steps (int): The number of steps per epoch to take when using the validation generator test_steps (int): The number of steps per epoch to take when using the testing generator Returns: Trial: self """ if train_generator is not None: self.with_train_generator(train_generator, train_steps) if val_generator is not None: self.with_val_generator(val_generator, val_steps) if test_generator is not None: self.with_test_generator(test_generator, test_steps) return self
[docs] def with_data(self, x_train=None, y_train=None, x_val=None, y_val=None, x_test=None, batch_size=1, num_workers=1, train_steps=None, val_steps=None, test_steps=None, shuffle=True): """Use this trial with the given data. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs for 10 test iterations on some random data >>> from torchbearer import Trial >>> data = torch.rand(10, 1) >>> targets = torch.rand(10, 1) >>> test_data = torch.rand(10, 1) >>> trial = Trial(None).with_data(x_train=data, y_train=targets, x_test=test_data) >>> trial.for_test_steps(10).run(1) Args: x_train (torch.Tensor): The training data to use y_train (torch.Tensor): The training targets to use x_val (torch.Tensor): The validation data to use y_val (torch.Tensor): The validation targets to use x_test (torch.Tensor): The test data to use batch_size (int): Batch size to use in mini-batching num_workers (int): Number of workers to use for data loading and batching train_steps (int): Number of steps for each training pass val_steps (int): Number of steps for each validation pass test_steps (int): Number of steps for each test pass shuffle (bool): If True, shuffle training and validation data. Returns: Trial: self """ self.with_train_data(x_train, y_train, batch_size, shuffle, num_workers, train_steps) self.with_val_data(x_val, y_val, batch_size, shuffle, num_workers, val_steps) self.with_test_data(x_test, batch_size, num_workers, test_steps) return self
# Infinite steps and loading
[docs] def for_inf_train_steps(self): """Use this trial with an infinite number of training steps (until stopped via STOP_TRAINING flag or similar). Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs training data until stopped >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> trainloader = DataLoader(MNIST('./data/', train=True)) >>> trial = Trial(None).with_train_generator(trainloader).for_inf_train_steps() >>> trial.run(1) Returns: Trial: self """ self.for_train_steps(-1) return self
[docs] def for_inf_val_steps(self): """Use this trial with an infinite number of validation steps (until stopped via STOP_TRAINING flag or similar). Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs validation data until stopped >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> valloader = DataLoader(MNIST('./data/', train=False)) >>> trial = Trial(None).with_val_generator(valloader).for_inf_val_steps() >>> trial.run(1) Returns: Trial: self """ self.for_val_steps(-1) return self
[docs] def for_inf_test_steps(self): """Use this trial with an infinite number of test steps (until stopped via STOP_TRAINING flag or similar). Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs test data until stopped >>> from torchbearer import Trial >>> test_data = torch.rand(1000, 10) >>> trial = Trial(None).with_test_data(test_data).for_inf_test_steps() >>> trial.run(1) Returns: Trial: self """ self.for_test_steps(-1) return self
[docs] def for_inf_steps(self, train=True, val=True, test=True): """Use this trail with infinite steps. Returns self so that methods can be chained for convenience. Example: :: # Simple trial that runs training and test data until stopped >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> trainloader = DataLoader(MNIST('./data/', train=True)) >>> valloader = DataLoader(MNIST('./data/', train=False)) >>> trial = Trial(None).with_train_generator(trainloader).for_inf_steps(valloader) >>> trial.with_inf_test_loader(True, False, True).run(1) Args: train (bool): Use an infinite number of training steps val (bool): Use an infinite number of validation steps test (bool): Use an infinite number of test steps Returns: Trial: self """ if train: self.for_inf_train_steps() if val: self.for_inf_val_steps() if test: self.for_inf_test_steps() return self
[docs] def with_inf_train_loader(self): """Use this trial with a training iterator that refreshes when it finishes instead of each epoch. This allows for setting training steps less than the size of the generator and model will still be trained on all training samples if enough "epochs" are run. Example: :: # Simple trial that runs 10 epochs of 100 iterations of a training generator without reshuffling until all data has been seen >>> from torchbearer import Trial >>> from torchvision.datasets import MNIST >>> from torch.utils.data import DataLoader >>> trainloader = DataLoader(MNIST('./data/', train=True)) >>> trial = Trial(None).with_train_generator(trainloader).with_inf_train_loader() >>> trial.run(10) Returns: Trial: self: """ self.state[torchbearer.INF_TRAIN_LOADING] = True return self
# Customise training loop
[docs] def with_loader(self, batch_loader): """Use this trial with custom batch loader. Usually calls next on state[torchbearer.ITERATOR] and populates state[torchbearer.X] and state[torchbearer.Y_TRUE] Example: :: # Simple trial that runs with a custom loader function that populates X and Y_TRUE in state with random data >>> from torchbearer import Trial >>> def custom_loader(state): ... state[X], state[Y_TRUE] = torch.rand(5, 5), torch.rand(5, 5) >>> trial = Trial(None).with_loader(custom_loader) >>> trial.run(10) Args: batch_loader (function): Function of state that extracts data from data loader (stored under torchbearer.ITERATOR), stores it in state and sends it to the correct device Returns: Trial: self: """ self.state[torchbearer.LOADER] = batch_loader return self
[docs] def with_closure(self, closure): """Use this trial with custom closure Example: :: # Simple trial that runs with a custom closure >>> from torchbearer import Trial >>> def custom_closure(state): ... print(state[torchbearer.BATCH]) >>> trial = Trial(None).with_closure(custom_closure).for_steps(3) >>> _ = trial.run(1) 0 1 2 Args: closure (function): Function of state that defines the custom closure Returns: Trial: self: """ self.closure = closure return self
# Run
[docs] @inject_printer() def run(self, epochs=1, verbose=-1): r"""Run this trial for the given number of epochs, starting from the last trained epoch. Example: :: # Simple trial that runs with a custom closure >>> from torchbearer import Trial >>> trial = Trial(None).for_steps(100) >>> _ = trial.run(1) Args: epochs (int, optional): The number of epochs to run for verbose (int, optional): If 2: use tqdm on batch, If 1: use tqdm on epoch, If 0: display no training progress, If -1: Automatic State Requirements: - :attr:`torchbearer.state.MODEL`: Model should be callable and not none, set on Trial init Returns: list: The model history (list of tuple of steps summary and epoch metric dicts) """ state = State() state.update({ torchbearer.EPOCH: 0, torchbearer.MAX_EPOCHS: epochs, torchbearer.STOP_TRAINING: False, }) if self.state[torchbearer.MODEL] is None or not callable(self.state[torchbearer.MODEL]): warnings.warn('The Model is None or not callable which may cause issues if not deliberate') self.state[torchbearer.MODEL] = MockModel() state.update(self.state) # TODO: Swap this for something which makes `self.state` still mutable if state[torchbearer.TRAIN_GENERATOR] is not None \ or state[torchbearer.TRAIN_STEPS] is not None \ or state[torchbearer.VALIDATION_GENERATOR] is not None \ or state[torchbearer.VALIDATION_STEPS] is not None: state[torchbearer.CALLBACK_LIST].on_start(state) for state[torchbearer.EPOCH] in range(len(state[torchbearer.HISTORY]), state[torchbearer.MAX_EPOCHS]): state[torchbearer.CALLBACK_LIST].on_start_epoch(state) final_metrics = self._fit_pass(state)[torchbearer.METRICS] if state[torchbearer.STOP_TRAINING]: break final_metrics.update(self._validation_pass(state)) state[torchbearer.METRICS] = final_metrics state[torchbearer.CALLBACK_LIST].on_end_epoch(state) steps_summary = {str(torchbearer.TRAIN_STEPS): state[torchbearer.TRAIN_STEPS], str(torchbearer.VALIDATION_STEPS): state[torchbearer.VALIDATION_STEPS]} self.state[torchbearer.HISTORY].append(dict(state[torchbearer.METRICS], **steps_summary)) state[torchbearer.CALLBACK_LIST].on_checkpoint(state) if state[torchbearer.STOP_TRAINING]: break state[torchbearer.CALLBACK_LIST].on_end(state) return self.state[torchbearer.HISTORY]
@staticmethod def _new_iter(generator): if generator is None: return None if hasattr(generator, 'inf') and generator.inf: # Inf train loader deals with the iterator itself return generator.tb_iter else: return iter(generator) @inject_sampler(torchbearer.TRAIN_DATA, load_batch_standard) def _fit_pass(self, state): state.update(self.state) # TODO: Hack to make injection work, should be removed if `self.state` is mutable self.train() state[torchbearer.ITERATOR] = Trial._new_iter(state[torchbearer.GENERATOR]) state[torchbearer.METRIC_LIST].reset(state) state[torchbearer.METRICS] = {} state[torchbearer.STEPS] = 0 if state[torchbearer.STEPS] is None else state[torchbearer.STEPS] state[torchbearer.CALLBACK_LIST].on_start_training(state) for state[torchbearer.BATCH] in (range(state[torchbearer.STEPS]) if state[torchbearer.STEPS] != -1 else itertools.count()): state[torchbearer.SAMPLER](state) state[torchbearer.CALLBACK_LIST].on_sample(state) # Update parameters state[torchbearer.OPTIMIZER].step(lambda: self.closure(state)) state[torchbearer.METRICS] = state[torchbearer.METRIC_LIST].process(state.data) state[torchbearer.CALLBACK_LIST].on_step_training(state) if state[torchbearer.STOP_TRAINING]: break state[torchbearer.METRICS].update(state[torchbearer.METRIC_LIST].process_final(state.data)) state[torchbearer.CALLBACK_LIST].on_end_training(state) return state def _test_pass(self, state): with torch.no_grad(): state[torchbearer.ITERATOR] = Trial._new_iter(state[torchbearer.GENERATOR]) state[torchbearer.METRIC_LIST].reset(state) state[torchbearer.METRICS] = {} state[torchbearer.CALLBACK_LIST].on_start_validation(state) state[torchbearer.STEPS] = 0 if state[torchbearer.STEPS] is None else state[torchbearer.STEPS] for state[torchbearer.BATCH] in range(state[torchbearer.STEPS]): state[torchbearer.SAMPLER](state) state[torchbearer.CALLBACK_LIST].on_sample_validation(state) _forward_with_exceptions(torchbearer.X, torchbearer.MODEL, torchbearer.Y_PRED, state) state[torchbearer.CALLBACK_LIST].on_forward_validation(state) # Loss and metrics if torchbearer.Y_TRUE in state: # Loss Calculation try: state[torchbearer.LOSS] = state[torchbearer.CRITERION](state) except TypeError: loss_function_params = _get_param_list(state[torchbearer.Y_PRED]) + _get_param_list( state[torchbearer.Y_TRUE]) state[torchbearer.LOSS] = state[torchbearer.CRITERION](*loss_function_params) state[torchbearer.CALLBACK_LIST].on_criterion_validation(state) state[torchbearer.METRICS] = state[torchbearer.METRIC_LIST].process(state.data) state[torchbearer.CALLBACK_LIST].on_step_validation(state) if state[torchbearer.STOP_TRAINING]: break if torchbearer.Y_TRUE in state: state[torchbearer.METRICS].update(state[torchbearer.METRIC_LIST].process_final(state.data)) state[torchbearer.CALLBACK_LIST].on_end_validation(state) return state @inject_sampler(torchbearer.VALIDATION_DATA, load_batch_standard) def _validation_pass(self, state): state.update(self.state) # TODO: Hack to make injection work, should be removed if `self.state` is mutable if state[torchbearer.VALIDATION_GENERATOR] is not None or state[torchbearer.VALIDATION_STEPS] is not None: self.eval() self._test_pass(state) self.train() return state[torchbearer.METRICS]
[docs] @inject_sampler(torchbearer.VALIDATION_DATA, load_batch_standard) @inject_printer(validation_label_letter='e') def evaluate(self, verbose=-1, data_key=None): # Note: kwargs appear unused but are inspected in inject_sampler """Evaluate this trial on the validation data. Example: :: # Simple trial to evaluate on both validation and test data >>> from torchbearer import Trial >>> test_data = torch.rand(5, 5) >>> val_data = torch.rand(5, 5) >>> t = Trial(None).with_val_data(val_data).with_test_data(test_data) >>> t.evaluate(data_key=torchbearer.VALIDATION_DATA).evaluate(data_key=torchbearer.TEST_DATA) Args: verbose (int): If 2: use tqdm on batch, If 1: use tqdm on epoch, If 0: display no training progress, If -1: Automatic data_key (StateKey): Optional :class:`.StateKey` for the data to evaluate on. Default: torchbearer.VALIDATION_DATA Returns: dict: The final metric values """ state = State() state.update({ torchbearer.MAX_EPOCHS: 1, torchbearer.EPOCH: 0, torchbearer.STOP_TRAINING: False }) state.update(self.state) # TODO: Hack to make injection work, should be removed if `self.state` is mutable if state[torchbearer.GENERATOR] is not None or state[torchbearer.STEPS] is not None: state[torchbearer.CALLBACK_LIST].on_start(state) state[torchbearer.CALLBACK_LIST].on_start_epoch(state) self.eval() state = self._test_pass(state) state[torchbearer.CALLBACK_LIST].on_end_epoch(state) if len(self.state[torchbearer.HISTORY]) != 0: self.state[torchbearer.HISTORY][-1].update(state[torchbearer.METRICS]) state[torchbearer.CALLBACK_LIST].on_end(state) return state[torchbearer.METRICS] return {}
[docs] @inject_callback(AggregatePredictions()) @inject_sampler(torchbearer.TEST_DATA, load_batch_predict) @inject_printer(validation_label_letter='p') def predict(self, verbose=-1, data_key=None): # Note: kwargs appear unused but are inspected in inject_sampler """Determine predictions for this trial on the test data. Example: :: # Simple trial to predict on some validation and test data >>> from torchbearer import Trial >>> val_data = torch.rand(5, 5) >>> test_data = torch.rand(5, 5) >>> t = Trial(None).with_test_data(test_data) >>> test_predictions = t.predict(data_key=torchbearer.TEST_DATA) Args: verbose (int): If 2: use tqdm on batch, If 1: use tqdm on epoch, If 0: display no training progress, If -1: Automatic data_key (StateKey): Optional :class:`.StateKey` for the data to predict on. Default: torchbearer.TEST_DATA Returns: list: Model outputs as a list """ state = State() state.update({ torchbearer.MAX_EPOCHS: 1, torchbearer.EPOCH: 0, torchbearer.STOP_TRAINING: False }) state.update(self.state) # TODO: Hack to make injection work, should be removed if `self.state` is mutable if state[torchbearer.GENERATOR] is not None or state[torchbearer.STEPS] is not None: state[torchbearer.CALLBACK_LIST].on_start(state) state[torchbearer.CALLBACK_LIST].on_start_epoch(state) self.eval() res = self._test_pass(state)[torchbearer.FINAL_PREDICTIONS] state[torchbearer.CALLBACK_LIST].on_end_epoch(state) state[torchbearer.CALLBACK_LIST].on_end(state) return res return []
[docs] def replay(self, callbacks=None, verbose=2, one_batch=False): # TODO: Should we track if testing passes have happened? """ Replay the fit passes stored in history with given callbacks, useful when reloading a saved Trial. Note that only progress and metric information is populated in state during a replay. Example: :: >>> from torchbearer import Trial >>> state = torch.load('some_state.pt') >>> t = Trial(None).load_state_dict(state) >>> t.replay() Args: callbacks (list): List of callbacks to be run during the replay verbose (int): If 2: use tqdm on batch, If 1: use tqdm on epoch, If 0: display no training progress one_batch (bool): If True, only one batch per epoch is replayed. If False, all batches are replayed Returns: Trial: self """ if callbacks is None: callbacks = [] history = self.state[torchbearer.HISTORY] callbacks.append(get_printer(verbose=verbose, validation_label_letter='v')) callbacks = CallbackList(callbacks) state = State() state.update(self.state) state[torchbearer.STOP_TRAINING] = False state[torchbearer.MAX_EPOCHS] = len(history) callbacks.on_start(state) for i in range(len(history)): metrics = dict(history[i]) state[torchbearer.EPOCH] = i if not one_batch: state[torchbearer.TRAIN_STEPS], state[torchbearer.VALIDATION_STEPS] = metrics[str(torchbearer.TRAIN_STEPS)], metrics[str(torchbearer.VALIDATION_STEPS)] else: state[torchbearer.TRAIN_STEPS], state[torchbearer.VALIDATION_STEPS] =\ 1 if metrics[str(torchbearer.TRAIN_STEPS)] is not None else None,\ 1 if metrics[str(torchbearer.VALIDATION_STEPS)] is not None else None del metrics[str(torchbearer.TRAIN_STEPS)] del metrics[str(torchbearer.VALIDATION_STEPS)] state[torchbearer.METRICS] = metrics self._replay_pass(state, callbacks) callbacks.on_end(state) return self
def _replay_pass(self, state, callback_list): callback_list.on_start_epoch(state) all_metrics = state[torchbearer.METRICS] if state[torchbearer.TRAIN_STEPS] is not None: # Training pass state[torchbearer.STEPS] = state[torchbearer.TRAIN_STEPS] state[torchbearer.METRICS] = {key: all_metrics[key] for key in all_metrics.keys() if "val_" not in key} callback_list.on_start_training(state) for state[torchbearer.BATCH] in range(state[torchbearer.STEPS]): callback_list.on_sample(state) callback_list.on_forward(state) callback_list.on_criterion(state) callback_list.on_backward(state) callback_list.on_step_training(state) if state[torchbearer.STOP_TRAINING]: break callback_list.on_end_training(state) if state[torchbearer.VALIDATION_STEPS] is not None: # Validation pass if not state[torchbearer.STOP_TRAINING]: state[torchbearer.STEPS] = state[torchbearer.VALIDATION_STEPS] state[torchbearer.METRICS] = {key: all_metrics[key] for key in all_metrics.keys() if "val_" in key} callback_list.on_start_validation(state) for state[torchbearer.BATCH] in range(state[torchbearer.STEPS]): callback_list.on_sample_validation(state) callback_list.on_forward_validation(state) callback_list.on_criterion_validation(state) callback_list.on_step_validation(state) if state[torchbearer.STOP_TRAINING]: break callback_list.on_end_validation(state) state[torchbearer.METRICS] = all_metrics callback_list.on_end_epoch(state) return self # Device management
[docs] def train(self): """Set model and metrics to training mode. Example: :: >>> from torchbearer import Trial >>> t = Trial(None).train() Returns: Trial: self """ self.state[torchbearer.MODEL].train() self.state[torchbearer.METRIC_LIST].train() return self
[docs] def eval(self): """Set model and metrics to evaluation mode Example: :: >>> from torchbearer import Trial >>> t = Trial(None).eval() Returns: Trial: self """ self.state[torchbearer.MODEL].eval() if torchbearer.DATA in self.state: self.state[torchbearer.METRIC_LIST].eval(data_key=self.state[torchbearer.DATA]) else: self.state[torchbearer.METRIC_LIST].eval() return self
[docs] def to(self, *args, **kwargs): """ Moves and/or casts the parameters and buffers. Example: :: >>> from torchbearer import Trial >>> t = Trial(None).to('cuda:1') Args: args: See: `torch.nn.Module.to <https://pytorch.org/docs/stable/nn.html?highlight=#torch.nn.Module.to>`_ kwargs: See: `torch.nn.Module.to <https://pytorch.org/docs/stable/nn.html?highlight=#torch.nn.Module.to>`_ Returns: Trial: self """ self.state[torchbearer.MODEL].to(*args, **kwargs) for state in self.state[torchbearer.OPTIMIZER].state.values(): for k, v in state.items(): if torch.is_tensor(v): state[k] = v.to(*args, **kwargs) self.state = update_device_and_dtype(self.state, *args, **kwargs) return self
[docs] def cuda(self, device=None): """ Moves all model parameters and buffers to the GPU. Example: :: >>> from torchbearer import Trial >>> t = Trial(None).cuda() Args: device (int): if specified, all parameters will be copied to that device Returns: Trial: self """ if device is None: device = torch.cuda.current_device() self.to('cuda:' + str(device)) return self
[docs] def cpu(self): """ Moves all model parameters and buffers to the CPU. Example: :: >>> from torchbearer import Trial >>> t = Trial(None).cpu() Returns: Trial: self """ self.to('cpu') return self
# States
[docs] def state_dict(self, **kwargs): """Get a dict containing the model and optimizer states, as well as the model history. Example: :: >>> from torchbearer import Trial >>> t = Trial(None) >>> state = t.state_dict() # State dict that can now be saved with torch.save Args: kwargs: See: `torch.nn.Module.state_dict <https://pytorch.org/docs/stable/nn.html?highlight=#torch.nn.Module.state_dict>`_ Returns: dict: A dict containing parameters and persistent buffers. """ state_dict = { torchbearer.VERSION: torchbearer.__version__.replace('.dev', ''), torchbearer.MODEL: self.state[torchbearer.MODEL].state_dict(**kwargs), torchbearer.OPTIMIZER: self.state[torchbearer.OPTIMIZER].state_dict(), torchbearer.HISTORY: self.state[torchbearer.HISTORY], torchbearer.CALLBACK_LIST: self.state[torchbearer.CALLBACK_LIST].state_dict() } return state_dict
[docs] def load_state_dict(self, state_dict, resume=True, **kwargs): """Resume this trial from the given state. Expects that this trial was constructed in the same way. Optionally, just load the model state when resume=False. Example: :: >>> from torchbearer import Trial >>> t = Trial(None) >>> state = torch.load('some_state.pt') >>> t.load_state_dict(state) Args: state_dict (dict): The state dict to reload resume (bool): If True, resume from the given state. Else, just load in the model weights. kwargs: See: `torch.nn.Module.load_state_dict <https://pytorch.org/docs/stable/nn.html?highlight=#torch.nn.Module.load_state_dict>`_ Returns: Trial: self """ if resume and torchbearer.MODEL in state_dict: # torchbearer dict if torchbearer.VERSION in state_dict and state_dict[torchbearer.VERSION] != torchbearer.__version__.replace('.dev', ''): warnings.warn('This state dict was saved with a different torchbearer version, loading available keys. Consider setting resume=False') if torchbearer.MODEL in state_dict: self.state[torchbearer.MODEL].load_state_dict(state_dict[torchbearer.MODEL], **kwargs) if torchbearer.OPTIMIZER in state_dict: self.state[torchbearer.OPTIMIZER].load_state_dict(state_dict[torchbearer.OPTIMIZER]) if torchbearer.HISTORY in state_dict: self.state[torchbearer.HISTORY] = state_dict[torchbearer.HISTORY] if torchbearer.CALLBACK_LIST in state_dict: self.state[torchbearer.CALLBACK_LIST].load_state_dict(state_dict[torchbearer.CALLBACK_LIST]) elif torchbearer.MODEL in state_dict: self.state[torchbearer.MODEL].load_state_dict(state_dict[torchbearer.MODEL], **kwargs) else: # something else warnings.warn('Not a torchbearer state dict, passing to model') self.state[torchbearer.MODEL].load_state_dict(state_dict, **kwargs) return self