Source code for gastop.genalg

"""genalg.py
This file is a part of GASTOp
Authors: Amlan Sinha, Cristian Lacey, Daniel Shaw, Paul Kaneelil, Rory Conlin, Susan Redmond
Licensed under GNU GPLv3.
This module implements the GenAlg class.

"""

import numpy as np
import json
from tqdm import tqdm
from multiprocessing import Pool
import copy
import colorama
from gastop import Truss, Mutator, Crossover, Selector, Evaluator, FitnessFunction, encoders, utilities, ProgMon
colorama.init()  # for progress bars on ms windows


[docs]class GenAlg(): """Creates, updates, tracks, loads, and saves populations. The GenAlg Class orchestrates all of the other functions that perform functions to change the population and its elements. In this case, such classes are crossover, evaluator, encoders, fitness, mutator, selector, and truss. In brief, GenAlg calls many other functions in order to create a generation which is then analyzed to fully determine its relavant properties. These properties are then used to create a new generation and the process repeats until a final solution is reached. """
[docs] def __init__(self, config): """Creates a GenAlg object Once created, the object will store all of the relavant information about a population. The object also contains the necessary functions to modify itself, evaluate its 'goodness', and then create new members for the next generation. Args: Either: config (dict): Configuration dictionary with parameters, such as one created by :meth:`gastop.utilities.init_file_parser` config (str): File path to config file to be parsed. Used instead of passing config dictionary directly. Returns: GenAlg callable object """ if isinstance(config, str): config = utilities.init_file_parser(config) self.config = config self.ga_params = config['ga_params'] self.mutator_params = config['mutator_params'] self.random_params = config['random_params'] self.crossover_params = config['crossover_params'] self.selector_params = config['selector_params'] self.population = [] self.evaluator = Evaluator(**config['evaluator_params']) self.fitness_function = FitnessFunction(**config['fitness_params']) # progress monitor stuff self.pop_progress = [] # initialize as empty array, DELETE? # self.progress_display = progress_display #type of progress display # [0,1,2] = [no display, terminal display, plot display], change to text later if isinstance(self.random_params['rng_seed'], tuple): np.random.set_state(self.random_params['rng_seed']) elif isinstance(self.random_params['rng_seed'], int): np.random.seed(self.random_params['rng_seed']) else: np.random.seed(1729)
[docs] def generate_random(self): # Dan '''Generates and returns new truss objects with random properties The random method first determines the desired ranges of all values that will be calculated. Then, random numbers for the node locations, connections, and properties are all determined with the numpy.random methods. Args: None Returns: (Truss object): Truss object with the newly determined values ''' # Generates new random chromosomes with uniform distribution num_rand_nodes = self.random_params['num_rand_nodes'] num_rand_edges = self.random_params['num_rand_edges'] user_spec_nodes = self.random_params['user_spec_nodes'] num_user_spec_nodes = user_spec_nodes.shape[0] domain = self.random_params['domain'] # First, generate the new nodes: new_nodes = np.random.uniform( domain[0], domain[1], (num_rand_nodes, 3)) # 2nd, generate the new edges between the nodes: new_edges = np.random.randint(num_rand_nodes + num_user_spec_nodes, size=(num_rand_edges, 2)) new_properties = np.random.randint(self.random_params['num_material_options'], size=(num_rand_edges)) return Truss(user_spec_nodes, new_nodes, new_edges, new_properties)
[docs] def initialize_population(self, pop_size=None): '''Initializes population with randomly creates Truss objects. Population is stored in instance of GenAlg object as population attribute. Args: pop_size (int): size of the population. If not specified, it defaults to what is in the config dict. Returns: None ''' if pop_size: self.ga_params['pop_size'] = pop_size else: pop_size = self.ga_params['pop_size'] self.population = [] for i in tqdm(range(pop_size), total=pop_size, leave=False, desc='Initializing Population', position=0): self.population.append(self.generate_random())
[docs] def run(self, num_generations=None, progress_fitness=None, progress_truss=None, num_threads=None): '''Runs the genetic algorithm over all populations and generations Args: num_generations (int): number of generations to be performed. progress_fitness (bool): Whether to show display window showing fitness score vs generation. progress_truss (bool): Whether to show display window showing truss evolution. num_threads (int): number of threads the multiprocessing should employ. If zero or None, it will use the number returned by ``os.cpu_count()``. Returns: 2-element tuple containing: - **best** *(Truss)*: The Truss with the best fitness score after elapsed generations. - **pop_progress** (dict): Dictionary of dictionaries containing: - ``'Generation 1'`` *(dict)*: Dictionary of info about generation 1. - ``'Generation 2'`` *(dict)*: Dictionary of info about generation 2, etc. ''' if num_threads is None: if self.ga_params['num_threads'] is None: num_threads = 1 else: num_threads = self.ga_params['num_threads'] if num_generations is None: num_generations = self.ga_params['num_generations'] else: self.ga_params['num_generations'] = num_generations if progress_fitness is None: progress_fitness = self.config['monitor_params']['progress_fitness'] if progress_truss is None: progress_truss = self.config['monitor_params']['progress_truss'] # Initialize progress monitor object progress = ProgMon(progress_fitness, progress_truss, num_generations, domain=self.random_params['domain'], loads=self.config['evaluator_params']['boundary_conditions']['loads'], fixtures=self.config['evaluator_params']['boundary_conditions']['fixtures']) # Set chunksize used in multithreading based on size of population if self.ga_params['pop_size'] < 1e4: chunksize = int(np.amax((self.ga_params['pop_size']/100, 1))) else: chunksize = int(np.sqrt(self.ga_params['pop_size'])) # Loop over all generations, updating progress bar with tqdm: for current_gen in tqdm(range(num_generations), desc='Overall', position=0): self.ga_params['current_generation'] = current_gen # No parallelization if num_threads == 1: for current_truss in tqdm(self.population, desc='Evaluating', position=1): self.evaluator(current_truss) for current_truss in tqdm(self.population, desc='Scoring', position=1): self.fitness_function(current_truss) # With multithreading else: with Pool(num_threads) as pool: self.population = list(tqdm(pool.imap( self.evaluator, self.population, chunksize), total=self.ga_params['pop_size'], desc='Evaluating', position=1)) self.population = list(tqdm(pool.imap( self.fitness_function, self.population, chunksize), total=self.ga_params['pop_size'], desc='Scoring', position=1)) # Sort population by fitness score (lowest score = most fit) self.population.sort(key=lambda x: x.fitness_score) # Update progress monitor plots progress.progress_monitor(current_gen, self.population) # Create next generation self.update_population() # Periodically save config and population to JSON files if self.ga_params['save_frequency'] != 0 and (current_gen % self.ga_params['save_frequency']) == 0: self.save_state( dest_config=self.ga_params['config_save_name'], dest_pop=self.ga_params['pop_save_name']) return self.population[0], progress.pop_progress
[docs] def save_state(self, dest_config='config.json', dest_pop='population.json'): # Cristian '''Saves the current population and config settings to JSON files. Args: dest_config (string): Path to save config data file. If file doesn't exist, creates it. dest_pop (string): Path to save population data file. If file doesn't exist, creates it. Returns: None ''' # Save rng_seed for reloading self.config['random_params']['rng_seed'] = np.random.get_state() config = self.config population = copy.deepcopy(self.population) # Save config data with open(dest_config, 'w') as f: config_dumped = json.dumps(config, cls=encoders.ConfigEncoder) json.dump(config_dumped, f) # Save population data with open(dest_pop, 'w') as f: pop_dumped = json.dumps(population, cls=encoders.PopulationEncoder) json.dump(pop_dumped, f)
[docs] @staticmethod def load_state(dest_config='config.json', dest_pop='population.json'): # Cristian '''Loads the current population and config settings from JSON files. Args: dest_config (string): Path to config data file. dest_pop (string): Path to population data file. Returns: ga (GenAlg object) ''' # Load config data with open(dest_config, 'r') as f: config_loaded = json.load(f) config = json.loads(config_loaded, object_hook=encoders.numpy_decoder) # Load population data with open(dest_pop, 'r') as f: pop_loaded = json.load(f) population = json.loads(pop_loaded, object_hook=encoders.numpy_decoder) population = (Truss(**dct) for dct in population) ga = GenAlg(config) ga.population = population return ga
[docs] def update_population(self): # Cristian ''' Creates new population by performing crossover and mutation, as well as taking elites and randomly generating trusses. First sorts the population by fitness score, from most fit to least fit. Creates selector object from population and method. Calls selector to get list of parents for crossover and mutation. Performs crossover and mutation. Args: None Returns: None ''' # Store parameters for readability population = self.population pop_size = self.ga_params['pop_size'] percent_crossover = self.ga_params['percent_crossover'] percent_mutation = self.ga_params['percent_mutation'] num_elite = self.ga_params['num_elite'] # Sort population by fitness score (lowest score = most fit) # population.sort(key=lambda x: x.fitness_score) #remove # Calculate parents needed for crossover, ensure even number num_crossover = round((pop_size-num_elite)*percent_crossover) if (num_crossover % 2) != 0: # If odd, decrement by 1 num_crossover -= 1 # Calculate parents needed for mutation num_mutation = round((pop_size-num_elite)*percent_mutation) # Calculate remaining number of trusses in next population num_random = pop_size - num_elite - num_crossover - num_mutation if num_random < 0: # Raise exception if input params are unreasonable raise RuntimeError('percent_crossover + percent_mutation > 1') # Instantiate objects selector = Selector(self.selector_params) crossover = Crossover(self.crossover_params) mutator = Mutator(self.mutator_params) # Select parents as indices in current population crossover_parents = selector(num_crossover, population) mutation_parents = selector(num_mutation, population) # Save most fit trusses as elites pop_elite = population[:num_elite] pbar = tqdm(total=(num_crossover+num_mutation+num_random), desc='Updating', position=1) # Portion of new population formed by crossover pop_crossover = [] for i in range(0, num_crossover, 2): parentindex1 = crossover_parents[i] parentindex2 = crossover_parents[i+1] parent1 = population[parentindex1] parent2 = population[parentindex2] child1, child2 = crossover(parent1, parent2) pop_crossover.extend((child1, child2)) pbar.update(2) # Portion of new population formed by mutation pop_mutation = [] for i in range(num_mutation): parentindex = mutation_parents[i] parent = population[parentindex] child = mutator(parent) pop_mutation.append(child) pbar.update() # Create new random trusses with remaining spots in generation pop_random = [] for i in range(num_random): pop_random.append(self.generate_random()) pbar.update() # Append separate lists to form new generation population = pop_elite + pop_crossover + pop_mutation + pop_random pbar.close() # Update population attribute self.population = population return