import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler


class LSTM(nn.Module):
    """Single-layer LSTM followed by a linear layer, used as a regressor.

    The recurrent (hidden, cell) state is kept on the instance in
    ``hidden_cell`` and is fed into / overwritten by every ``forward`` call,
    so callers must call ``reset_hidden_cell()`` before each independent
    sequence.
    """

    def __init__(self, input_size=1, hidden_n=100, output_size=1):
        super(LSTM, self).__init__()

        self.hidden_layer_size = hidden_n
        self.lstm = nn.LSTM(input_size, hidden_n)
        self.fcn = nn.Linear(hidden_n, output_size)

        # hidden_cell holds (previous hidden state, previous cell state);
        # both start out as zeros
        self.reset_hidden_cell()

    def reset_hidden_cell(self):
        """Set the stored (hidden, cell) state back to all zeros.

        Both tensors have shape (1, 1, hidden_layer_size), i.e. one layer
        and a batch of one, matching the reshape done in ``forward``.
        """
        self.hidden_cell = (
            torch.zeros(1, 1, self.hidden_layer_size),
            torch.zeros(1, 1, self.hidden_layer_size),
        )

    def forward(self, input_seq):
        """Return the prediction for the step following ``input_seq``.

        ``input_seq`` is a 1-D tensor of values (the script passes 12
        normalized monthly passenger counts). The return value is the last
        row of the linear layer's output, a tensor with ``output_size``
        elements.
        """
        # nn.LSTM expects (sequence_length, batch, input_size); turn the flat
        # sequence into a column of single-value samples with batch size 1
        steps = len(input_seq)
        input_seq = input_seq.view(steps, 1, 1)

        # Run the LSTM over the whole sequence, starting from the stored
        # state, and remember the resulting state on the instance
        lstm_out, self.hidden_cell = self.lstm(input_seq, self.hidden_cell)

        # One row of hidden features per time step
        lstm_out = lstm_out.view(steps, -1)

        # The linear layer maps each step's hidden features to a regression
        # output (the loss used by this script is MSE, not a classifier loss)
        predictions = self.fcn(lstm_out)

        # Only the output after the final time step is the forecast
        return predictions[-1]


def create_IO_seq(input, tw):
    """Slice a series into (training window, next-value label) pairs.

    ``input`` is the full 1-D tensor to divide and ``tw`` is the window
    length in samples. (The parameter name shadows the ``input`` builtin; it
    is kept so existing keyword callers keep working.)

    Returns a list of ``len(input) - tw`` tuples ``(train_seq, train_label)``
    where ``train_seq`` is ``input[i:i + tw]`` and ``train_label`` is a
    one-element float tensor holding ``input[i + tw]``. The list is empty
    when ``tw >= len(input)``.
    """
    # .item() pulls the value that follows the window out as a Python float;
    # wrapping it in a list gives a one-element float tensor as the label
    return [
        (input[i:i + tw], torch.FloatTensor([input[i + tw].item()]))
        for i in range(len(input) - tw)
    ]


# Mainline

flight_data = sns.load_dataset('flights')

# Everything except the last 12 months is used for training, the last 12
# months for testing. Training data is split into 12-month sequences with a
# 1-value label:
#
#   1  2  3  4  5  6  7  8  9  10  11  12    13
#   - 12 months of training -                next value is label
#
# so train on a 12-month sequence, then see how many passengers next month

all_data = flight_data['passengers'].values.astype(float)

test_data_size = 12
train_data = all_data[:-test_data_size]
test_data = all_data[-test_data_size:]

# Normalize the training data to the range -1..1; the scaler wants a single
# column, so reshape first
scaler = MinMaxScaler(feature_range=(-1, 1))
train_data_norm = scaler.fit_transform(
    train_data.reshape(len(train_data), 1)
)

# Convert back to a PyTorch tensor that is a single flat row
train_data_norm = torch.FloatTensor(train_data_norm).view(
    len(train_data_norm)
)

train_window = 12
train_IO_seq = create_IO_seq(train_data_norm, train_window)

# Create LSTM

model = LSTM()
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train LSTM

model.train()

epochs = 150
for i in range(0, epochs):
    for seq, label in train_IO_seq:
        optimizer.zero_grad()

        # Every training sequence is independent, so start it from a zero
        # hidden state and cell state
        model.reset_hidden_cell()

        y_pred = model(seq)

        single_loss = loss_function(y_pred, label)
        single_loss.backward()
        optimizer.step()

    # Progress report; the loss shown is that of the last sequence only
    if i % 25 == 1:
        print(f'epoch {i:3}; loss: {single_loss.item():10.8f}')

print(f'epoch: {i:3}; loss: {single_loss.item():10.10f}')

# Predict final twelve months

model.eval()

fut_pred = 12

# Seed the forecast with the last train_window normalized training values;
# they are the sequence used to predict the first test value
test_outputs = []
test_inputs = train_data_norm[-train_window:].tolist()

# Produce one forecast per test month
for _ in range(0, fut_pred):
    # Most recent train_window (normalized) values as a tensor
    seq = torch.FloatTensor(test_inputs[-train_window:])

    # No gradients are needed here because backward() is never called on
    # these outputs
    with torch.no_grad():
        # Reset the state that forward() actually reads (hidden_cell).
        # Assigning to a differently named attribute would leave the state
        # from the previous call in place and skew every forecast.
        model.reset_hidden_cell()

        next_value = model(seq).item()

    # Feed the forecast back in so the next window includes it, and keep a
    # copy for the comparison against the known values
    test_inputs.append(next_value)
    test_outputs.append(next_value)

# The LSTM output is normalized to -1..1; invert the scaling to get actual
# passenger numbers that can be compared with the known values
actual_pred = scaler.inverse_transform(
    np.array(test_outputs).reshape(-1, 1)
)

# Plot the known values in blue

plt.rcParams['figure.figsize'] = [15, 5]

plt.title('Month vs Passangers')
plt.ylabel('Total Passengers')
plt.xlabel('Months')
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(flight_data['passengers'])

# Add in the predicted values in orange; they sit on the month indices that
# directly follow the training data
x = np.arange(len(train_data), len(train_data) + fut_pred, 1)
plt.plot(x, actual_pred)
plt.show()