Source code for qumas.census

import glob
import numpy as np
import pandas as pd
from .LensmodelWrapper.utils import pandas_to_model
import os 

# Absolute path of the directory containing this module; the census tables
# are looked up relative to it.
module_dir = os.path.split(os.path.abspath(__file__))[0]
# List with the info for the different filters; this could be added to LensedQsoCensus.

#filter_info =  pd.read_csv(f"{module_dir}/tables/filter_unique.csv")

class LensedQsoCensus:
    """Census of lensed-quasar measurements built from a directory of CSV tables.

    Every path matched by ``<module_dir>/<path>/*`` is read with
    ``pandas.read_csv`` and the tables are stacked into a single DataFrame.
    Rows are grouped into "systems" by their separation in the (ra, dec)
    plane.

    Parameters
    ----------
    path : str, optional
        Directory, relative to this module's directory, holding the tables.

    Attributes
    ----------
    path_lens_tables : list of str
        Sorted paths of the table files that were found.
    lens_census : pandas.DataFrame
        All tables concatenated, with a ``file`` column recording the source
        path of each row and a ``year`` column holding the first four
        characters of ``Bibcode``.
    points : numpy.ndarray
        ``(n_rows, 2)`` array built from the ``ra`` and ``dec`` columns.
    distances : numpy.ndarray
        ``(n_rows, n_rows)`` matrix of pairwise Euclidean distances between
        the rows of ``points``.
    """

    def __init__(self, path="Tables/photometry"):
        # Sorted so that row order (and therefore which entry counts as the
        # "first" of a system) does not depend on filesystem listing order.
        self.path_lens_tables = sorted(glob.glob(f"{module_dir}/{path}/*"))
        self.lens_census = self._do_full_lens_census()
        self.lens_census["year"] = [
            bibcode[0:4] for bibcode in self.lens_census["Bibcode"]
        ]
        self.points = self.lens_census[["ra", "dec"]].values
        # NOTE(review): plain Euclidean distance on (ra, dec); there is no
        # cos(dec) scaling or RA wrap-around handling. Units are presumably
        # degrees -- confirm against the tables.
        self.distances = np.linalg.norm(
            self.points[:, np.newaxis] - self.points, axis=2
        )

    def _do_full_lens_census(self):
        """Read every table in ``path_lens_tables`` and concatenate them.

        Returns
        -------
        pandas.DataFrame
            All rows of all tables with a fresh 0..n-1 index and an extra
            ``file`` column holding the path each row came from.

        Raises
        ------
        ValueError
            If no table file was found.
        """
        if not self.path_lens_tables:
            # pd.concat([]) would raise ValueError as well, but with a message
            # that does not point at the actual cause.
            raise ValueError(
                "No census tables found; check the 'path' argument "
                "and the package data directory."
            )
        tables = [
            pd.read_csv(file).assign(file=file) for file in self.path_lens_tables
        ]
        return pd.concat(tables).reset_index(drop=True)

    def look_for_a_system(self, word, column="name", separation_limit=0.1):
        """Return the census rows matching ``word`` plus their close neighbours.

        Parameters
        ----------
        word : object
            Value searched for; it is converted with ``str`` and matched as a
            literal substring (not a regular expression).
        column : str, optional
            Column searched in.
        separation_limit : float, optional
            When ``column`` is ``"name"`` or ``"z_s"``, every row closer than
            this to the first matching row is included as well.

        Returns
        -------
        pandas.DataFrame
            The selected rows, without the columns that are entirely NaN for
            the selection.

        Raises
        ------
        ValueError
            If no row matches.
        """
        # TODO add a filter for the columns i want to see
        values = self.lens_census[column]
        mask = values.notna() & values.astype("str").str.contains(
            str(word), regex=False
        )
        if not mask.any():
            raise ValueError("Object not found in census.")

        if column in ("name", "z_s"):
            # ``distances`` is positional, so use the position of the first
            # match rather than its index label.
            first_pos = int(np.flatnonzero(mask.to_numpy())[0])
            close_mask = self.distances[first_pos] < separation_limit
            mask = mask | close_mask

        return self.lens_census[mask].dropna(axis=1, how="all")

    def unique_systems_count(self, get_unique_names=False, get_unique_years=False,
                             separation_limit=0.1):
        """Count the systems in the census, grouping rows by separation.

        Rows are visited in order; each not-yet-visited row starts a new
        system made of every row closer than ``separation_limit`` to it.

        Parameters
        ----------
        get_unique_names : bool, optional
            Return the first name of each system instead of the count.
        get_unique_years : bool, optional
            Return ``[first name, earliest year]`` for each system instead of
            the count. Ignored when ``get_unique_names`` is also true.
        separation_limit : float, optional
            Maximum separation for two rows to belong to the same system.

        Returns
        -------
        int or list
            The number of systems, or the list selected by the flags above.
        """
        n_rows = len(self.lens_census)
        visited = np.zeros(n_rows, dtype=bool)
        collect_details = get_unique_names or get_unique_years
        count = 0
        names = []
        years = []

        for idx in range(n_rows):
            if visited[idx]:
                continue
            close_mask = self.distances[idx] < separation_limit
            # A row always belongs to its own system; without this a row with
            # NaN coordinates would produce an empty group.
            close_mask[idx] = True
            visited |= close_mask
            count += 1
            if not collect_details:
                continue

            group = self.lens_census[close_mask]
            first_name = group["name"].iloc[0]
            if get_unique_names:
                names.append(first_name)
            if get_unique_years:
                years.append([first_name, int(group["year"].astype(int).min())])

        if get_unique_names:
            return names
        if get_unique_years:
            return years
        return count

    @staticmethod
    def _first_known_redshift(system, column):
        """Return ``(value, Bibcode)`` of the first non-null ``column`` entry, or None."""
        if column not in system.columns or "Bibcode" not in system.columns:
            return None
        known = system.loc[system[column].notna(), [column, "Bibcode"]]
        if known.empty:
            return None
        first = known.iloc[0]
        return first[column], first["Bibcode"]

    def hierarchical_selection(self, name):
        """Select, for one system, the rows of its best-populated reference.

        The system is looked up with ``look_for_a_system`` and sorted by
        ``year`` in descending order. The ``Bibcode`` contributing the most
        rows is kept (on a tie, the one appearing first in that ordering).

        Parameters
        ----------
        name : object
            Passed to ``look_for_a_system`` as the search word.

        Returns
        -------
        pandas.DataFrame
            A copy of the selected rows with these columns added or
            overwritten:

            * ``z_l`` / ``Bibcode_zl`` and ``z_s`` / ``Bibcode_zs``: the first
              non-null value found in the whole system and the Bibcode of the
              row it came from (only when the system has that redshift);
            * ``known_names``: array of the distinct names in the system;
            * ``can_be_modeled``: True when some column name contains
              ``"band"`` and the ``IS`` column contains ``"ima"``.

        Raises
        ------
        ValueError
            If the system is not found in the census.
        """
        system = self.look_for_a_system(name).sort_values("year", ascending=False)

        # Group sizes in order of first appearance; idxmax returns the first
        # maximum, so ties go to the Bibcode that appears first.
        rows_per_reference = system.groupby("Bibcode", sort=False).size()
        best_reference = rows_per_reference.idxmax()
        data_to_model = (
            system[system["Bibcode"] == best_reference]
            .dropna(axis=1, how="all")
            .copy()
        )

        # Redshifts may come from a different reference than the selected
        # rows, so record where each one was taken from.
        for column, reference_column in (("z_l", "Bibcode_zl"), ("z_s", "Bibcode_zs")):
            found = self._first_known_redshift(system, column)
            if found is None:
                continue
            redshift, bibcode = found
            # Assigned one column at a time so the redshift stays numeric.
            data_to_model[column] = redshift
            data_to_model[reference_column] = bibcode

        known_names = system["name"].drop_duplicates().values
        data_to_model["known_names"] = [known_names] * len(data_to_model)

        has_band_column = any("band" in str(col) for col in data_to_model.columns)
        # "IS" is absent when it was entirely NaN for this selection; treat
        # that as "no image entry" instead of failing with KeyError.
        has_image_entry = "IS" in data_to_model.columns and bool(
            (data_to_model["IS"] == "ima").any()
        )
        data_to_model["can_be_modeled"] = has_band_column and has_image_entry
        return data_to_model
# def pandas_for_model(self): # pandas_to_be_use_in_model = pd.concat([pandas_to_model(self.hierarchical_selection(system_name)) for system_name in self.unique_systems_count(get_unique_names=True)] # ).reset_index(drop=True) # pandas_to_be_use_in_model['total_lens'] = pandas_to_be_use_in_model['total_lens'].fillna(0) # return pandas_to_be_use_in_model