Source code for sciSOM.SOM_learn.train

import numpy as np
from scipy.spatial.distance import cdist
import math
import random # Might want to take a closer look at radom number generators in the futuer

[docs] class SOM: """ SOM class: SOM object that can be trained and used to classify data. One of the goals of this object will be to switch between different SOM implementations. For now, it will be a simple implementation and the cSOM implementation. """ def __init__(self, x_dim: int, y_dim: int, input_dim: int, n_iter: int, learning_parameters: np.ndarray, decay_type: str = "exponential", neighborhood_decay: str = "geometric_series", som_type: str="Kohonen", mode: str="batch", save_weight_cube_history: bool = False, gamma_off: bool = False, weight_cube: np.ndarray = None, weight_cube_save_states: np.ndarray = None, custom_scale_sup_matrix: float = 0, csom_learning_radius: int = 1, histories: bool = False): """ Initialize the SOM object. Parameters ---------- x_dim : (int) The x dimension of the SOM. y_dim : (int) The y dimension of the SOM. input_dim : (int) The dimension of the input data. n_iter : (int) The number of iterations to train the SOM. learning_parameters : (ndarray) for Khononen SOM -> The learning rate and sigma for the SOM. for a cSOM -> The learning rate, beta, and gamma of the SOM. (The main difference between this paper and our implementation is that we also update the immidiate neighbors of the BMU) https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=23839 ** Could maybe make it more general in the future so it can accept tables** decay_type : (str) The type of decay the SOM should follow. Default is set to exponential. Could be linear or schedule. neighborhood_decay : (str) The type of decay the neighborhood function should follow. defualt is set to geometric_series. Could be exponential or none. som_type (str): The type of SOM to use. Current options are Kohonen or cSOM, this can be expanded. mode : (str) The mode of the SOM. default is set to batch. Could be online. save_weight_cube_history : (bool) Saves the history of how often each neuron was the BMU. default is set to False. gamma_off : (bool) Turns off the gamma term in the cSOM algorithm. Effecively making it a Kohonen SOM with a constant neighborhood size of 1. default is set to False. Returns ------- None """ self.x_dim = x_dim self.y_dim = y_dim self.input_dim = input_dim self.n_iter = n_iter self.learning_parameters = learning_parameters self.decay_type = decay_type self.som_type = som_type self.mode = mode self.neighborhood_decay = neighborhood_decay self.is_trained = False self.save_weight_cube_history = save_weight_cube_history self.gamma_off = gamma_off self.histories = histories self.csom_learning_radius = csom_learning_radius self.weight_cube_save_states = weight_cube_save_states self.custom_scale_sup_matrix = custom_scale_sup_matrix if weight_cube is None: self.weight_cube = np.random.rand(x_dim, y_dim, input_dim) else: self.weight_cube = weight_cube self.mode_methods = { 'batch': self._train_batch, 'online': self._train_online } if mode not in self.mode_methods: raise ValueError(f"Mode {mode} is not supported. Choose from {list(self.mode_methods.keys())}") # Check if the learning parameters are correct if histories == True: self.learning_rate_history = np.zeros(n_iter) self.learning_radius_history = np.zeros(n_iter) self.bais_matrix_history = np.zeros((x_dim, y_dim, n_iter)) self.save_neighborhood_function = np.zeros((x_dim, y_dim, n_iter)) self.track_mbu = np.zeros((2, n_iter)) self.track_radius_limits = np.zeros((4, n_iter)) self.frequency_matrix_history = np.zeros((x_dim, y_dim, n_iter)) self.frequency_matrix = np.zeros((x_dim, y_dim)) self.bais_matrix = np.zeros((x_dim, y_dim)) if weight_cube_save_states is not None: self.som_save_state = np.zeros(((len(weight_cube_save_states)), x_dim, y_dim, input_dim)) #self.weight_cube.copy() if self.save_weight_cube_history: self.weight_cube_history = np.zeros((self.x_dim, self.y_dim))
[docs] def train(self, data): """ Train the SOM object. Parameters: data (np.array): The data to train the SOM on. """ # Check if the learning parameters are correct check_field_exists(self.learning_parameters, "alpha") # Decide the order of the input for traiing: train_method = self.mode_methods.get(self.mode) # Maybe output an array with random indexes to train the SOM? data_shuffled_index = train_method(data) # Train the SOM if self.som_type == "Kohonen": self.Kohonen_SOM(data, data_shuffled_index) elif self.som_type == "cSOM": self.cSOM(data, data_shuffled_index) else: raise ValueError(f"SOM type {self.som_type} is not supported. Choose from Kohonen or cSOM") self.is_trained = True
[docs] def Kohonen_SOM(self, data, indecies): """ Train the SOM using the Kohonen algorithm. """ # Records parameters to confirm SOM is training correctly alpha_rec = np.zeros(self.n_iter) sigma_rec = np.zeros(self.n_iter) radius_rec = np.zeros(self.n_iter) counter = 0 # Might want to pick a mode outside the loop to save time. for i in range(self.n_iter): #distances = cdist(self.weight_cube.reshape(-1, # self.weight_cube.shape[-1]), # data[int(indecies[i])].reshape(1,self.input_dim), # metric='euclidean') #w_neuron = np.argmin(distances, axis=0) #x_bmu, y_bmu = np.unravel_index(w_neuron, (self.x_dim, self.y_dim)) x_bmu, y_bmu = self.compute_bmu(data, indecies, i) if self.save_weight_cube_history: self.weight_cube_history[x_bmu, y_bmu] += 1 # Need to calculate the sigma radius. # Might want to write this in a more modular way. if self.decay_type == "exponential": tao = self.n_iter / self.learning_parameters["max_radius"] sigma = int(self.learning_parameters["sigma"] * np.exp(-i / tao)) alpha = self.learning_parameters["alpha"] * np.exp(-i / tao) radius = math.ceil(self.learning_parameters["max_radius"] * np.exp(-i / tao)) elif self.decay_type == "linear": #tao = self.n_iter / self.learning_parameters["max_radius"] sigma = int(self.learning_parameters["sigma"] - self.learning_parameters["sigma"] / self.n_iter * i) alpha = self.learning_parameters["alpha"] - self.learning_parameters["alpha"] / self.n_iter * i radius = math.ceil(self.learning_parameters["max_radius"] - self.learning_parameters["max_radius"] / self.n_iter * i) elif self.decay_type == "schedule": # Need to check if the current time step is and pic the sigma from the schedule. current_schedule = np.sum(self.learning_parameters["time"] <= i) sigma = self.learning_parameters["sigma"][current_schedule] alpha = self.learning_parameters["alpha"][current_schedule] radius = self.learning_parameters["max_radius"][current_schedule] else: raise ValueError(f"Decay type {self.decay_type} is not supported. Choose from exponential, linear or schedule") # Now compute the neighbors to update x_min, x_max, y_min, y_max = self.compute_neighborhood(x_bmu, y_bmu, radius) neighborhood_radius = self.neighborhood_function(int(x_bmu), int(y_bmu), i, radius) self.learning_rate_history[i] = alpha self.learning_radius_history[i] = radius # This is currently updating the BMU, but we need to update the BMU and its neighbors. # Missing the radius decay. (further away points should be updated less) self.weight_cube[x_min:x_max, y_min:y_max] += ( alpha * neighborhood_radius[x_min:x_max, y_min:y_max, np.newaxis] * (data[int(indecies[i])] - self.weight_cube[x_min:x_max, y_min:y_max])) # Make this into a function later on to reduce code duplication if self.weight_cube_save_states is not None: if i == self.weight_cube_save_states[counter]: self.som_save_state[counter,:,:,:] = self.weight_cube.copy() counter += 1
[docs] def cSOM(self, data, indecies): """ Train the SOM using the concious SOM algorithm. The implementation of the conscience SOM follows NeuralWare's implementation that is used in the NeuroScope environment and is given in NeuralWare documentation as well as in the documentation of NeuroScope ann-SOMconsc module. For more information you may contact [Erzsébet Merényi](erzsebet@rice.edu): Prof. Merényi was not consulted on the implementation of sciSOM functions that intend to mimic NeuroScope functionalities of the same name, nor did she have opportunity to inspect proof of faithfulness to the same-name module in NeuroScope or correctness of the corresponding sciSOM code. Therefore, Dr. Merényi and the NeuroScope group take no responsibility for the likeness and the correctness of the functions implemented to mimic (partial) NeuroScope capabilities in sciSOM. """ # Test in controlling the learning radius: learning_radius = self.csom_learning_radius # Leave this for SOM development, but should be set to 1 for cSOM counter = 0 for i in range(self.n_iter): # We cannont calculate the BMU in the same way as before # We first need the other values to calculate the BMU # Calculate all frequency values, initial state is 0 # Conciouse mechanism alpha, beta, gamma = self.decay_cSOM(i) if self.gamma_off == True: gamma = 0 # Calculate bais term if self.custom_scale_sup_matrix == 0: self.bais_matrix = gamma * ((1/(self.x_dim * self.y_dim)) - self.frequency_matrix) else: self.bais_matrix = gamma * (self.custom_scale_sup_matrix - self.frequency_matrix) x_concious_bmu, y_concious_bmu = self.compute_bmu_cSOM(data, indecies, i, self.bais_matrix) if self.save_weight_cube_history: self.weight_cube_history[x_concious_bmu, y_concious_bmu] += 1 # Update the frequency term for next round self.frequency_matrix[x_concious_bmu, y_concious_bmu] += beta * (1 - self.frequency_matrix[x_concious_bmu, y_concious_bmu]) if self.histories == True: self.frequency_matrix_history[:, :, i] = self.frequency_matrix self.bais_matrix_history[:, :, i] = self.bais_matrix self.learning_rate_history[i] = alpha self.learning_radius_history[i] = learning_radius x_min, x_max, y_min, y_max = self.compute_neighborhood(x_concious_bmu, y_concious_bmu, learning_radius) neighborhood_radius = self.neighborhood_function(int(x_concious_bmu), int(y_concious_bmu), i, learning_radius) self.weight_cube[x_min:x_max, y_min:y_max] += ( alpha * neighborhood_radius[x_min:x_max, y_min:y_max, np.newaxis] * (data[int(indecies[i])] - self.weight_cube[x_min:x_max, y_min:y_max])) # Make this into a function later on to reduce code duplication if self.weight_cube_save_states is not None: if i == self.weight_cube_save_states[counter]: self.som_save_state[counter,:,:,:] = self.weight_cube.copy() counter += 1
###### Everything bellow here is the old code #distances = (distances ** 2) - self.suppresion_matrix.reshape(self.suppresion_matrix.shape[0] # * self.suppresion_matrix.shape[1], -1) #w_neuron = np.argmin(distances, axis=0) #x_bmu, y_bmu = np.unravel_index(w_neuron, (self.x_dim, self.y_dim)) # recalculate winning neuron
[docs] def compute_bmu(self, data, indecies, iteration): distances = cdist(self.weight_cube.reshape(-1, self.weight_cube.shape[-1]), data[int(indecies[iteration])].reshape(1,self.input_dim), metric='euclidean') w_neuron = np.argmin(distances, axis=0) x_idx, y_idx = np.unravel_index(w_neuron, (self.x_dim, self.y_dim)) return x_idx, y_idx
[docs] def compute_bmu_cSOM(self, data, indecies, iteration, suppession_matrix): distances = cdist(self.weight_cube.reshape(-1, self.weight_cube.shape[-1]), data[int(indecies[iteration])].reshape(1,self.input_dim), metric='euclidean') # When plotting it looks like the suppresion matrix becomes negative # which does the opposite of baising the BMU. I will try to make it # positive to see what happens. distances = distances - suppession_matrix.reshape(suppession_matrix.shape[0] * suppession_matrix.shape[1], -1) w_neuron = np.argmin(distances, axis=0) x_idx, y_idx = np.unravel_index(w_neuron, (self.x_dim, self.y_dim)) assert len(x_idx) == 1 assert len(y_idx) == 1 return x_idx, y_idx
[docs] def neighborhood_function(self, x_bmu, y_bmu, iter, radius): """ Calculate the neighborhood function for the SOM. This function should decay with the distance from the BMU. """ x_min, x_max, y_min, y_max = self.compute_neighborhood(x_bmu, y_bmu, radius) # python ignores the last number in a range so we have to add 1 #x_max += 1 #y_max += 1 # Cant just add 1, breaks other things, just apply it to : in the array update_neighborhood = np.zeros((self.x_dim, self.y_dim)) # Simple neighborhood function #-=]\update_neighborhood[x_min:x_max, y_min:y_max] = 1 # Could maybe speed up with numba # Make tests to ensure BMU is ser to 1 and decays with distance if self.neighborhood_decay == "geometric_series": for i in range(x_min, x_max): for j in range(y_min, y_max): update_neighborhood[i, j] = 1 / 2 ** (max((np.abs(x_bmu - i)), np.abs(y_bmu - j) )) #np.exp(-np.linalg.norm([i-x_bmu, j-y_bmu]) / sigma) elif self.neighborhood_decay == "exponential": for i in range(x_min, x_max): for j in range(y_min, y_max): update_neighborhood[i, j] = np.exp(-np.linalg.norm([i-x_bmu, j-y_bmu]) ** 2 / (2 * radius ** 2)) elif self.neighborhood_decay == "none": update_neighborhood[x_min:x_max:, y_min: y_max] = 1 if self.histories == True: self.save_neighborhood_function[:,:,iter] = update_neighborhood self.track_mbu[:,iter] = [x_bmu, y_bmu] self.track_radius_limits[:, iter] = [x_min, x_max, y_min, y_max] return update_neighborhood
# Want to make it so the value is 1 at the BMU and decays with distance.
[docs] def compute_neighborhood(self, x_bmu, y_bmu, radius): """ Compute the neighborhood of the BMU. """ x_min = max(0, x_bmu - radius) x_max = min(self.x_dim, x_bmu + radius + 1) # removed -1 from self.x_dim never ran into error y_min = max(0, y_bmu - radius) y_max = min(self.y_dim, y_bmu + radius + 1) return int(x_min), int(x_max), int(y_min), int(y_max)
[docs] def decay_kohonen(self, i): """ Decides how the learning rate and other parameters will decrease over time ** Not in use yet ** """ if self.decay_type == "exponential": tao = self.n_iter / self.learning_parameters["max_radius"] sigma = int(self.learning_parameters["sigma"] * np.exp(-i / tao)) alpha = self.learning_parameters["alpha"] * np.exp(-i / tao) radius = math.ceil(self.learning_parameters["max_radius"] * np.exp(-i / tao)) elif self.decay_type == "linear": #tao = self.n_iter / self.learning_parameters["max_radius"] sigma = int(self.learning_parameters["sigma"] - self.learning_parameters["sigma"] / self.n_iter * i) alpha = self.learning_parameters["alpha"] - self.learning_parameters["alpha"] / self.n_iter * i radius = math.ceil(self.learning_parameters["max_radius"] - self.learning_parameters["max_radius"] / self.n_iter * i) elif self.decay_type == "schedule": # Need to check if the current time step is and pic the sigma from the schedule. current_schedule = np.sum(self.learning_parameters["time"] <= i) sigma = self.learning_parameters["sigma"][current_schedule] alpha = self.learning_parameters["alpha"][current_schedule] radius = self.learning_parameters["max_radius"][current_schedule] else: raise ValueError(f"Decay type {self.decay_type} is not supported. Choose from exponential, linear or schedule") return alpha, sigma, radius
[docs] def decay_cSOM(self, i): """ Decides how the learning rate and other parameters will decrease over time ** Not in use yet ** """ if self.decay_type == "exponential": tao = self.n_iter alpha = self.learning_parameters["alpha"][0] * np.exp(-i / tao) beta = self.learning_parameters["beta"][0] * np.exp(-i / tao) gamma = self.learning_parameters["gamma"][0] * np.exp(-i / tao) #radius = 1 elif self.decay_type == "linear": #tao = self.n_iter / self.learning_parameters["max_radius"] alpha = self.learning_parameters["alpha"][0] - self.learning_parameters["alpha"][0] / self.n_iter * i beta = self.learning_parameters["beta"][0] - self.learning_parameters["beta"][0] / self.n_iter * i gamma = self.learning_parameters["gamma"][0] - self.learning_parameters["gamma"][0] / self.n_iter * i #radius = math.ceil(self.learning_parameters["max_radius"] - self.learning_parameters["max_radius"] / self.n_iter * i) elif self.decay_type == "schedule": # Need to check if the current time step is and pic the sigma from the schedule. current_schedule = np.sum(self.learning_parameters["time"] <= i) alpha = self.learning_parameters["alpha"][current_schedule] beta = self.learning_parameters["beta"][current_schedule] gamma = self.learning_parameters["gamma"][current_schedule] else: raise ValueError(f"Decay type {self.decay_type} is not supported. Choose from exponential, linear or schedule") return alpha, beta, gamma
def _train_batch(self, data): """ Train the SOM in batch mode. This is making an assumption that the data is bigger than the itteration, this is not always true """ batches = math.ceil(self.n_iter/ len(data)) reminder = self.n_iter % len(data) indecies = np.zeros(self.n_iter) for batch in range(batches): if batch != batches - 1: indecies[batch*len(data): (1 + batch)*len(data)] = random.sample(list(np.arange(len(data))), len(data)) elif reminder != 0: indecies[(batch)*len(data):] = random.sample(list(np.arange(len(data))), reminder) return indecies def _train_online(self, data): """ Train the SOM in online mode. """ return random.choices(np.arange(len(data)), k=self.n_iter)
[docs] def weight_cube(self): """ Return the weight cube of the SOM. """ return self.weight_cube
[docs] def check_field_exists(structured_array: np.ndarray, field_name: str) -> bool: """ Check if a field exists in a structured array. """ return field_name in structured_array.dtype.names
[docs] def log_parameters(parameters: dict): """ Make a log of the parameters used to train the SOM and save this onto a file. """ raise NotImplementedError