nnrf/_nndt.py
import numpy as np
from nnrf.ml import get_activation, get_regularizer, get_optimizer
from nnrf.ml.activation import PReLU
from nnrf.utils import check_XY, one_hot, decode, calculate_weight, \
create_random_state, BatchDataset
from nnrf.analysis import get_metrics
from nnrf.utils._estimator import BaseClassifier
class NNDT(BaseClassifier):
"""
Neural Network structured as a Decision Tree.
Parameters
----------
d : int, default=5
Depth, or number of layers, of the NNDT.
r : int, float, None, or {'sqrt', 'log2'}, default='sqrt'
The number of features provided as input to each
node. Must be one of:
- int : Use `r` features.
- float : Use `r * n_features` features.
- 'sqrt' : Use `sqrt(n_features)` features.
- 'log2' : Use `log2(n_features)` features.
- None : Use all features.
loss : str, LossFunction, default='cross-entropy'
Loss function to use for training. Must be
one of the default loss functions or an object
that extends LossFunction.
activation : str, Activation, default=PReLU(0.2)
Activation function to use at each node in the NNDT.
Must be one of the default loss functions or an
object that extends Activation.
optimizer : str, Optimizer, default='adam'
Optimization method. Must be one of
the default optimizers or an object that
extends Optimizer.
regularize : str, Regularizer, None, default=None
Regularization function to use at each node in the NNDT.
Must be one of the default regularizers or an object that
extends Regularizer. If None, no regularization is done.
alpha : float, default=0.001
Learning rate. NNDT uses gradient descent.
max_iter : int, default=10
Maximum number of epochs to conduct during training.
tol : float, default=1e-4
Convergence criteria for early stopping.
batch_size : int, float, default=None
Batch size for training. Must be one of:
- int : Use `batch_size`.
- float : Use `batch_size * n_samples`.
- None : Use `n_samples`.
class_weight : dict, 'balanced', or None, default=None
Weights associated with classes in the form
`{class_label: weight}`. Must be one of:
- None : All classes have a weight of one.
- 'balanced': Class weights are automatically calculated as
`n_samples / (n_samples * np.bincount(Y))`.
verbose : int, default=0
Verbosity of estimator; higher values result in
more verbose output.
warm_start : bool, default=False
Determines warm starting to allow training to pick
up from previous training sessions.
metric : str, Metric, or None, default='accuracy'
Metric for estimator score.
random_state : None or int or RandomState, default=None
Initial seed for the RandomState. If seed is None,
return the RandomState singleton. If seed is an int,
return a RandomState with the seed set to the int.
If seed is a RandomState, return that RandomState.
Attributes
----------
n_classes_ : int
Number of classes.
n_features_ : int
Number of features.
fitted_ : bool
True if the model has been deemed trained and
ready to predict new data.
weights_ : ndarray, shape=(2**d - 1, r + 1, 2)
Weights of the NNDT. Note that the root input node
contains an extra dead input to allow simpler
weight storage.
bias_ : ndarray, shape=(2**d - 1, 1, 2)
Biases of the NNDT.
sweights_ : ndarray, shape=(2**(d-1), n_classes)
Weights of the softmax aggregator layer.
sbias_ : ndarray, shape=(n_classes,)
Biases of the softmax aggregator layer.
"""
def __init__(self, d=5, r='sqrt', loss='cross-entropy',
activation=PReLU(0.2), regularize=None, optimizer='adam',
max_iter=100, tol=1e-4, batch_size=None, class_weight=None,
verbose=0, warm_start=False, metric='accuracy',
random_state=None):
super().__init__(loss=loss, max_iter=max_iter, tol=tol,
batch_size=batch_size, verbose=verbose,
warm_start=warm_start, class_weight=class_weight,
metric=metric, random_state=random_state)
self.d = d
self.r = r
self.activation = get_activation(activation)
self.softmax = get_activation('softmax')
self.regularizer = get_regularizer(regularize)
self.optimizer = get_optimizer(optimizer)
self.weights_ = np.array([])
self.bias_ = np.array([])
self.inputs_ = np.array([])
self.sweights_ = np.array([])
self.sbias_ = np.array([])
self._p = []
self._s = []
def _initialize(self):
"""
Initialize the parameters of the NNDT.
"""
self._calculate_r()
n, s = 2**self.d - 1, self.r + 1
self.weights_ = self.random_state.randn(n, s, 2) * 0.1
self.bias_ = self.random_state.randn(n, 1, 2) * 0.1
self.inputs_ = self.inputs_.reshape(-1, self.r).astype(int)
for i in range(n):
input = self.random_state.choice(self.n_features_, size=(1, self.r), replace=False)
self.inputs_ = np.concatenate((self.inputs_, input))
self.sweights_ = self.random_state.randn(2**self.d, self.n_classes_) * 0.1
self.sbias_ = self.random_state.randn(self.n_classes_) * 0.1
keys = ['sw', 'sb']
for i in range(n):
keys += ['w' + str(i), 'b' + str(i)]
self.optimizer.setup(keys)
def decision_path(self, X):
"""
Returns the nodes in each layer that comprises
the decision path of the NNDT.
Parameters
----------
X : array-like, shape=(n_samples, n_features)
Data to predict.
Returns
-------
paths : ndarray, shape=(n_samples, n_layers)
List of paths for each data sample.
Each path consists of a list of nodes active
in each layer (e.g. if the third element is
2, the third node in the third layer was active).
"""
pred = self._forward(X)
paths = np.array([np.zeros(len(X))] + self._s[:-1])[:,np.newaxis]
return np.concatenate(paths).T
def _is_fitted(self):
"""
Return True if the model is properly ready
for prediction.
Returns
-------
fitted : bool
True if the model can be used to predict data.
"""
weights = len(self.weights_) > 0
bias = len(self.bias_) > 0
inputs = len(self.inputs_) > 0
return weights and bias and inputs and self.fitted_
def _forward(self, X):
"""
Conduct the forward propagation steps through the NNDT.
Only one node in each layer is active and activates only
one child node.
Parameters
----------
X : array-like, shape=(n_samples, n_features)
Data to predict.
Returns
-------
Y_hat : array-like, shape=(n_samples, n_classes)
Softmax output.
"""
s = np.zeros(len(X))
p = s.reshape(-1,1)
self._x, self._z, self._s, self._p = [], [], [s], [p]
for d in range(self.d):
n = (2**d + s - 1).astype(int)
weights = self._get_weights(n)
bias = self._get_bias(n).reshape(-1,2)
X_d = self._get_inputs(n, X)
X_d = np.concatenate((X_d, p), axis=1)
z = np.einsum('ijk,ij->ik', weights, X_d) + bias
a = self.activation.activation(z)
k = np.argmax(a, axis=1).astype(int)
s = (2 * s + k).astype(int)
i = (np.arange(len(X)), k)
p = a[i].reshape(-1,1)
self._x.append(X_d)
self._z.append(z)
self._s.append(s)
self._p.append(p)
p = np.zeros((len(X), 2**self.d)) # N(2**d)
s = (2 * self._s[-2]).reshape(-1,1)
s = np.concatenate((s, s+1), axis=1).astype(int)
i = (np.arange(len(X)).reshape(-1,1), s)
p[i] += a
z = np.dot(p, self.sweights_) + self.sbias_ # NC
Y_hat = self.softmax.activation(z)
self._z.append(z)
self._p.append(p)
return Y_hat
def _backward(self, Y_hat, Y, weights=None):
"""
Conduct the backward propagation steps through the NNDT.
Parameters
----------
Y_hat : array-like, shape=(n_samples, n_classes)
Softmax output.
Y : array-like, shape=(n_samples, n_classes)
Target labels in one hot encoding.
weights : array-like, shape=(n_samples,), default=None
Sample weights. If None, then samples are equally weighted.
"""
weights = calculate_weight(decode(Y), self.n_classes_,
class_weight=self.class_weight, weights=weights)
dY = self.loss.gradient(Y_hat, Y) * weights.reshape(-1,1)
dsZ = dY * self.softmax.gradient(self._z[-1]) # NC
dsW = np.dot(self._p[-1].T, dsZ) / len(Y_hat) # (2**d)C
dsb = np.sum(dsZ, axis=0) / len(Y_hat) # C
s = (2 * self._s[-2]).reshape(-1,1) # N1
s = np.concatenate((s, s+1), axis=1).astype(int).flatten() # (2N)
weights = self.sweights_[s].reshape(-1,2,self.sweights_.shape[-1]) # N2C
dp = np.einsum('ik,ijk->ij', dsZ, weights) # NC * N2C -> N2
if self.regularizer is not None:
dsW += self.regularizer.gradient(self.sweights_)
self.sweights_ -= self.optimizer.update('sw', dsW)
self.sbias_ -= self.optimizer.update('sb', dsb)
for i in range(self.d):
z = self._z[-2-i] # N2
if i > 0:
idx = tuple([np.arange(len(z)), self._s[-1-i] % 2])
dZ_ = np.sum(dp, axis=1) * self.activation.gradient(z[idx]) # N1 * N1 -> N1
dZ = np.zeros(z.shape) # N2
dZ[idx] += dZ_.reshape(-1) # N2
else:
dZ = dp * self.activation.gradient(z) # N2 * N2 -> N2
dW_ = np.einsum('ij,ik->ijk', self._x[-1-i], dZ) # NM * N2 -> NM2
db_ = dZ.reshape(-1,1,2) # N12
nodes = self._s[-2-i].astype(int)
n = (2**(self.d - i - 1) + nodes - 1).astype(int)
weights = self._get_weights(n)[:,-1,:].reshape(-1,1,2) # N12
dp = np.einsum('ik,ijk->ij', dZ, weights).reshape(-1,1) # N2 * N12 -> N1
n_count = np.bincount(nodes.reshape(-1))
layer_size = 2**(self.d - i - 1)
dW = np.zeros((layer_size, self.r + 1, 2))
db = np.zeros((layer_size, 1, 2))
if self.regularizer is not None:
reg = self.regularizer.gradient(self.weights_)[n] # NM2
reg = np.einsum('ijk,i->ijk', reg, 1 / n_count[nodes])
np.add.at(dW, nodes, reg)
np.add.at(dW, nodes, np.einsum('ijk,i->ijk', dW_, 1 / n_count[nodes]))
np.add.at(db, nodes, np.einsum('ijk,i->ijk', db_, 1 / n_count[nodes]))
for l in range(layer_size):
node = int(2**(self.d - i - 1) + l - 1)
l_key = str(node)
self.weights_[node] -= self.optimizer.update('w' + l_key, dW[l])
self.bias_[node] -= self.optimizer.update('b' + l_key, db[l])
def _get_weights(self, node):
"""
Get weight matrices for each node given
by `node`.
Parameters
----------
node : array-like, shape=(n_samples,)
List of nodes of interest.
Returns
-------
weights : array-like, shape=(n_samples, r + 1, 2)
List of weight matrices for each data sample.
"""
return self.weights_[node]
def _get_bias(self, node):
"""
Get bias matrices for each node given
by `node`.
Parameters
----------
node : array-like, shape=(n_samples,)
List of nodes of interest.
Returns
-------
bias : array-like, shape=(n_samples, 1, 2)
List of bias matrices for each data sample.
"""
return self.bias_[node]
def _get_inputs(self, node, X):
"""
Get input features for each node given
by `node`.
Parameters
----------
node : array-like, shape=(n_samples,)
List of nodes of interest.
Returns
-------
inputs : array-like, shape=(n_samples, r)
List of inputs for each data sample.
"""
indices = (np.arange(len(X)).reshape(-1,1), self.inputs_[node])
return X[indices]
def _calculate_r(self):
"""
Calculate the number of features provided
to each node in the NNDT.
"""
if self.n_features_ is None:
raise ValueError("Number of features needs to be set first to calculate r")
if self.r is None : self.r = self.n_features_
elif self.r == 'sqrt' : self.r = int(np.sqrt(self.n_features_))
elif self.r == 'log2' : self.r = int(np.log(self.n_features_) / np.log(2))
elif isinstance(self.r, float) and 0 < self.r <= 1 : self.r = int(self.r * self.n_features_)
elif not (isinstance(self.r, int) and self.r > 0):
raise ValueError("R must be None, a positive int or float in (0,1]")
if self.r == 0 : self.r = 1