Source code for qumas.LensmodelWrapper.lensmodel_reader


### all of this should be here 
[docs] def make_dictionary(list,prefix="p"): return {f"{prefix}[{i}]":float(list[i]) for i in range(len(list))}
[docs] def get_numeric_values(text): numeric_values=re.findall(r'[-+]?\d*\.\d+|\d+|\(?\s*[-+]?\d*\.?\d*e[+-]?\d*\s*\)?', text) return numeric_values
[docs] def get_images_result(list_lensmodel): images_result = {"ra_imput":[],"dec_imput":[],"radec_error":[],"flux_imput":[],"flux_error":[],"time_delay_imput":[],"time_delay_error":[],"ra_output":[],"dec_output":[],"flux_output":[],"time_delay_output":[]} for j,imagen in enumerate(list_lensmodel): imagen = imagen.replace("\n","").split("->") numeric_values = [i.replace(")","").replace("(","") for i in get_numeric_values(imagen[0])]+[float(i) for i in get_numeric_values(imagen[1])] for i,key in enumerate(images_result.keys()): images_result[key].append(float(numeric_values[i])) return images_result
[docs] def get_kappa_gamma(list_of_lines): kappa_gamma = {"index":[],"x":[],"y":[],"kappa":[],"gamma":[],"theta":[]} for j in range(2,len(list_of_lines)): linea = [i for i in list_of_lines[j].replace("\n","").split(" ") if i!=""] for i,key in enumerate(kappa_gamma.keys()): kappa_gamma[key].append(float(linea[i])) kappa_gamma = {key: np.array(value) for key, value in kappa_gamma.items()} kappa_gamma["magnification"] =((1-kappa_gamma['kappa'])**2 - kappa_gamma['gamma']**2)**-1 return {"kappa_gamma":kappa_gamma}
[docs] def get_result_lensmodel(list_of_lines): result = {} for n,line in enumerate(list_of_lines): if "LENS PARMS" in line: #what happend if i have 2 galaxies ? result["LENS PARMS"] ={} for i in range(1,10): if "alpha" in list_of_lines[n+i]: result["LENS PARMS"][f"alpha{i}"] = make_dictionary([i.replace("\n","") for i in list_of_lines[n+1].split(" ") if (len(i)>0 and i!='alpha')]) if "SOURCE PARMS" in line: result["SOURCE PARMS"] ={} for i in range(1,10): if "ptsrc" in list_of_lines[n+i]: result["SOURCE PARMS"][f"ptsrc{i}"] = make_dictionary([i.replace("\n","") for i in list_of_lines[n+1].split(" ") if (len(i)>0 and i!='ptsrc')],prefix="s") if "CHISQ" in line: chi_values,chi_columns = list_of_lines[n].split("#") result["CHISQ"]= {v: float(k) for k, v in zip(chi_values.replace("CHISQ:","").split(" ")[1:-1],chi_columns.replace("\n","").split(" ")[1:])} if "Source" in line: #should be updated for cases with more source result["Source"] = {i[0].replace(" ",""):i[1] for i in [list_of_lines[n+i].replace("\n","").split(":") for i in range(1,6)]} if "images" in line: result["images"] = get_images_result(list_of_lines[n:][1:-2]) return result
[docs] def get_critical_caustic(list_of_lines): return {"critical_caustic":{key:np.array([[i for i in i.split(" ")[1:] if len(i)>0] for i in [i.replace("\n","") for i in list_of_lines][1:] if len(i)>0],dtype="object").astype(float)[::,n] for n,key in enumerate(["x","y","u","v"])}}
[docs] def get_grid(list_of_lines): return {"grid":{key:np.array([[i for i in i.split(" ") if len(i)>0] for i in [i.replace("\n","") for i in list_of_lines][1:] if len(i)>0],dtype="object").astype(float)[::,n] for n,key in enumerate(["x","y","u","v"])}}
[docs] def get_RE(list_of_lines): re = {'RE':float(list_of_lines[0].split(" ")[0])} print(re) return re
[docs] def run_lensmodel(modeling_path,run_name): #run_name = "model_run" try: path_to_run = os.path.join(modeling_path,f"{run_name}.dat") where_we_are = os.getcwd() path_to_lensmodel = os.path.join(os.getcwd(),"time_scales/lensmodel/lensmodel") st = os.stat(path_to_lensmodel) # Get permissions and make the file executable os.chmod(path_to_lensmodel, st.st_mode | stat.S_IEXEC) # Change the current working directory to work_pad os.chdir(os.path.join(where_we_are,"time_scales/lensmodel")) with open(path_to_run, 'r') as f: proc = subprocess.Popen([path_to_lensmodel], stdin = f, stdout=subprocess.DEVNULL,stderr=subprocess.DEVNULL) proc.wait() os.chdir(where_we_are) except KeyboardInterrupt: os.chdir(where_we_are)
#print("Execution stopped by the user.")
[docs] def full_modeling_result(modeling_path,model_setup): model_result = {} for step in ["step_1","step_2","final_step","kappa_gamma","critical_caustic","grid","RE"]: #only step+"kappa_gamma","critical_caustic","grid" file_name = f"{step}.dat" if "step" in step: func = get_result_lensmodel if f"{step}.best" in os.listdir(modeling_path): file_name = f"{step}.best" elif step=="kappa_gamma": func = get_kappa_gamma elif step=="critical_caustic": func = get_critical_caustic elif step=="grid": func = get_grid elif step=="RE": func =get_RE if file_name not in os.listdir(modeling_path): continue file = open(os.path.join(modeling_path,file_name), "r") list_of_lines = file.readlines() model_result[step] = func(list_of_lines) [os.remove(os.path.join(modeling_path,i) )for i in os.listdir(modeling_path) if step in i] model_result["model_setup"] = {"model_setup":model_setup} return model_result
[docs] def save_dict_to_hdf5(dic, filename): with h5py.File(filename, 'w') as f: _save_dict_to_hdf5_recursive(f, '/', dic)
def _save_dict_to_hdf5_recursive(f, path, dic): for key, item in dic.items(): if isinstance(item, dict): _save_dict_to_hdf5_recursive(f, path + key + '/', item) else: f[path + key] = item
[docs] def read_hdf5(filename): with h5py.File(filename, 'r') as f: data = _read_hdf5_recursive(f, '/') return data
def _read_hdf5_recursive(f, path): data = {} for key, item in f[path].items(): if isinstance(item, h5py.Group): data[key] = _read_hdf5_recursive(f, path + key + '/') else: # Decode byte strings to regular strings if isinstance(item, h5py.Dataset) and isinstance(item[()], bytes): data[key] = item[()].decode('utf-8') # Specify the appropriate encoding elif isinstance(item, h5py.Dataset) and isinstance(item[()], np.ndarray): data[key] = [x if type(x)==np.float64 else x.decode('utf-8') for x in item[()] ] # Specify the appropriate encoding else: data[key] = item[()] return data
[docs] def compare_dicts(dict1, dict2): # Check if both dictionaries have the same keys if set(dict1.keys()) != set(dict2.keys()): return False # Check if the values for each key are the same false_key=[] for key in dict1.keys(): if np.all(dict1[key] != dict2[key]): false_key.append(key) if len(false_key)>0: #print(false_key) return False return True