In [155]:
import numpy as np
import random

from csv import reader
from datetime import datetime
from math import exp

In [156]:
#  Functions to initialize network

def init_net( n_inp, n_hidden_layer, n_hidden_node, n_out ):
    """"
    Initialize a simple (deep) neural network
    
    n_inp:           Number of input values
    n_hidden_layer:  Number of hidden layers
    n_hidden_node:   Number of nodes per hidden layer
    n_out:           Number of output categories
    """
    
    #  A network is a list of layers: input, 1 or more hidden plus output
    
    net = [ ]
    
    #  Hidden layers, each neuron has n_inp links from previous layer
    #  (fully connected), plus last list entry for layer's bias
    
    for i in range( 0, n_hidden_layer ):
        hidden_layer = [ ]
        
        for j in range( 0, n_hidden_node[ i ] ):
            
            #  Randomize link weights and bias
            
            n_prev = n_inp + 1 if i == 0 else n_hidden_node[ i - 1 ] + 1
            
            neuron_wt = [ random.random() for k in range( 0, n_prev ) ]
            hidden_layer.append( { 'weights': neuron_wt } )
            
        net.append( hidden_layer )    # Add hidden layer to network
        
    #  Create output layer similarly, but with n_out neurons
    
    output_layer = [ ]
    for i in range( 0, n_out ):
        neuron_wt = [ random.random() for j in range( 0, n_hidden + 1 ) ]
        output_layer.append( { 'weights': neuron_wt } )
        
    net.append( output_layer )  # Add output layer to network
    
    return net

#  End function init_net

In [157]:
#  Functions to perform forward propegation

def activate( weight, inp ):
    """
    Compute activation of a neuron, given incoming activations, weights, neuron bias
    
    weight:  List of weights for incoming neuron activations
    inp:     Incoming activations
    """
    
    actv = weight[ -1 ]    # Bias is last weight
    
    for i in range( 0, len( weight ) - 1 ):    # For all incoming acivations
        actv += weight[ i ] * inp[ i ]
    
    return actv

#  End function activate


def activation_fn( actv ):
    """
    Activation function (sigmoid, in our case) used to smooth activation values
    
    actv:  Raw activation values to transform
    """
    
    return 1.0 / ( 1.0 + exp( -actv ) )

#  End function activation_fn
    


def forward_prop( net, row ):
    """
    Forward propegate input row of data, passing it through hidden layers in network, and
    using results to define the final output layer value
    
    
    net:  Hidden and output layer weights and biases
    inp:  Input layer values
    """

    inp = row
    
    for layer in net:    # For all hidden layers
        new_inp = [ ]    # Activation results for current layer
        
        for neuron in layer:    # For all neurons in current layer
            actv = activate( neuron[ 'weights' ], inp )
            neuron[ 'output' ] = activation_fn( actv )
            new_inp.append( neuron[ 'output' ] )
            
        inp = new_inp    # Output from layer forms input for next layer
        
    return inp    # Return output for last layer, output layer

#  End function forward_prop

In [158]:
#  Functions to perform backpropegation of error (cost) through network

def transfer_deriv( actv ):
    """
    Compute derivate of neuron output, for sigmoid derivative is d/dx f(x)=f(x)(1-f(x))
    
    actv:  Output activation to differentiate
    """
    
    return actv * ( 1.0 - actv )

#  End function transfer_deriv


def update_wt( net, row, l_rate ):
    """
    Update weights for network given input row, current learning rate
    
    net:     Network to update
    row:     Input row
    l_rate:  Learning rate
    """
    
    for i in range( 0, len( net ) ):    # For all layers
        if i == 0:    # Use original input for first layer
            inp = row[ :-1 ]
        else:    # Use previous layer's output as layer's input
            inp = [ neuron[ 'output' ] for neuron in net[ i - 1 ] ]
            
        for neuron in net[ i ]:
            
            #  For each neuron, update its weight by adding learning rate times
            #  slope in error from expect values times pervious layer's input
            
            for j in range( len( inp ) ):
                neuron[ 'weights' ][ j ] += l_rate * neuron[ 'delta' ] * inp[ j ]
                
            #  Update bias, assume input's bias is 1.0
            
            neuron[ 'weights' ][ -1 ] += l_rate * neuron[ 'delta' ]

#  End function update_wt


def backprop_error( net, exp ):
    """
    Backpropegate error from output layer through hidden layers to input layer;
    error in output layer is expected classification minus derived classification
    times slow of derived classification (activation).
    
    In hidden layer, we accumlate for each neuron weight on link between neuron
    and next layer times weighted error times slope of cost function
    
    net:  Results of forward propegation on network
    exp:  Expected output values
    """
    
    #  Walk backwards through layers in the network
    
    for i in reversed( range( 0, len( net ) ) ):
        layer = net[ i ]
        err = [ ]
        
        if i == len( net ) - 1:    # Output layer?
            for j in range( 0, len( layer ) ):    # For all output neurons
                neuron = layer[ j ]
                err.append( exp[ j ] - neuron[ 'output' ] )
                
        else:    # Hidden layer
            for j in range( 0, len( layer ) ):    # For all hidden neurons
                err_val = 0.0
                
                for neuron in net[ i + 1 ]:
                    err_val += ( neuron[ 'weights' ][ j ] * neuron[ 'delta' ] )
                    err.append( err_val )
                    
        #  Now, compute cost function derivative (delta) for each neuron, add it
        #  to neuron's dictionary
    
        for j in range( 0, len( layer ) ):
            neuron = layer[ j ]
            neuron[ 'delta' ] = err[ j ] * transfer_deriv( neuron[ 'output' ] )
        
#  End function backprop_error

In [159]:
#  Function to train network, predict results

def predict( net, row ):
    """
    For a trained network, predict output for given input row
    
    net:  Trained network
    row:  Row to classify
    """
    
    out = forward_prop( net, row )
    return out.index( max( out ) )

#  End function predict


def train_net( net, train, l_rate, decay, n_epoch, n_out ):
    """
    Train network w/stochastic gradient descent
    
    net:      (Initialized) network to train
    train:    Training set
    l_rate:   Learning rate
    decay:    Learning rate decay
    n_epoch:  Number of epochs (iterations) to train
    n_out:    Nuber of output classifications
    """
    
    init_l_rate = l_rate    # Save initial learning rate
    
    for epoch in range( 0, n_epoch ):
        sum_err = 0
        
        for i,row in enumerate( train ):    # For all training samples
            
            #  1. Forward prop training row thru network
            #  2. One hot encode expected output classification
            #  3. Backprop error through network to get error gradient (delta)
            #  4. Update weights based on error delta
            #  5. Decay learning rate
            
            out = forward_prop( net, row )    # Step 1
            
            exp = [ 0 for i in range( 0, n_out ) ]    # Step 2
            exp[ row[ -1 ] ] = 1
            
            #  Compute sum of error at each neuron, square to make positive
            
            for i in range( 0, len( exp ) ):
                sum_err += ( exp[ i ] - out[ i ] ) ** 2
                
            backprop_error( net, exp )    # Step 3
            update_wt( net, row, l_rate )    # Step 4
            
            l_rate = init_l_rate * ( 1.0 / ( 1.0 + ( decay * i ) ) )    # Step 5
            
        if epoch % 50 == 0 or epoch == n_epoch - 1:
            print( 'Epoch: %3d;  Error: %.3f' % ( epoch, sum_err ) )
            if epoch == n_epoch - 1:
                print( '\n' )
            
#  End function train_network

In [160]:
#  File processing functions

def acc_metric( actual, predicted ):
    """
    Determine accuracy of actual versus predicted
    
    actual:     Actual correct values
    predicted:  Predicted values
    """
    
    correct = 0
    for i in range( 0, len( actual ) ):
        correct = ( correct + 1 ) if actual[ i ] == predicted[ i ] else correct
        
    return correct / float( len( actual ) ) * 100.0

#  End function acc_metric


def cross_validation_split( dataset, n_folds ):
    """
    Split dataset into folds for n-fold cross validation
    
    dataset:  Dataset to split
    n_folds:  Number of folds
    """
    
    dataset_split = [ ]
    dataset_copy = list( dataset )
    
    fold_sz = int( len( dataset ) / n_folds )
    for i in range( 0, n_folds ):
        fold = [ ]
        while len( fold ) < fold_sz:
            index = random.randrange( len( dataset_copy ) )
            fold.append( dataset_copy.pop( index ) )
            
        dataset_split.append( fold )
        
    return dataset_split

#  End function cross_validation_split


def dataset_col_minmax( dataset ):
    """
    Return min,max (float) values for each column in dataset
    
    dataset:  Dataset to query
    """
    
    np_dataset = np.array( dataset )
    stats = [ ]
    for i in range( 0, len( dataset[ 0 ] ) ):
        min_val = min( np_dataset[ :, i ] )
        max_val = max( np_dataset[ :, i ] )
        stats.append( [ min_val, max_val ] )
        
    return stats

#  End function dataset_col_minimax


def dataset_norm( dataset, minmax ):
    """
    Normalize each dataset column to range 0..1
    
    dataset:  Dataset to normalize
    minmax:   List of column [min,max]
    """
    
    rng = [ ]    # Compute minmax range for each column
    for i in range( 0, len( minmax ) ):
        rng.append( minmax[ i ][ 1 ] - minmax[ i ][ 0 ] )
        
    for row in dataset:
        for i in range( 0, len( row ) ):
            row[ i ] = ( row[ i ] - minmax[ i ][ 0 ] ) / rng[ i ]
            
#  End function dataset_norm


def eval_alg( dataset, alg, n_folds, args ):
    """
    
    Evaluate algorithms using n_fold cross validation
    
    dataset:  Dataset to train/test
    alg:      Algorithm to test
    n_fold:   Number of cross validation folds
    args:     Argument dictionary for algorithm: { l_rate, epoch, hidden }
    """
    
    #  Input are number of neurons in first hidden layer. Outputs are unique target values,
    #  remember target value for each training sample is last row element
    
    #  Add one to every hidden layer node count to make space for bias
    
    for i in range( 0, len( arg[ 'hidden' ] ) ):
        arg[ 'hidden' ][ i ] += 1
        
    n_inp = len( dataset[ 0 ] ) - 1
    n_out = len( set( row[ -1 ] for row in dataset ) )
    net = init_net( n_inp, arg[ 'layer' ], arg[ 'hidden' ], n_out )
    
    folds = cross_validation_split( dataset, n_folds )
    scores = [ ]
    
    for i,fold in enumerate( folds ):
        print( 'Fold %d:' % ( i + 1 ) )
        
        #  Training set is everything except what's in fold
        
        train_set = list( folds )
        train_set.remove( fold )
        
        #  Use list comprehension to flatten 2d list into 1d list of rows
        
        train_set = [ val for sub in train_set for val in sub ]
        
        #  Test set is all rows in current fold
        
        test_set = [ ]
        for row in fold:
            row_cp = list( row )
            test_set.append( row_cp )

        train_net( net, train_set, arg[ 'l_rate' ], arg[ 'decay' ], arg[ 'epoch' ], n_out )
        
        correct = 0
        for row in test_set:
            p = predict( net, row )
            a = row[ -1 ]
            correct = ( correct + 1 ) if a == p else correct
            
        acc = correct / len( test_set ) * 100.0
        scores.append( acc )
        
    return scores

#  End function evaluate_arg


def load_csv( fname, header=True ):
    """
    Load a CSV dataset
    
    fname:   Dataset filename
    header:  Header line flag, default=Yes
    """
    
    dataset = [ ]
    with open( fname, 'r' ) as file:
        csv_reader = reader( file )
        for row in csv_reader:
            if not row:
                continue
            dataset.append( row )
    
    if header:
        dataset = dataset[ 1: ]    # Remove header line
            
    file.close()
    return dataset

#  End function load_csv


def str_col_2_float( dataset, col ):
    """
    Convert string column to float values
    
    dataset:  Dataset to process
    col:      Column to convert
    """
    
    for row in dataset:
        row[ col ] = float( row[ col ].strip() )
        
#  End function str_col_2_float


def str_col_2_int( dataset, col ):
    """
    Convert string column to int values
    
    dataset:  Dataset to process
    col:      Column to convert
    """
    
    #  Extract unique string values
    
    uniq = set( [row[ col ] for row in dataset ] )
    
    lookup = { }    # Building value to integer lookup database
    for i,val in enumerate( uniq ):
        lookup[ val ] = i
        
    for row in dataset:
        row[ col ] = lookup[ row[ col ] ]
        
    return lookup    # Return value to integer lookup database
        
#  End function str_col_2_int

In [161]:
#  Mainline

random.seed( datetime.now() )    # Seed random number generator

dataset = load_csv( 'C:/Users/healey/Downloads/wheat-seeds.csv' )
n_row = len( dataset )
n_col = len( dataset[ 0 ] )

#  Attributes are read as strings, convert to floats

for i in range( 0, n_col - 1 ):
    str_col_2_float( dataset, i )
    
#  Target seed type read as string, convert to (classification) integer

str_col_2_int( dataset, n_col - 1 )

#  Normalize columns in dataset

minmax = dataset_col_minmax( dataset )
dataset_norm( dataset, minmax )

#  numpy converts everything to float, so reset target values to int,
#  b/c we're doing classification

str_col_2_int( dataset, n_col - 1 )

#  Evaluate prediction algorithm

n_folds = 5
l_rate = 0.1
l_decay = 0.01
n_epoch = 1000
n_layer = 1
n_hidden = 10

arg = {
    'l_rate': l_rate,
    'decay': l_decay,
    'epoch': n_epoch,
    'layer': n_layer,
    'hidden': [ n_hidden ] * n_layer
}

scores = eval_alg( dataset, backprop_error, n_folds, arg )

print( 'Scores:  ', end='' )
for i in range( 0, len( scores ) ):
    print( '%.3f%%; ' % scores[ i ], end='' )
print( '\n' )

print( 'Mean accuracy: %.3f%%' % ( sum( scores ) / float( len( scores ) ) ) )

Fold 1:
Epoch:   0;  Error: 307.394
Epoch:  50;  Error: 92.897
Epoch: 100;  Error: 29.400
Epoch: 150;  Error: 20.502
Epoch: 200;  Error: 16.978
Epoch: 250;  Error: 15.201
Epoch: 300;  Error: 14.087
Epoch: 350;  Error: 13.292
Epoch: 400;  Error: 12.678
Epoch: 450;  Error: 12.180
Epoch: 500;  Error: 11.763
Epoch: 550;  Error: 11.403
Epoch: 600;  Error: 11.085
Epoch: 650;  Error: 10.799
Epoch: 700;  Error: 10.538
Epoch: 750;  Error: 10.294
Epoch: 800;  Error: 10.065
Epoch: 850;  Error: 9.845
Epoch: 900;  Error: 9.632
Epoch: 950;  Error: 9.423
Epoch: 999;  Error: 9.222


Fold 2:
Epoch:   0;  Error: 8.480
Epoch:  50;  Error: 7.521
Epoch: 100;  Error: 7.255
Epoch: 150;  Error: 7.057
Epoch: 200;  Error: 6.876
Epoch: 250;  Error: 6.699
Epoch: 300;  Error: 6.526
Epoch: 350;  Error: 6.356
Epoch: 400;  Error: 6.192
Epoch: 450;  Error: 6.034
Epoch: 500;  Error: 5.883
Epoch: 550;  Error: 5.740
Epoch: 600;  Error: 5.602
Epoch: 650;  Error: 5.471
Epoch: 700;  Error: 5.346
Epoch: 750;  Error: 5.226
Ep