Source code for carpedm.tasks.generic

#
# Copyright (C) 2018 Neal Digre.
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE file for details.

"""Base task class.

Todo:
    * Get rid of ``model_fn`` dependency on ``input_fn``.
    * LONG TERM: Training methods other than TensorFlow Estimator.
"""
import abc
import os
import re

import tensorflow as tf
from tensorflow.contrib.training import GreedyLoadBalancingStrategy

from carpedm.data.meta import MetaLoader
from carpedm.data.ops import shard_batch
from carpedm.nn.util import TOWER_NAME
from carpedm.util.train import config_optimizer
from carpedm.util.train import local_device_setter, make_hooks
from carpedm.util.train import compute_global_grads_loss, group_train_op


# Special tokens
GO_TOKEN = "<GO>"
END_TOKEN = "<END>"
PAD_TOKEN = "<PAD>"
UNK_TOKEN = "<UNK>"


[docs]class Task(object): """Abstract class for Tasks.""" __metaclass__ = abc.ABCMeta
[docs] def __init__(self, data_dir, task_dir, test_split='hnsd00000', dev_split=0.1, dev_factor=1, dataset_format='tfrecords', num_shards=8, num_threads=8, shape_store=None, shape_in=None, vocab_size=None, min_frequency=0, seed=None, **kwargs): """Initializer. Args: data_dir (str): Directory where raw data is stored. task_dir (str): Top-level directory for storing tasks data and results. test_split (float or str): Either the ratio of all data to use for testing or specific bibliography ID(s). Use comma-separated IDs for multiple books. dev_split (float or str): Either the ratio of training data to use for dev/val or specific bibliography ID(s). Use comma-separated IDs for multiple books. dev_factor: (int): Size of development set should be divisible by this value. Useful for training on multiple GPUs. dataset_format (str): Base storage unit for the dataset. vocab_size (int): Maximum vocab size. min_frequency (int): Minimum frequency of type to be included in vocab. shape_store (tuple or None): Size to which images are resized for storage, if needed, e.g. for TFRecords. The default is to not perform any resize. Please see this `note on image shape`_ for more information. shape_in (tuple or None): Size to which images are resized by interpolation or padding before being input to a model. Please see this `note on image shape`_ for more information. num_shards (int): Number of sharded output files. num_threads (int): Number of threads to run in parallel. seed (int or None): Number for seeding rng. **kwargs: Unused arguments. """ self._task_dir = task_dir self._test_split = test_split self._dev_split = dev_split self._dataset_format = dataset_format self._num_shards = num_shards self._num_threads = num_threads self._shape_store = shape_store self._shape_in = shape_in self.job_id = "_" self._meta = MetaLoader( data_dir=data_dir, test_split=self._test_split, dev_split=self._dev_split, dev_factor=dev_factor, vocab_size=vocab_size, min_freq=min_frequency, reserved=self.reserved, charset=self.character_set, image_scope=self.image_scope, seq_len=self.sequence_length, seq_maxlen=self.max_sequence_length, seed=seed)
# ====================== BEGIN TASK INTERFACE ==================== #
[docs] @abc.abstractmethod def results(self, loss, tower_features, tower_preds, tower_targets, is_training): """Accumulates predictions, computes metrics, and determines the tensors to log and/or visualize. Args: loss (tf.float): Global loss. tower_features (list of dict): Tower feature dicts. tower_preds (list): Tower predictions. tower_targets (list of tf.Tensor): Tower targets. is_training (bool): The model is training. Returns: dict: The tensors to log dict: All predictions dict: Evaluation metrics """
[docs] @abc.abstractmethod def loss_fn(self, features, model_output, targets, is_training): """Computes an appropriate loss for the tasks. Must be implemented in subclass. Args: features (dict): Additional features for computing loss. model_output (tf.Tensor or dict of tf.Tensor): Model output used for computing the batch loss, e.g. class logits. targets (tf.Tensor): Ground truth targets. is_training (bool): The model is training. Returns: tf.Tensor: Losses of type 'int32' and shape [batch_size, 1] """
[docs] @abc.abstractmethod def regularization(self, hparams): """ Args: hparams: Hyperparameters, e.g. weight_decay Returns: """
@property def sequence_length(self): """If max_sequence_length is None, this gives the deterministic length of a sequence, else the minimum sequence length. Only used if ``image_scope == 'seq'``. Returns: int or None: """ return None @property def max_sequence_length(self): """Maximum sequence length. Only used if ``image_scope == 'seq'``. Returns: int or None: """ return None @property def character_set(self): """The Japanese characters (e.g. kana, kanji) of interest. Returned value may include the following component sets: * hiragana * katakana * kana * kanji * punct (punctuation) * misc Returns: str: ID for the character set """ return 'all' @property def reserved(self): """Reserved tokens for the tasks. The index of each token in the returned tuple will be used as its integer ID. Returns: tuple: The reserved characters """ return PAD_TOKEN, UNK_TOKEN @property def num_classes(self): """Total number of output nodes, includes reserved tokens.""" return self._meta.vocab.get_num_classes() @property @abc.abstractmethod def target(self): """Determines the value against which predictions are compared. For a list of possible targets, refer to carpedm.data.util.ImageMeta.generate_features() Returns: str: feature key for the target """ @property @abc.abstractmethod def image_scope(self): """Portion of original image for each example. Available scopes are 'char', 'seq', 'line', 'page'. Returns: str: Task image scope """ @property @abc.abstractmethod def chunk(self): """When creating a dataset, instead of using the original image, extract non-overlapping chunks of size `image_shape` and the corresponding features from the original image on a regular grid. The original image is padded to divide evenly by `image_shape`. Note: currently only objects that are entirely contained in the block will have its features propagated. Returns: bool: """ @property @abc.abstractmethod def character(self): """When creating a dataset, tell the meta_loader to generate character features, e.g. label, bbox. Returns: bool: Use character features. """ @property @abc.abstractmethod def line(self): """When creating a dataset, tell the meta_loader to generate line features, e.g. bbox. Returns: bool: Use line features. """ @property @abc.abstractmethod def label(self): """When creating a dataset, generate character labels. Returns: bool: Use character labels """ @property @abc.abstractmethod def bbox(self): """When creating a dataset, generate appropriate bounding boxes for the tasks (determined by e.g. self.character, self.line). Returns: bool: Use bounding boxes. """ @property @abc.abstractmethod def sparse_labels(self): """Generate labels as a SparseTensor, e.g. for CTC loss. Returns: (bool): Use sparse labels. """ # ====================== END TASK INTERFACE ====================== # @property def task_data_dir(self): """Directory where tasks data is stored. Returns: str """ shape = re.sub( r'([,])', '_', re.sub(r'([() ])', '', str(self._shape_store))) data_split = "test={}_dev={}".format(self._test_split, self._dev_split) dir_path = os.path.join(self._task_dir, self.task_id, 'data', self._dataset_format, shape, data_split) return dir_path @property def task_log_dir(self): return os.path.join(self._task_dir, self.task_id, 'results') @property def task_id(self): num_classes = self._meta.vocab.get_num_classes() - len(self.reserved) return "{}_{}-{}".format(self.__class__.__name__, self.character_set, num_classes)
[docs] def input_fn(self, batch_size, subset, num_shards, overwrite=False): """Returns (sharded) batches of data. Args: batch_size (int): The batch_size subset (str): The subset to use. One of {train, dev, test}. num_shards (int): Number of data_shards to produce. overwrite (bool): Overwrite existing data. Returns: (list): Features of length num_shards. (list): Labels of length num_shards. """ dataset = self._meta.generate_dataset( out_dir=self.task_data_dir, subset=subset, target_id=self.target, num_shards=self._num_shards, num_threads=self._num_threads, format_store=self._dataset_format, shape_store=self._shape_store, shape_in=self._shape_in, chunk=self.chunk, character=self.character, line=self.line, label=self.label, bbox=self.bbox, overwrite=overwrite) self._original_format = dataset.format def _input_fn(): feature_batch, label_batch = dataset.make_batch(batch_size) if num_shards <= 1: # No GPU available or only 1 GPU. return [feature_batch], [label_batch] else: return shard_batch(feature_batch, label_batch, batch_size, num_shards) return _input_fn
[docs] def model_fn(self, model, variable_strategy, num_gpus, num_workers, devices=None): """ Model function used by TensorFlow Estimator class. Args: model (pmjtc.models.generic.Model): The models to run. variable_strategy (str): Where to locate variable operations, either 'CPU' or 'GPU'. num_gpus (int): Number of GPUs to use, if available. devices (tuple): Specific devices to use. If provided, overrides num_gpus. num_workers (int): Parameter for distributed training. Returns: """ if num_gpus == 0: num_devices = 1 device_type = 'cpu' else: num_devices = num_gpus device_type = 'gpu' if not devices: devices = range(num_devices) def _model_fn(features, labels, mode, params): is_training = (mode == tf.estimator.ModeKeys.TRAIN) tower_features = features tower_targets = labels tower_losses = [] tower_gradvars = [] tower_preds = [] data_format = params.data_format if not data_format: if num_gpus == 0: data_format = 'channels_last' else: data_format = 'channels_first' if data_format != self._original_format: if self._original_format == 'channels_last': # Computation requires channels_first. axes_order = [0, 3, 1, 2] else: # Computation requires channels_last. axes_order = [0, 2, 3, 1] else: axes_order = None for i in devices: worker_device = '/{}:{}'.format(device_type, i) if variable_strategy == 'CPU': device_setter = local_device_setter( worker_device=worker_device) elif variable_strategy == 'GPU': device_setter = local_device_setter( ps_device_type='gpu', worker_device=worker_device, ps_strategy=GreedyLoadBalancingStrategy( num_gpus, tf.contrib.training.byte_size_load_fn)) else: raise ValueError("variable_strategy must be CPU or GPU.") with tf.variable_scope(model.name, reuse=bool(i != 0)): with tf.name_scope(TOWER_NAME + '_%d' % i) as name_scope: with tf.device(device_setter): loss, gradvars, preds = _tower_fn( features=tower_features[i], targets=tower_targets[i], data_format=data_format, axes_order=axes_order, is_training=is_training, params=params) tower_losses.append(loss) tower_gradvars.append(gradvars) tower_preds.append(preds) if i == 0: update_ops = tf.get_collection( tf.GraphKeys.UPDATE_OPS, name_scope) # Device that runs the ops to apply global gradient updates. if variable_strategy == 'GPU': consolidation_device = '/gpu:0' else: consolidation_device = '/cpu:0' with tf.device(consolidation_device): gradvars, loss = compute_global_grads_loss(tower_gradvars, tower_losses) optimizer = config_optimizer(params) train_op = group_train_op(optimizer, gradvars, update_ops) tensors_to_log, predictions, metrics = self.results( loss, tower_features, tower_preds, tower_targets, is_training) train_hooks = make_hooks(tensors_to_log, optimizer, num_workers, params) return tf.estimator.EstimatorSpec( mode=mode, predictions=predictions, loss=loss, train_op=train_op, training_hooks=train_hooks, eval_metric_ops=metrics) def _tower_fn(features, targets, data_format, axes_order, is_training, params): preds = model.forward_pass( features, data_format, axes_order, is_training) if params.init_dir: if os.path.exists(params.init_dir): variable_mapping = model.initialize_pretrained( params.init_dir) # First initialization only if not os.path.exists( os.path.join(self.task_log_dir, self.job_id)): tf.train.init_from_checkpoint(params.init_dir, variable_mapping) else: print("Initialization directory %s does not exist." % params.init_dir, "Using default initialization.") loss = tf.reduce_mean( self.loss_fn(features, preds, targets, is_training)) loss += self.regularization(params) # gradient model_params = tf.trainable_variables() gradient = tf.gradients(loss, model_params) if params.gradient_clipping: cc = params.gradient_clipping gradient = [tf.clip_by_value(grad, -cc, cc) for grad in gradient] gradvars = zip(gradient, model_params) return loss, gradvars, preds return _model_fn