Source code for CASBI.preprocessing
import numpy as np
import pandas as pd
import pynbody as pb
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Pool, cpu_count
import logging
import sys
import os
import re
from tqdm import tqdm
# Configure logging to suppress messages from pynbody
logging.getLogger('pynbody').setLevel(logging.CRITICAL)
"""
===========================================================================
GENERATION OF THE FILES OF OBSERVATIONS AND PARAMETERS FOR THE TRAINING SET
===========================================================================
Functions to extract the parameters and observables from the simulation snapshots and save them in .npz files.
"""
[docs]
def extract_parameter_array(sim_path: str = 'str', file_path: str = 'str', position_flag: bool = False) -> pd.DataFrame:
    """
    Extract the parameters and observables of one snapshot and save them in a single .npz file.

    The snapshot is loaded with pynbody, converted to physical units, and halo 1 is
    oriented face-on. Every stage is checked: when a stage fails, a row is added to the
    returned error dataframe, a placeholder file '<name>_<stage>_error.npz' is written in
    ``file_path`` and the extraction stops. If halo 1 contains no star particles, only the
    error row ('no_stars') is recorded and no file is written.

    The values can be read back in the usual way, e.g. ``np.load('file.npz')['star_mass']``.

    Parameters
    ----------
    sim_path : str
        Path to the simulation snapshot. The last path component
        ('simulation_name.snapshot_number') is used as the base name of the saved files.
        The default value 'str' is kept only for backward compatibility.
    file_path : str
        Path to the folder where the files are saved.
        The default value 'str' is kept only for backward compatibility.
    position_flag : bool
        If True, the positions of the stars are saved as well. Default is False.

    Returns
    -------
    error_dataframe : pandas.DataFrame
        Dataframe with columns 'Galaxy_name' and 'error'. It is empty when the extraction
        succeeded, otherwise it holds one row whose 'error' is one of
        'load', 'halos', 'mass', 'chemical' or 'no_stars'.

    Notes
    -----
    On success the file '<file_path>/<name>.npz' contains:

    file['star_mass'] : 0-d array
        Sum of the masses of the star particles of halo 1.
    file['infall_time'] : 0-d array
        Time of the snapshot in Gyr.
    file['feh'] : np.array
        [Fe/H] of the star particles of halo 1.
    file['ofe'] : np.array
        [O/Fe] of the star particles of halo 1.
    file['Galaxy_name'] : str
        Base name of the snapshot.
    file['position'] : np.array
        Positions of the star particles in kpc (only when ``position_flag`` is True).
    """
    # The last path component ('simulation_name.snapshot_number') names every output file.
    name_file = re.search(r'[^/]+$', sim_path).group()
    error_dataframe = pd.DataFrame(columns=['Galaxy_name', 'error'])

    def _record_error(kind, file_tag=None, message=None):
        # Add a row to the error dataframe and, when requested, write the placeholder
        # '<name>_<tag>_error.npz' file and print a short message.
        error_dataframe.loc[len(error_dataframe)] = {'Galaxy_name': name_file, 'error': kind}
        if file_tag is not None:
            # The 'emppty' key is kept as is so that the files stay identical to the ones
            # produced by previous runs.
            np.savez(file=os.path.join(file_path, f'{name_file}_{file_tag}_error.npz'),
                     emppty=np.array([0]))
        if message is not None:
            print(message)

    # Silence stderr while working on the snapshot. The handle is owned by the ``with``
    # block, so it is always closed, and the previous stderr is always restored.
    original_stderr = sys.stderr
    with open(os.devnull, 'w') as devnull:
        sys.stderr = devnull
        try:
            # 1) the snapshot must be loadable
            try:
                sim = pb.load(sim_path)
                sim.physical_units()
            except Exception:
                _record_error('load', file_tag='load')
                return error_dataframe

            # 2) the halo catalogue must be available and halo 1 must be orientable
            try:
                h_1 = sim.halos()[1]
                pb.analysis.angmom.faceon(h_1)
            except Exception:
                _record_error('halos', file_tag='halos', message='halo error')
                return error_dataframe

            # 3) the stellar masses must be readable
            try:
                mass = h_1.s['mass']
            except Exception:
                _record_error('mass', file_tag='mass', message='mass error')
                return error_dataframe

            # 4) the halo must have formed stars
            if len(mass) == 0:
                _record_error('no_stars', message='Not formed stars yet')
                return error_dataframe

            # PARAMETERS
            star_mass = np.array(mass.sum())  # in Msol
            infall_time = np.array(h_1.properties['time'].in_units('Gyr'))

            # 5) OBSERVABLES: [Fe/H] and [O/Fe] must be derivable
            try:
                feh = h_1.s['feh']
                ofe = h_1.s['ofe']
            except Exception:
                _record_error('chemical', file_tag='FeO', message='chemistry error')
                return error_dataframe

            arrays = {
                'feh': feh,
                'ofe': ofe,
                'star_mass': star_mass,
                'infall_time': infall_time,
                'Galaxy_name': name_file,
            }
            if position_flag:
                arrays['position'] = np.array(h_1.s['pos'].in_units('kpc'))
            # os.path.join keeps the file inside file_path even without a trailing slash.
            np.savez(file=os.path.join(file_path, name_file + '.npz'), **arrays)
        finally:
            sys.stderr = original_stderr
    return error_dataframe
[docs]
def gen_files(sim_path: 'str | list[str]', file_path: str, position_flag: bool = False) -> None:
    """
    Run extract_parameter_array on every given snapshot path, in parallel, and save the results in file_path.

    It is suggested to use the glob library to get all the paths of the snapshots in the simulation like: path = glob.glob('storage/g?.??e??/g?.??e??.0????')
    The errors that occurred during the extraction are collected in a dataframe that is saved as 'error_dataframe.parquet' in the same directory as the files.

    Parameters
    ----------
    sim_path : list of str or str
        Paths to the simulation snapshots. Each path should end with 'simulation_name.snapshot_number', which is used to create the name of the .npz files.
        A single string is treated as a list with one path.
    file_path : str
        Path to the folder where the files will be saved.
    position_flag : bool
        Forwarded to extract_parameter_array: if True the positions of the stars are saved too. Default is False.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If no snapshot path is given.
    """
    # A bare string would otherwise be iterated character by character.
    sim_paths = [sim_path] if isinstance(sim_path, str) else list(sim_path)
    if not sim_paths:
        # Fail early with a clear message instead of pandas' "No objects to concatenate".
        raise ValueError('gen_files received no simulation snapshot paths.')

    jobs = [(path, file_path, position_flag) for path in sim_paths]
    with Pool() as pool:
        df_list = pool.starmap(extract_parameter_array, jobs)

    error_dataframe = pd.concat(df_list, ignore_index=True)
    error_dataframe.to_parquet(os.path.join(file_path, 'error_dataframe.parquet'))
"""
===========================================================================
GENERATION DATAFRAME
===========================================================================
The dataframe with information on parameters and observables for all the galaxies available in the simulation.
The preprocessing step is used to cut numerical errors and outliers (especially in the chemical plane).
"""
[docs]
def preprocess(file_dir: str, preprocess_dir: str) -> str:
    """
    Aggregate the per-galaxy files into one file used to preprocess the training set.

    Every '.npz' file in file_dir whose name does not contain 'error' is read, and the
    stellar mass, infall time, [Fe/H] and [O/Fe] of all galaxies are collected and saved
    in 'preprocess_file.npz' inside preprocess_dir, so that percentile cuts can be
    computed afterwards on the whole distribution.

    Parameters
    ----------
    file_dir : str
        Path to the folder where the files with the parameters and observables are saved.
    preprocess_dir : str
        Path to the folder where the preprocess information will be saved.

    Returns
    -------
    preprocess_file_path : str
        Path to the file with the preprocess information. The file stores the arrays
        'Galaxy_Mass', 'Galaxy_infall_time', 'FeH' and 'OFe'.
    """
    galaxy_mass = []
    galaxy_infall_time = []
    feh_chunks = []
    ofe_chunks = []
    for galaxy in tqdm(os.listdir(file_dir)):
        # Skip the error placeholders, anything that is not an .npz archive and a
        # previous output of this function saved in the same folder.
        if 'error' in galaxy or not galaxy.endswith('.npz') or galaxy == 'preprocess_file.npz':
            continue
        # Open each archive once and close it as soon as the arrays are read.
        with np.load(os.path.join(file_dir, galaxy)) as archive:
            galaxy_mass.append(float(archive['star_mass']))
            galaxy_infall_time.append(float(archive['infall_time']))
            feh = archive['feh']
            ofe = archive['ofe']
        # We need a general picture of the entire distribution of [Fe/H] and [O/Fe] to cut
        # the outliers. Both arrays are truncated to the common length, as pairing them
        # element by element would do.
        n_stars = min(len(feh), len(ofe))
        feh_chunks.append(feh[:n_stars])
        ofe_chunks.append(ofe[:n_stars])

    preprocess_file_path = os.path.join(preprocess_dir, 'preprocess_file.npz')
    np.savez(
        file=preprocess_file_path,
        Galaxy_Mass=np.array(galaxy_mass),
        Galaxy_infall_time=np.array(galaxy_infall_time),
        FeH=np.concatenate(feh_chunks) if feh_chunks else np.array([]),
        OFe=np.concatenate(ofe_chunks) if ofe_chunks else np.array([]),
    )
    # Return exactly the path that was written, with or without a trailing slash in preprocess_dir.
    return preprocess_file_path
[docs]
def load_data(file_path: str) -> pd.DataFrame:
    """
    Load one parameter/observable file and return its summary as a one-row pandas dataframe.

    This function is the unit of work distributed by CASBI.preprocessing.gen_dataframe.

    Parameters
    ----------
    file_path : str
        Path to the .npz file with the parameters and observables.

    Returns
    -------
    df_temp : pandas.DataFrame
        One-row dataframe with columns 'star_mass', 'infall_time', 'Galaxy_name',
        'max_feh' and 'max_ofe', where the last two are the maxima of the 'feh' and
        'ofe' arrays stored in the file.
    """
    properties = ['star_mass', 'infall_time', 'Galaxy_name', 'max_feh', 'max_ofe']
    # Open the archive once and close it when done: this function runs inside pool
    # workers, where leaked file handles accumulate.
    with np.load(file_path) as archive:
        # The first three properties are stored as single values.
        data = [archive[prop].item() for prop in properties[:3]]
        # Get the maximum of feh and ofe.
        data.append(archive['feh'].max())
        data.append(archive['ofe'].max())
    df_temp = pd.DataFrame(columns=properties)
    df_temp.loc[0] = data
    return df_temp
[docs]
def gen_dataframe(file_dir: str, dataframe_path: str, max_processes: int = 100) -> pd.DataFrame:
    """
    Generate the dataframe used for the sampling process in the CASBI.template_library class.

    Every '.npz' file in file_dir whose name does not contain 'error' is loaded with
    load_data in a pool of processes; the resulting rows are concatenated and saved as
    'dataframe.parquet' in dataframe_path.

    Parameters
    ----------
    file_dir : str
        Path to the folder where the files with the parameters and observables are saved.
    dataframe_path : str
        Path to the folder where the dataframe will be saved.
    max_processes : int
        Upper bound on the number of worker processes. Default is 100. No more processes
        than files to load are started.

    Returns
    -------
    df : pandas.DataFrame
        The dataframe with the data from the file_dir.

    Raises
    ------
    ValueError
        If file_dir contains no file to load.
    """
    # Access all the files created by preprocessing.gen_files, skipping the error
    # placeholders and anything that is not an .npz archive (e.g. a saved parquet file).
    all_files = sorted(os.listdir(file_dir))
    file_path = [
        os.path.join(file_dir, name)
        for name in all_files
        if name.endswith('.npz') and 'error' not in name
    ]
    if not file_path:
        # Fail early with a clear message instead of pandas' "No objects to concatenate".
        raise ValueError(f'No parameter/observable .npz files found in {file_dir}')

    # Distribute the data access; never start more workers than there are files.
    n_processes = max(1, min(max_processes, len(file_path)))
    with Pool(processes=n_processes) as pool:
        df_list = pool.map(load_data, file_path)

    # Concatenate the dataframes
    df = pd.concat(df_list, ignore_index=True)
    # Save the dataframe
    df.to_parquet(os.path.join(dataframe_path, 'dataframe.parquet'))
    return df