import os
import math
import importlib
import psutil
import numpy as np
import tensorflow as tf
import tensorflow.keras.optimizers as tf_optimizers
from typing import Tuple, Optional, Any, List, Generator
from tensorflow.keras.layers import (
Dense,
Input,
LSTM,
Conv1D,
MaxPooling1D,
Dropout,
Activation,
BatchNormalization,
Bidirectional,
)
from tensorflow.keras.callbacks import (
EarlyStopping,
ModelCheckpoint,
TensorBoard,
CSVLogger,
)
from tensorflow.keras.models import Model, load_model, save_model
from tensorflow.keras import backend as K
from .utils import Utils
from .logger import Logger
from .exception import TerminalException
from .hyperparameters import Hyperparameters
Utils.suppress_lib_logs()
[docs]class Network(object):
""" Network factory creates DNNs.
Not thread safe since the session of keras_backend is global.
Only factory methods are allowed when generating DNN objects.
"""
LSTM = "lstm"
BI_LSTM = "bi_lstm"
CONV_1D = "conv_1d"
TYPES = [LSTM, BI_LSTM, CONV_1D]
__secret = object()
__UNKNOWN = "unknown"
def __init__(
self,
secret: Optional[object],
input_shape: Tuple,
hyperparameters: Hyperparameters,
model_path: Optional[str] = None,
backend: str = "tensorflow"
) -> None:
""" Network object initialiser used by factory methods.
Arguments:
secret {object} -- A hash only known by factory methods.
input_shape {tuple} -- A shape tuple (integers), not including the batch size.
hyperparameters {Hyperparameters} -- A configuration for hyperparameters used for training.
model_path {string} -- The path to the model file.
backend {string} -- The tensor manipulation backend (default: {tensorflow}). Only tensorflow is supported
by TF 2 and this parameter is here only for a historical reason.
Raises:
NotImplementedError -- Thrown when any network attributes are modified.
"""
assert (
secret == Network.__secret
), "Only factory methods are supported when creating instances"
Network.__set_keras_backend(backend)
if (hyperparameters.network_type == Network.__UNKNOWN and model_path is not None):
self.__model = load_model(model_path)
self.__input_shape = self.__model.input_shape[1:]
elif hyperparameters.network_type == Network.LSTM:
self.__input_shape = input_shape
self.__model = self.__lstm(
input_shape, hyperparameters
)
elif hyperparameters.network_type == Network.BI_LSTM:
self.__input_shape = input_shape
self.__model = self.__lstm(
input_shape, hyperparameters, is_bidirectional=True
)
elif hyperparameters.network_type == Network.CONV_1D:
self.__input_shape = input_shape
self.__model = self.__conv1d(
input_shape, hyperparameters
)
else:
raise ValueError("Unknown network type. Should be one of %s", str(Network.TYPES))
self.__n_type = hyperparameters.network_type
self.hyperparameters = hyperparameters
self.__LOGGER = Logger().get_logger(__name__)
[docs] @classmethod
def get_network(cls, input_shape: Tuple, hyperparameters: Hyperparameters) -> "Network":
"""Factory method for creating a network.
Arguments:
input_shape {tuple} -- A shape tuple (integers), not including the batch size.
hyperparameters {Hyperparameters} -- A configuration for hyperparameters used for training.
Returns:
Network -- A constructed network object.
"""
return cls(
cls.__secret,
input_shape,
hyperparameters
)
[docs] @classmethod
def get_from_model(cls, model_path: str, hyperparameters: Hyperparameters) -> "Network":
"""Load model into a network object.
Arguments:
model_path {string} -- The path to the model file.
hyperparameters {Hyperparameters} -- A configuration for hyperparameters used for training.
"""
hp = hyperparameters.clone()
hp.network_type = Network.__UNKNOWN
return cls(
cls.__secret,
(),
hp,
model_path=model_path
)
[docs] @classmethod
def save_model_and_weights(
cls, model_filepath: str, weights_filepath: str, combined_filepath: str
) -> None:
"""Combine model and weights and save to a file
Arguments:
model_filepath {string} -- The path to the model file.
weights_filepath {string} -- The path to the weights file.
"""
model = load_model(model_filepath)
model.load_weights(weights_filepath)
model.save(combined_filepath)
[docs] @staticmethod
def load_model_and_weights(model_filepath: str, weights_filepath: str, hyperparameters: Hyperparameters) -> "Network":
"""Load weights to the Network model.
Arguments:
model_filepath {string} -- The model file path.
weights_filepath {string} -- The weights file path.
hyperparameters {Hyperparameters} -- A configuration for hyperparameters used for training.
Returns:
Network -- Reconstructed network object.
"""
network = Network.get_from_model(model_filepath, hyperparameters)
network.__model.load_weights(weights_filepath)
return network
@property
def input_shape(self) -> Tuple:
"""Get the input shape of the network.
Returns:
tuple -- The input shape of the network.
"""
return self.__input_shape
@property
def n_type(self) -> str:
"""Get the type of the network.
Returns:
string -- The type of the network.
"""
return self.__n_type
@property
def summary(self) -> None:
"""Print out the summary of the network.
"""
self.__model.summary()
@property
def layers(self) -> List[Any]:
"""Get the layers of the network.
Returns:
list -- The statck of layers contained by the network
"""
return self.__model.layers
[docs] def get_predictions(self, input_data: np.ndarray, weights_filepath: str) -> np.ndarray:
"""Get a Numpy array of predictions.
Arguments:
input_data {numpy.ndarray} -- The input data, as a Numpy array.
weights_filepath {string} -- The weights file path.
Returns:
numpy.ndarray -- The Numpy array of predictions.
"""
self.__model.load_weights(weights_filepath)
return self.__model.predict_on_batch(input_data)
[docs] def fit_and_get_history(
self,
train_data: np.ndarray,
labels: np.ndarray,
model_filepath: str,
weights_filepath: str,
logs_dir: str,
training_log: str,
resume: bool,
) -> Tuple[List[float], List[float]]:
"""Fit the training data to the network and save the network model as a HDF file.
Arguments:
train_data {numpy.array} -- The Numpy array of training data.
labels {numpy.array} -- The Numpy array of training labels.
model_filepath {string} -- The model file path.
weights_filepath {string} -- The weights file path.
logs_dir {string} -- The TensorBoard log file directory.
training_log {string} -- The path to the log file of epoch results.
resume {bool} -- True to continue with previous training result or False to start a new one (default: {False}).
Returns:
tuple -- A tuple contains validation losses and validation accuracies.
"""
csv_logger = (
CSVLogger(training_log)
if not resume
else CSVLogger(training_log, append=True)
)
checkpoint = ModelCheckpoint(
filepath=weights_filepath,
monitor=self.hyperparameters.monitor,
verbose=1,
save_best_only=False,
save_weights_only=True,
)
tensorboard = TensorBoard(
log_dir=logs_dir,
histogram_freq=0,
write_graph=True,
write_images=True,
)
earlyStopping = EarlyStopping(monitor=self.hyperparameters.monitor, min_delta=self.hyperparameters.es_min_delta,
mode=self.hyperparameters.es_mode, patience=self.hyperparameters.es_patience,
verbose=1)
callbacks_list = [
checkpoint,
tensorboard,
csv_logger,
earlyStopping,
]
if not resume:
Optimizer = getattr(tf_optimizers, self.hyperparameters.optimizer)
self.__model.compile(
loss=self.hyperparameters.loss,
optimizer=Optimizer(learning_rate=self.hyperparameters.learning_rate),
metrics=self.hyperparameters.metrics,
)
initial_epoch = 0
if resume:
assert os.path.isfile(training_log), "{} does not exist and is required by training resumption".format(
training_log)
training_log_file = open(training_log)
initial_epoch += sum(1 for _ in training_log_file) - 1
training_log_file.close()
assert self.hyperparameters.epochs > initial_epoch, \
"The existing model has been trained for {0} epochs. Make sure the total epochs are larger than {0}".format(initial_epoch)
try:
hist = self.__model.fit(
train_data,
labels,
epochs=self.hyperparameters.epochs,
batch_size=self.hyperparameters.batch_size,
shuffle=True,
validation_split=self.hyperparameters.validation_split,
verbose=1,
callbacks=callbacks_list,
initial_epoch=initial_epoch,
)
except KeyboardInterrupt:
self.__LOGGER.warning("Training interrupted by the user")
raise TerminalException("Training interrupted by the user")
finally:
save_model(self.__model, model_filepath)
self.__LOGGER.warning("Model saved to %s" % model_filepath)
return hist.history["val_loss"], hist.history["val_acc"] if int(tf.__version__.split(".")[0]) < 2 else hist.history["val_accuracy"]
[docs] def fit_with_generator(
self,
train_data_raw: np.ndarray,
labels_raw: np.ndarray,
model_filepath: str,
weights_filepath: str,
logs_dir: str,
training_log: str,
resume: bool,
) -> Tuple[List[float], List[float]]:
"""Fit the training data to the network and save the network model as a HDF file.
Arguments:
train_data_raw {list} -- The HDF5 raw training data.
labels_raw {list} -- The HDF5 raw training labels.
model_filepath {string} -- The model file path.
weights_filepath {string} -- The weights file path.
logs_dir {string} -- The TensorBoard log file directory.
training_log {string} -- The path to the log file of epoch results.
resume {bool} -- True to continue with previous training result or False to start a new one (default: {False}).
Returns:
tuple -- A tuple contains validation losses and validation accuracies.
"""
initial_epoch = 0
batch_size = self.hyperparameters.batch_size
validation_split = self.hyperparameters.validation_split
csv_logger = (
CSVLogger(training_log)
if not resume
else CSVLogger(training_log, append=True)
)
checkpoint = ModelCheckpoint(
filepath=weights_filepath,
monitor=self.hyperparameters.monitor,
verbose=1,
save_best_only=False,
save_weights_only=True,
)
tensorboard = TensorBoard(
log_dir=logs_dir,
histogram_freq=0,
write_graph=True,
write_images=True,
)
earlyStopping = EarlyStopping(monitor=self.hyperparameters.monitor, min_delta=self.hyperparameters.es_min_delta,
mode=self.hyperparameters.es_mode, patience=self.hyperparameters.es_patience,
verbose=1)
callbacks_list = [
checkpoint,
tensorboard,
csv_logger,
earlyStopping,
]
if not resume:
Optimizer = getattr(tf_optimizers, self.hyperparameters.optimizer)
self.__model.compile(
loss=self.hyperparameters.loss,
optimizer=Optimizer(learning_rate=self.hyperparameters.learning_rate),
metrics=self.hyperparameters.metrics,
)
if resume:
assert os.path.isfile(training_log), "{} does not exist and is required by training resumption".format(
training_log)
training_log_file = open(training_log)
initial_epoch += sum(1 for _ in training_log_file) - 1
training_log_file.close()
assert self.hyperparameters.epochs > initial_epoch, \
"The existing model has been trained for {0} epochs. Make sure the total epochs are larger than {0}".format(initial_epoch)
train_generator = self.__generator(train_data_raw, labels_raw, batch_size, validation_split, is_validation=False)
test_generator = self.__generator(train_data_raw, labels_raw, batch_size, validation_split, is_validation=True)
steps_per_epoch = math.ceil(float(train_data_raw.shape[0]) * (1 - validation_split) / batch_size)
validation_steps = math.ceil(float(train_data_raw.shape[0]) * validation_split / batch_size)
try:
hist = self.__model.fit(
train_generator,
steps_per_epoch=steps_per_epoch,
validation_data=test_generator,
validation_steps=validation_steps,
epochs=self.hyperparameters.epochs,
shuffle=False,
callbacks=callbacks_list,
initial_epoch=initial_epoch,
)
except KeyboardInterrupt:
self.__LOGGER.warning("Training interrupted by the user")
raise TerminalException("Training interrupted by the user")
finally:
self.__model.save(model_filepath)
self.__LOGGER.warning("Model saved to %s" % model_filepath)
return hist.history["val_loss"], hist.history["val_acc"] if int(tf.__version__.split(".")[0]) < 2 else hist.history["val_accuracy"]
[docs] @classmethod
def simple_fit(
cls,
input_shape: Tuple,
train_data: np.ndarray,
labels: np.ndarray,
hyperparameters: Hyperparameters,
) -> Tuple[List[float], List[float]]:
"""Fit the training data to the network and save the network model as a HDF file.
Arguments:
input_shape {tuple} -- A shape tuple (integers), not including the batch size.
train_data {numpy.array} -- The Numpy array of training data.
labels {numpy.array} -- The Numpy array of training labels.
hyperparameters {Hyperparameters} -- A configuration for hyperparameters used for training.
Returns:
tuple -- A tuple contains validation losses and validation accuracies.
"""
network = cls(cls.__secret, input_shape, hyperparameters)
Optimizer = getattr(tf_optimizers, hyperparameters.optimizer)
network.__model.compile(
loss=hyperparameters.loss,
optimizer=Optimizer(learning_rate=hyperparameters.learning_rate),
metrics=hyperparameters.metrics,
)
initial_epoch = 0
hist = network.__model.fit(
train_data,
labels,
epochs=hyperparameters.epochs,
batch_size=hyperparameters.batch_size,
shuffle=True,
validation_split=hyperparameters.validation_split,
verbose=1,
initial_epoch=initial_epoch,
)
return hist.history["val_loss"], hist.history["val_acc"] if int(tf.__version__.split(".")[0]) < 2 else hist.history["val_accuracy"]
[docs] @classmethod
def simple_fit_with_generator(
cls,
input_shape: Tuple,
train_data_raw: np.ndarray,
labels_raw: np.ndarray,
hyperparameters: Hyperparameters,
) -> Tuple[List[float], List[float]]:
"""Fit the training data to the network and save the network model as a HDF file.
Arguments:
input_shape {tuple} -- A shape tuple (integers), not including the batch size.
train_data_raw {list} -- The HDF5 raw training data.
labels_raw {list} -- The HDF5 raw training labels.
hyperparameters {Hyperparameters} -- A configuration for hyperparameters used for training.
Returns:
tuple -- A tuple contains validation losses and validation accuracies.
"""
network = cls(cls.__secret, input_shape, hyperparameters)
initial_epoch = 0
batch_size = hyperparameters.batch_size
validation_split = hyperparameters.validation_split
Optimizer = getattr(tf_optimizers, hyperparameters.optimizer)
network.__model.compile(
loss=hyperparameters.loss,
optimizer=Optimizer(learning_rate=hyperparameters.learning_rate),
metrics=hyperparameters.metrics,
)
train_generator = cls.__generator(train_data_raw, labels_raw, batch_size, validation_split, is_validation=False)
test_generator = cls.__generator(train_data_raw, labels_raw, batch_size, validation_split, is_validation=True)
steps_per_epoch = math.ceil(float(train_data_raw.shape[0]) * (1 - validation_split) / batch_size)
validation_steps = math.ceil(float(train_data_raw.shape[0]) * validation_split / batch_size)
hist = network.__model.fit(
train_generator,
steps_per_epoch=steps_per_epoch,
validation_data=test_generator,
validation_steps=validation_steps,
epochs=hyperparameters.epochs,
shuffle=False,
initial_epoch=initial_epoch,
)
return hist.history["val_loss"], hist.history["val_acc"] if int(tf.__version__.split(".")[0]) < 2 else hist.history["val_accuracy"]
[docs] @staticmethod
def reset() -> None:
K.clear_session()
@staticmethod
def __lstm(input_shape: Tuple, hyperparameters: Hyperparameters, is_bidirectional: bool = False) -> Model:
inputs = Input(shape=input_shape)
hidden = BatchNormalization()(inputs)
for nodes in hyperparameters.front_hidden_size:
hidden = Bidirectional(LSTM(nodes))(hidden) if is_bidirectional else LSTM(nodes)(hidden)
hidden = BatchNormalization()(hidden)
hidden = Activation("relu")(hidden)
hidden = Dropout(hyperparameters.dropout)(hidden)
for nodes in hyperparameters.back_hidden_size:
hidden = Dense(nodes)(hidden)
hidden = BatchNormalization()(hidden)
hidden = Activation("relu")(hidden)
hidden = Dropout(hyperparameters.dropout)(hidden)
hidden = Dense(1)(hidden)
outputs = Activation("sigmoid")(hidden)
return Model(inputs, outputs)
@staticmethod
def __conv1d(input_shape: Tuple, hyperparameters: Hyperparameters) -> Model:
inputs = Input(shape=input_shape)
hidden = BatchNormalization()(inputs)
for nodes in hyperparameters.front_hidden_size:
hidden = Conv1D(filters=nodes, kernel_size=2, activation="relu", input_shape=hidden.shape[1:])(hidden)
hidden = MaxPooling1D(pool_size=1)(hidden)
hidden = BatchNormalization()(hidden)
hidden = Dropout(hyperparameters.dropout)(hidden)
for nodes in hyperparameters.back_hidden_size:
hidden = Dense(nodes)(hidden)
hidden = BatchNormalization()(hidden)
hidden = Activation("relu")(hidden)
hidden = Dropout(hyperparameters.dropout)(hidden)
hidden = Dense(1)(hidden)
outputs = Activation("sigmoid")(hidden)
return Model(inputs, outputs)
@staticmethod
def __set_keras_backend(backend: str):
# Changing backend is no longer supported by tf.keras in TF2
if K.backend() != backend:
os.environ["KERAS_BACKEND"] = backend
importlib.reload(K)
assert K.backend() == backend, "Unable to set backend to {}".format(backend)
if backend.lower() == "tensorflow":
# Set the number of inter/intra threads to the number of physical cores (experiment shows this is the best)
physical_core_num = psutil.cpu_count(logical=False)
tf.config.threading.set_inter_op_parallelism_threads(physical_core_num)
tf.config.threading.set_intra_op_parallelism_threads(1)
tf.config.set_soft_device_placement(True)
physical_devices = tf.config.experimental.list_physical_devices("GPU")
try:
for gpu in physical_devices:
tf.config.experimental.set_memory_growth(gpu, True)
except Exception:
raise ValueError("Invalid device or cannot modify virtual devices once initialised")
K.clear_session()
elif backend.lower() == "theano" or backend.lower() == "cntk":
# Backends other than tensorflow require separate installations before being used.
if (int(tf.__version__.split(".")[0]) >= 2):
raise ValueError("Multi-backend is not supported")
else:
raise ValueError("Unknown backend: {}".format(backend))
@staticmethod
def __generator(train_data_raw: np.ndarray, labels_raw: np.ndarray, batch_size: int, validation_split: float, is_validation: bool) -> Generator:
while True:
total_size = train_data_raw.shape[0]
for i in range(0, total_size, batch_size):
real_batch_size = total_size - i - 1 if total_size - i - 1 < batch_size else batch_size
train_range_right = i + int(real_batch_size * (1 - validation_split))
if is_validation:
batched_train_data = train_data_raw[train_range_right:i + real_batch_size]
batched_labels = labels_raw[train_range_right:i + real_batch_size]
else:
batched_train_data = train_data_raw[i:train_range_right]
batched_labels = labels_raw[i:train_range_right]
np_batched_train_data = np.array(batched_train_data)
np_batched_labels = np.array(batched_labels)
rand = np.random.permutation(np.arange(len(np_batched_labels)))
np_batched_random_train_data = np_batched_train_data[rand]
np_batched_random_labels = np_batched_labels[rand]
np_batched_random_train_data = np.array(
[np.rot90(m=val, k=1, axes=(0, 1)) for val in np_batched_random_train_data]
)
np_batched_random_train_data = np_batched_random_train_data - np.mean(np_batched_random_train_data,
axis=0)
yield np_batched_random_train_data, np_batched_random_labels