#
# Copyright (C) 2018 Neal Digre.
#
# This software may be modified and distributed under the terms
# of the MIT license. See the LICENSE file for details.
"""Base task class.
Todo:
* Get rid of ``model_fn`` dependency on ``input_fn``.
* LONG TERM: Training methods other than TensorFlow Estimator.
"""
import abc
import os
import re
import tensorflow as tf
from tensorflow.contrib.training import GreedyLoadBalancingStrategy
from carpedm.data.meta import MetaLoader
from carpedm.data.ops import shard_batch
from carpedm.nn.util import TOWER_NAME
from carpedm.util.train import config_optimizer
from carpedm.util.train import local_device_setter, make_hooks
from carpedm.util.train import compute_global_grads_loss, group_train_op
# Special tokens
GO_TOKEN = "<GO>"
END_TOKEN = "<END>"
PAD_TOKEN = "<PAD>"
UNK_TOKEN = "<UNK>"
class Task(object):
"""Abstract class for Tasks."""
__metaclass__ = abc.ABCMeta
[docs] def __init__(self, data_dir, task_dir, test_split='hnsd00000',
dev_split=0.1, dev_factor=1, dataset_format='tfrecords',
num_shards=8, num_threads=8, shape_store=None, shape_in=None,
vocab_size=None, min_frequency=0, seed=None, **kwargs):
"""Initializer.
Args:
data_dir (str): Directory where raw data is stored.
task_dir (str): Top-level directory for storing tasks data
and results.
test_split (float or str): Either the ratio of all data
to use for testing or specific bibliography ID(s). Use
comma-separated IDs for multiple books.
dev_split (float or str): Either the ratio of training data
to use for dev/val or specific bibliography ID(s). Use
comma-separated IDs for multiple books.
dev_factor: (int): Size of development set should be
divisible by this value. Useful for training on
multiple GPUs.
dataset_format (str): Base storage unit for the dataset.
vocab_size (int): Maximum vocab size.
min_frequency (int): Minimum frequency of type to be
included in vocab.
shape_store (tuple or None): Size to which images are resized
for storage, if needed, e.g. for TFRecords. The default
is to not perform any resize. Please see this `note on
image shape`_ for more information.
shape_in (tuple or None): Size to which images are resized
by interpolation or padding before being input to a
model. Please see this `note on image shape`_ for
more information.
num_shards (int): Number of sharded output files.
num_threads (int): Number of threads to run in parallel.
seed (int or None): Number for seeding rng.
**kwargs: Unused arguments.
"""
self._task_dir = task_dir
self._test_split = test_split
self._dev_split = dev_split
self._dataset_format = dataset_format
self._num_shards = num_shards
self._num_threads = num_threads
self._shape_store = shape_store
self._shape_in = shape_in
self.job_id = "_"
self._meta = MetaLoader(
data_dir=data_dir,
test_split=self._test_split, dev_split=self._dev_split,
dev_factor=dev_factor, vocab_size=vocab_size,
min_freq=min_frequency, reserved=self.reserved,
charset=self.character_set, image_scope=self.image_scope,
seq_len=self.sequence_length, seq_maxlen=self.max_sequence_length,
seed=seed)
# ====================== BEGIN TASK INTERFACE ==================== #
    @abc.abstractmethod
    def results(self, loss, tower_features, tower_preds, tower_targets,
                is_training):
        """Accumulates predictions, computes metrics, and determines
        the tensors to log and/or visualize.

        Must be implemented in a subclass. Called once per step from
        ``model_fn`` after per-tower losses are consolidated.

        Args:
            loss (tf.float): Global loss.
            tower_features (list of dict): Tower feature dicts.
            tower_preds (list): Tower predictions.
            tower_targets (list of tf.Tensor): Tower targets.
            is_training (bool): The model is training.

        Returns:
            dict: The tensors to log.
            dict: All predictions.
            dict: Evaluation metrics.
        """
    @abc.abstractmethod
    def loss_fn(self, features, model_output, targets, is_training):
        """Computes an appropriate loss for the task.

        Must be implemented in a subclass. The result is reduced with
        ``tf.reduce_mean`` in ``model_fn``, so per-example losses are
        expected.

        Args:
            features (dict): Additional features for computing loss.
            model_output (tf.Tensor or dict of tf.Tensor): Model output
                used for computing the batch loss, e.g. class logits.
            targets (tf.Tensor): Ground truth targets.
            is_training (bool): The model is training.

        Returns:
            tf.Tensor: Losses of type 'int32' and shape [batch_size, 1]
        """
    @abc.abstractmethod
    def regularization(self, hparams):
        """Computes a regularization term for the task.

        Must be implemented in a subclass.

        Args:
            hparams: Hyperparameters, e.g. weight_decay.

        Returns:
            A scalar value that is added to the per-tower loss in
            ``model_fn`` (see ``_tower_fn``).
        """
@property
def sequence_length(self):
"""If max_sequence_length is None, this gives the deterministic
length of a sequence, else the minimum sequence length.
Only used if ``image_scope == 'seq'``.
Returns:
int or None:
"""
return None
@property
def max_sequence_length(self):
"""Maximum sequence length.
Only used if ``image_scope == 'seq'``.
Returns:
int or None:
"""
return None
@property
def character_set(self):
"""The Japanese characters (e.g. kana, kanji) of interest.
Returned value may include the following component sets:
* hiragana
* katakana
* kana
* kanji
* punct (punctuation)
* misc
Returns:
str: ID for the character set
"""
return 'all'
@property
def reserved(self):
"""Reserved tokens for the tasks.
The index of each token in the returned tuple will be used as
its integer ID.
Returns:
tuple: The reserved characters
"""
return PAD_TOKEN, UNK_TOKEN
@property
def num_classes(self):
"""Total number of output nodes, includes reserved tokens."""
return self._meta.vocab.get_num_classes()
    @property
    @abc.abstractmethod
    def target(self):
        """Determines the value against which predictions are compared.

        For a list of possible targets, refer to
        ``carpedm.data.util.ImageMeta.generate_features``.

        Returns:
            str: Feature key for the target.
        """
    @property
    @abc.abstractmethod
    def image_scope(self):
        """Portion of the original image used for each example.

        Available scopes are 'char', 'seq', 'line', 'page'.

        Returns:
            str: Task image scope.
        """
    @property
    @abc.abstractmethod
    def chunk(self):
        """Whether to split original images into chunks.

        When creating a dataset, instead of using the original image,
        extract non-overlapping chunks of size `image_shape` and the
        corresponding features from the original image on a regular
        grid. The original image is padded to divide evenly by
        `image_shape`.

        Note: currently only objects that are entirely contained in
        a chunk will have their features propagated.

        Returns:
            bool: Use chunks.
        """
    @property
    @abc.abstractmethod
    def character(self):
        """When creating a dataset, tell the meta_loader to generate
        character features, e.g. label, bbox.

        Returns:
            bool: Use character features.
        """
    @property
    @abc.abstractmethod
    def line(self):
        """When creating a dataset, tell the meta_loader to generate
        line features, e.g. bbox.

        Returns:
            bool: Use line features.
        """
    @property
    @abc.abstractmethod
    def label(self):
        """When creating a dataset, generate character labels.

        Returns:
            bool: Use character labels.
        """
    @property
    @abc.abstractmethod
    def bbox(self):
        """When creating a dataset, generate appropriate bounding boxes
        for the task (determined by e.g. ``self.character``,
        ``self.line``).

        Returns:
            bool: Use bounding boxes.
        """
    @property
    @abc.abstractmethod
    def sparse_labels(self):
        """Generate labels as a SparseTensor, e.g. for CTC loss.

        Returns:
            bool: Use sparse labels.
        """
# ====================== END TASK INTERFACE ====================== #
@property
def task_data_dir(self):
"""Directory where tasks data is stored.
Returns:
str
"""
shape = re.sub(
r'([,])', '_', re.sub(r'([() ])', '', str(self._shape_store)))
data_split = "test={}_dev={}".format(self._test_split, self._dev_split)
dir_path = os.path.join(self._task_dir, self.task_id, 'data',
self._dataset_format, shape, data_split)
return dir_path
@property
def task_log_dir(self):
return os.path.join(self._task_dir, self.task_id, 'results')
@property
def task_id(self):
num_classes = self._meta.vocab.get_num_classes() - len(self.reserved)
return "{}_{}-{}".format(self.__class__.__name__,
self.character_set,
num_classes)
[docs] def model_fn(self, model, variable_strategy, num_gpus, num_workers,
devices=None):
""" Model function used by TensorFlow Estimator class.
Args:
model (pmjtc.models.generic.Model): The models to run.
variable_strategy (str): Where to locate variable
operations, either 'CPU' or 'GPU'.
num_gpus (int): Number of GPUs to use, if available.
devices (tuple): Specific devices to use. If provided,
overrides num_gpus.
num_workers (int): Parameter for distributed training.
Returns:
"""
if num_gpus == 0:
num_devices = 1
device_type = 'cpu'
else:
num_devices = num_gpus
device_type = 'gpu'
if not devices:
devices = range(num_devices)
def _model_fn(features, labels, mode, params):
is_training = (mode == tf.estimator.ModeKeys.TRAIN)
tower_features = features
tower_targets = labels
tower_losses = []
tower_gradvars = []
tower_preds = []
data_format = params.data_format
if not data_format:
if num_gpus == 0:
data_format = 'channels_last'
else:
data_format = 'channels_first'
if data_format != self._original_format:
if self._original_format == 'channels_last':
# Computation requires channels_first.
axes_order = [0, 3, 1, 2]
else:
# Computation requires channels_last.
axes_order = [0, 2, 3, 1]
else:
axes_order = None
for i in devices:
worker_device = '/{}:{}'.format(device_type, i)
if variable_strategy == 'CPU':
device_setter = local_device_setter(
worker_device=worker_device)
elif variable_strategy == 'GPU':
device_setter = local_device_setter(
ps_device_type='gpu',
worker_device=worker_device,
ps_strategy=GreedyLoadBalancingStrategy(
num_gpus, tf.contrib.training.byte_size_load_fn))
else:
raise ValueError("variable_strategy must be CPU or GPU.")
with tf.variable_scope(model.name, reuse=bool(i != 0)):
with tf.name_scope(TOWER_NAME + '_%d' % i) as name_scope:
with tf.device(device_setter):
loss, gradvars, preds = _tower_fn(
features=tower_features[i],
targets=tower_targets[i],
data_format=data_format,
axes_order=axes_order,
is_training=is_training,
params=params)
tower_losses.append(loss)
tower_gradvars.append(gradvars)
tower_preds.append(preds)
if i == 0:
update_ops = tf.get_collection(
tf.GraphKeys.UPDATE_OPS, name_scope)
# Device that runs the ops to apply global gradient updates.
if variable_strategy == 'GPU':
consolidation_device = '/gpu:0'
else:
consolidation_device = '/cpu:0'
with tf.device(consolidation_device):
gradvars, loss = compute_global_grads_loss(tower_gradvars,
tower_losses)
optimizer = config_optimizer(params)
train_op = group_train_op(optimizer, gradvars, update_ops)
tensors_to_log, predictions, metrics = self.results(
loss, tower_features, tower_preds, tower_targets,
is_training)
train_hooks = make_hooks(tensors_to_log, optimizer,
num_workers, params)
return tf.estimator.EstimatorSpec(
mode=mode,
predictions=predictions,
loss=loss,
train_op=train_op,
training_hooks=train_hooks,
eval_metric_ops=metrics)
def _tower_fn(features, targets, data_format, axes_order, is_training,
params):
preds = model.forward_pass(
features, data_format, axes_order, is_training)
if params.init_dir:
if os.path.exists(params.init_dir):
variable_mapping = model.initialize_pretrained(
params.init_dir)
# First initialization only
if not os.path.exists(
os.path.join(self.task_log_dir, self.job_id)):
tf.train.init_from_checkpoint(params.init_dir,
variable_mapping)
else:
print("Initialization directory %s does not exist."
% params.init_dir, "Using default initialization.")
loss = tf.reduce_mean(
self.loss_fn(features, preds, targets, is_training))
loss += self.regularization(params)
# gradient
model_params = tf.trainable_variables()
gradient = tf.gradients(loss, model_params)
if params.gradient_clipping:
cc = params.gradient_clipping
gradient = [tf.clip_by_value(grad, -cc, cc)
for grad in gradient]
gradvars = zip(gradient, model_params)
return loss, gradvars, preds
return _model_fn