pygama.flow package#
Subpackage description
Submodules#
pygama.flow.data_loader module#
Routines for high-level data loading and skimming.
- class pygama.flow.data_loader.DataLoader(config: str | dict, filedb: str | dict | pygama.flow.file_db.FileDB, file_query: Optional[str] = None)#
Bases: object
Facilitate loading of processed data across several tiers.
Where possible, uses a FileDB object so that a user can quickly select a subset of cycle files of interest and access information at each processing tier.
Example JSON configuration file:
{
    "levels": {
        "hit": {
            "tiers": ["raw", "dsp", "hit"]
        },
        "tcm": {
            "tiers": ["tcm"],
            "parent": "hit",
            "child": "evt",
            "tcm_cols": {
                "child_idx": "coin_idx",
                "parent_tb": "array_id",
                "parent_idx": "array_idx"
            }
        },
        "evt": {
            "tiers": ["evt"]
        }
    },
    "channel_map": {}
}
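Because config accepts either a dictionary or a JSON file, the same settings can be prepared in Python and written out. The sketch below is only illustrative: the helper name check_level_links and the temporary file location are assumptions, and the consistency check is not something DataLoader is documented to perform.

```python
import json
import os
import tempfile

# Same content as the example JSON configuration, as a Python dictionary.
loader_config = {
    "levels": {
        "hit": {"tiers": ["raw", "dsp", "hit"]},
        "tcm": {
            "tiers": ["tcm"],
            "parent": "hit",
            "child": "evt",
            "tcm_cols": {
                "child_idx": "coin_idx",
                "parent_tb": "array_id",
                "parent_idx": "array_idx",
            },
        },
        "evt": {"tiers": ["evt"]},
    },
    "channel_map": {},
}


def check_level_links(config):
    """Hypothetical helper: list parent/child references to undefined levels."""
    levels = config["levels"]
    problems = []
    for name, level in levels.items():
        for role in ("parent", "child"):
            target = level.get(role)
            if target is not None and target not in levels:
                problems.append(f"{name}: unknown {role} level {target!r}")
    return problems


# Write the dictionary to a JSON file and read it back.
config_path = os.path.join(tempfile.mkdtemp(), "loader-config.json")
with open(config_path, "w") as f:
    json.dump(loader_config, f, indent=4)
with open(config_path) as f:
    reloaded = json.load(f)

problems = check_level_links(reloaded)
```

Either loader_config or config_path could then be passed as the config argument.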
Examples
>>> from pygama.flow import DataLoader
>>> dl = DataLoader("loader-config.json", "filedb-config.json")
>>> dl.set_files("file_status == 26 and timestamp == '20220716T130443Z'")
>>> dl.set_datastreams([3, 6, 8], "ch")
>>> dl.set_cuts({"hit": "daqenergy > 1000 and AoE > 3", "evt": "muon_veto == False"})
>>> dl.set_output(fmt="pd.DataFrame", columns=["daqenergy", "channel"])
>>> data = dl.load()
Advanced Usage:
>>> from pygama.flow import DataLoader
>>> dl = DataLoader("loader-config.json", "filedb-config.json")
>>> dl.set_files("all")
>>> dl.set_datastreams([0], "ch")
>>> dl.set_cuts({"hit": "wf_max > 30000"})
>>> el = dl.build_entry_list(tcm_level="tcm", mode="any")
>>> el.query("hit_table == 20", inplace=True)
>>> dl.set_output(fmt="pd.DataFrame", columns=["daqenergy", "channel"])
>>> data = dl.load(el)
- Parameters:
config (str | dict) – configuration dictionary or JSON file, see above for specifications.
filedb (str | dict | FileDB) – the loader needs a file database. It can be specified in multiple ways:
  - an instance of FileDB.
  - an LH5 file containing a FileDB (see also FileDB.to_disk()).
  - a FileDB configuration dictionary or JSON file.
file_query (str) – string query that should operate on columns of a FileDB.
Note
No data is loaded in memory at this point.
- browse(query, dsp_config=None)#
Interface between DataLoader and WaveformBrowser.
- build_entry_list(tcm_level: Optional[str] = None, tcm_table: Optional[Union[int, str]] = None, mode: str = 'only', save_output_columns: bool = False, in_memory: bool = True, output_file: Optional[str] = None) dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None#
Applies cuts to the tables and files of interest.
Can only load up to two levels, those joined by tcm_level.
- Parameters:
tcm_level (Optional[str]) – the type of TCM to be used. If None, will only return information from lowest level.
tcm_table (Optional[Union[int, str]]) – the identifier of the table inside this TCM level that you want to use. If unspecified, there must only be one table inside a TCM file in tcm_level.
mode (str) – if "any", returns every hit in the event if any hit in the event passes the cuts. If "only", only returns hits that pass the cuts.
save_output_columns (bool) – if True, saves any columns needed for both the cut and the output to the self.entry_list.
in_memory (bool) – if True, returns the generated entry list in memory.
output_file (Optional[str]) – HDF5 file name to write the entry list to.
- Returns:
entries – the entry list containing columns for {parent}_idx, {parent}_table, {child}_idx and output columns if applicable. Only returned if in_memory is True.
- Return type:
dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None
Note
Does not load the column information into memory. This is done by load().
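The difference between the two mode values can be illustrated without any pygama objects. The following is a minimal sketch, assuming hits are plain dictionaries that carry a hypothetical evt_idx field linking them to an event; the function select_hits is not part of the package and does not reproduce how build_entry_list() works internally.

```python
def select_hits(hits, passes_cut, mode="only"):
    """Illustrative selection of hits, mimicking the documented 'only'/'any' modes."""
    passing = [hit for hit in hits if passes_cut(hit)]
    if mode == "only":
        # Keep just the hits that pass the cut themselves.
        return passing
    if mode == "any":
        # Keep every hit of an event as soon as one of its hits passes.
        good_events = {hit["evt_idx"] for hit in passing}
        return [hit for hit in hits if hit["evt_idx"] in good_events]
    raise ValueError(f"unknown mode: {mode!r}")


# Two hits in event 0 (one above threshold) and one hit in event 1 (below).
hits = [
    {"evt_idx": 0, "hit_idx": 0, "daqenergy": 1500},
    {"evt_idx": 0, "hit_idx": 1, "daqenergy": 200},
    {"evt_idx": 1, "hit_idx": 2, "daqenergy": 300},
]


def energy_cut(hit):
    return hit["daqenergy"] > 1000


only_hits = select_hits(hits, energy_cut, mode="only")
any_hits = select_hits(hits, energy_cut, mode="any")
```

With "only", a single hit survives; with "any", both hits of event 0 are kept because one of them passes the cut.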
- build_hit_entries(save_output_columns: bool = False, in_memory: bool = True, output_file: Optional[str] = None) dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None#
Called by build_entry_list() to handle the case when tcm_level is unspecified.
Ignores any cuts set on levels above lowest level.
- Parameters:
- Returns:
entries – the entry list containing columns for {low_level}_idx, {low_level}_table, and output columns if applicable. Only returned if in_memory is True.
- Return type:
dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None
- get_file_list() DataFrame#
Returns a copy of the file database with its dataframe pared down to the current file list.
- Return type:
DataFrame
- get_tiers_for_col(columns: list | numpy.ndarray, merge_files: Optional[bool] = None) dict#
For each column given, get the tiers and tables in that tier where that column can be found.
- Parameters:
columns (list | numpy.ndarray) – the columns to look for.
- Returns:
col_tiers – col_tiers[file]["tables"][tier] gives a list of tables in tier that contain a column of interest. col_tiers[file]["columns"][column] gives the tier that column can be found in. If self.merge_files is set, then col_tiers[tier] is a list of tables in tier that contain a column of interest.
- Return type:
dict
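The shape of the per-file entry can be pictured with a simplified stand-in. This sketch assumes every table in a tier exposes the same columns, which the real method does not require, and the function tiers_for_columns and its inputs are hypothetical names used only for illustration.

```python
def tiers_for_columns(columns, tier_columns, tier_tables):
    """Build a {"tables": ..., "columns": ...} mapping for one file (simplified)."""
    result = {"tables": {}, "columns": {}}
    for tier, available in tier_columns.items():
        found = [col for col in columns if col in available]
        if found:
            # All tables of the tier are listed, under the stated assumption.
            result["tables"][tier] = list(tier_tables[tier])
            for col in found:
                result["columns"][col] = tier
    return result


# Column and table layout borrowed from the FileDB example configuration.
tier_columns = {
    "raw": ["baseline", "waveform", "daqenergy"],
    "dsp": ["trapEftp", "AoE", "trapEmax"],
    "hit": ["trapEftp_cal", "trapEmax_cal"],
}
tier_tables = {
    "raw": [0, 1, 2, 4, 5, 6, 7],
    "dsp": [0, 1, 2, 4, 5, 6, 7],
    "hit": [0, 1, 2, 4, 5, 6, 7],
}

entry = tiers_for_columns(["daqenergy", "AoE"], tier_columns, tier_tables)
```

Here entry["columns"] maps each requested column to its tier, and entry["tables"] lists the tables of each tier that has to be opened.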
- load(entry_list: Optional[DataFrame] = None, in_memory: bool = True, output_file: Optional[str] = None, orientation: str = 'hit', tcm_level: Optional[str] = None) None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame#
Loads the requested columns in self.output_columns for the entries in the given entry_list.
- Parameters:
entry_list (Optional[DataFrame]) – the output of build_entry_list().
in_memory (bool) – if True, returns the loaded data in memory and stores in self.data.
output_file (Optional[str]) – if not None, writes the loaded data to the specified file.
orientation (str) – specifies the orientation of the output table. Can be "hit" or "evt".
tcm_level (Optional[str]) – which TCM was used to create the entry_list.
- Returns:
data – The data loaded from disk, as specified by self.output_format, self.output_columns, and self.merge_files. Only returned if in_memory is True.
- Return type:
None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame
- load_cal_pars(query)#
Access the cal_pars parameter database, run a query, and return some tables.
- load_detector(det_id)#
Special version of load designed to retrieve all files, tables, column names, and potentially calibration/dsp parameters relevant to one single detector.
- load_dsp_pars(query)#
Access the dsp_pars parameter database (probably JSON format) and do some kind of query to retrieve parameters of interest for our file list, and return some tables.
- load_evts(entry_list: Optional[DataFrame] = None, in_memory: bool = False, output_file: Optional[str] = None, tcm_level: Optional[str] = None) None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame#
Called by load() when orientation is evt.
- Return type:
None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame
- load_hits(entry_list: DataFrame, in_memory: bool = False, output_file: Optional[str] = None, tcm_level: Optional[str] = None) None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame#
Called by load() when orientation is hit.
- Return type:
None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame
- load_settings()#
Get metadata stored in raw files, usually from a DAQ machine.
- reset()#
Resets all fields to their default values, as if this is a newly created data loader.
- set_cuts(cuts: dict | list) None#
Apply a selection on columns in the data tables.
- Parameters:
cuts (dict | list) – the cuts on the columns of the data table, e.g. trapEftp_cal > 1000. If passing a dictionary, the dictionary should be structured as dict[tier] = cut_expr. If passing a list, each item in the list should be able to be applied on one level of tables. The cuts at different levels will be joined with an AND.
Example
>>> dl.set_cuts({"raw": "daqenergy > 1000", "hit": "AoE > 3"})
- set_datastreams(ds: list | tuple | numpy.ndarray, word: str) None#
Apply selection on data streams (or channels).
Sets self.table_list.
- Parameters:
ds (list | tuple | numpy.ndarray) – identifies the detectors of interest. Can be a list of detector names, serial numbers, or channels or a list of subsystems of interest e.g. ged.
word (str) – the type of identifier used in ds. Should be a key in the given channel map or a word defined in the configuration file.
Example
>>> dl.set_datastreams(np.arange(40, 45), "ch")
- set_files(query: str | list[str]) None#
Apply a file selection.
Sets self.file_list, which is a list of indices corresponding to the rows in the file database.
- Parameters:
query (str | list[str]) – if a single string, defines an operation on the file database columns supported by pandas.DataFrame.query(). In addition, the "all" keyword is supported to select all files in the database. If a list of strings, it will be interpreted as a key (cycle timestamp) list.
Note
Call this function before any other operation. A second call to set_files() does not replace the current file list, which instead gets integrated with the new list. Use reset() to reset the file query.
Example
>>> dl.set_files("file_status == 26 and timestamp == '20220716T130443Z'")
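One way to read the list form of query is as a shorthand for an equivalent query string on the timestamp column. The sketch below shows that reading only; the helper keys_to_query is hypothetical, and whether set_files() builds its selection this way internally is an assumption.

```python
def keys_to_query(keys):
    """Turn a list of cycle keys into an equivalent query expression (illustrative)."""
    # Each key becomes an equality test on the timestamp column, joined with "or".
    return " or ".join(f"timestamp == '{key}'" for key in keys)


query = keys_to_query(["20220716T105236Z", "20220716T104550Z"])
```

The resulting string has the same form as the single-string queries shown in the example above, so it could be combined with other conditions such as a file_status test.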
- pygama.flow.data_loader.iskeyword()#
x.__contains__(y) <==> y in x.
pygama.flow.datagroup module#
- class pygama.flow.datagroup.DataGroup(config=None, nfiles=None, load=False)#
Bases: object
A class to create an in-memory or on-disk set of files, according to the LEGEND data convention. Typically requires a JSON config file with:
  - path to DAQ and LH5 directories
  - format strings for daq/lh5 files
  - partitions for the LH5 data directory
DOCME
- get_lh5_cols()#
Compute the LH5 filenames.
This needs to generate the file names and then figure out which folder to store them in. It is probably best to separate these tasks.
- lh5_dir_setup(user_dir=False)#
Generate paths to LH5 data directories using self.lh5_dir. If user_dir is True, create them in self.lh5_user instead.
- load_df(fname=None)#
DOCME
- load_keys(fname=None)#
Load a list of file keys and parse data into columns according to the format string.
- save_df(fname=None)#
Save the current self.fileDB dataframe. If we’ve added extra columns specific to an experiment (outside this class), this will preserve them.
- save_keys(fname=None)#
Default: save the unique_key and the relative path to the DAQ file, as a CSV file. This will probably change in the future, but at least this way we can:
  - easily get a list of available DAQ files
  - regenerate the DataFrame from scan_daq_dir by parsing format string
- scan_daq_dir(verbose=False)#
Scan the DAQ directory and build a DataFrame of file keys. Don’t make any experiment-specific choices here.
- set_config(config)#
DOCME
pygama.flow.file_db module#
Utilities for LH5 file inventory.
- class pygama.flow.file_db.FileDB(config: str | dict, scan: bool = True)#
Bases: object
LH5 file database.
A class containing a pandas.DataFrame that has additional functions to scan the data directory, fill the dataframe’s columns with information about each file, and read or write to disk in an LGDO format.
The database contains the following columns:
  - file keys: the fields specified in the configuration file’s file_format that are required to generate a file name e.g. run, type, timestamp etc.
  - {tier}_file: generated file name for the tier.
  - {tier}_size: size of file on disk, if applicable.
  - file_status: contains a bit corresponding to whether or not a file for each tier exists for a given cycle e.g. if we have tiers raw, dsp, and hit, but only the raw file has been produced, file_status would be 0b100.
  - {tier}_tables: available data streams (channels) in the tier.
  - {tier}_col_idx: file_db.columns[{tier}_col_idx] will return the list of columns available in the tier’s file.
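The file_status encoding can be reproduced with a few bit operations. This is a minimal sketch, assuming the first configured tier occupies the most significant bit, which is consistent with the 0b100 example above; the helper names and the five-tier ordering used for decoding are assumptions.

```python
def file_status(tiers, existing):
    """Pack tier availability into an integer, first tier in the highest bit."""
    status = 0
    for tier in tiers:
        status = (status << 1) | int(tier in existing)
    return status


def tiers_present(tiers, status):
    """Decode a file_status value back into the list of tiers that exist."""
    n = len(tiers)
    return [tier for i, tier in enumerate(tiers) if (status >> (n - 1 - i)) & 1]


# Only the raw file exists out of raw, dsp and hit.
status_raw_only = file_status(["raw", "dsp", "hit"], {"raw"})

# Decoding 26 (0b11010) under an assumed tier order raw, dsp, hit, tcm, evt.
present = tiers_present(["raw", "dsp", "hit", "tcm", "evt"], 26)
```

Under that assumed ordering, a query such as file_status == 26 would select cycles whose raw, dsp and tcm files exist while hit and evt do not.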
The database must be configured by a JSON file (or corresponding dictionary), which defines the data file names, paths and LH5 layout. For example:
{
    "data_dir": "prod-ref-l200/generated/tier",
    "tier_dirs": {
        "raw": "/raw",
        "dsp": "/dsp",
        "hit": "/hit",
        "tcm": "/tcm",
        "evt": "/evt"
    },
    "file_format": {
        "raw": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5",
        "dsp": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_dsp.lh5",
        "hit": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_hit.lh5",
        "evt": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_evt.lh5",
        "tcm": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_tcm.lh5"
    },
    "table_format": {
        "raw": "ch{ch:03d}/raw",
        "dsp": "ch{ch:03d}/dsp",
        "hit": "{ch}/hit",
        "evt": "{grp}/evt",
        "tcm": "hardware_tcm"
    },
    "tables": {
        "raw": [0, 1, 2, 4, 5, 6, 7],
        "dsp": [0, 1, 2, 4, 5, 6, 7],
        "hit": [0, 1, 2, 4, 5, 6, 7],
        "tcm": [""],
        "evt": [""]
    },
    "columns": {
        "raw": ["baseline", "waveform", "daqenergy"],
        "dsp": ["trapEftp", "AoE", "trapEmax"],
        "hit": ["trapEftp_cal", "trapEmax_cal"],
        "tcm": ["cumulative_length", "array_id", "array_idx"],
        "evt": ["lar_veto", "muon_veto", "ge_mult"]
    }
}
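The file_format and table_format entries are ordinary Python format strings, so they can be expanded with str.format(). The sketch below shows this on an excerpt of the configuration above; the helper tier_file_path and the assumption that the full path is the plain concatenation of data_dir, the tier directory and the formatted file name are illustrative and not taken from the FileDB implementation.

```python
# Excerpt of the example configuration, restricted to the raw tier.
config = {
    "data_dir": "prod-ref-l200/generated/tier",
    "tier_dirs": {"raw": "/raw"},
    "file_format": {
        "raw": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5",
    },
    "table_format": {"raw": "ch{ch:03d}/raw", "hit": "{ch}/hit"},
}


def tier_file_path(config, tier, **file_keys):
    """Hypothetical helper: expand the file name template for one cycle."""
    relative = config["file_format"][tier].format(**file_keys)
    # Assumption: the three pieces are joined by simple string concatenation.
    return config["data_dir"] + config["tier_dirs"][tier] + relative


raw_path = tier_file_path(
    config, "raw",
    exp="l60", period="p01", run="r014", type="cal", timestamp="20220716T105236Z",
)

# Table names follow the same mechanism; {ch:03d} zero-pads the channel number.
raw_table = config["table_format"]["raw"].format(ch=3)
hit_table = config["table_format"]["hit"].format(ch=3)
```

The different templates for raw and hit illustrate that each tier can use its own naming scheme for the same channel.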
FileDB objects can also be stored on disk and read in at later times.
Examples
>>> from pygama.flow import FileDB
>>> db = FileDB("./filedb_config.json")
>>> db.scan_tables_columns()  # also read in the table column names
>>> print(db)
<< Columns >>
[['baseline', 'card', 'ch_orca', 'channel', 'crate', 'daqenergy', 'deadtime', 'dr_maxticks', 'dr_start_pps', 'dr_start_ticks', 'dr_stop_pps', 'dr_stop_ticks', 'eventnumber', 'fcid', 'numtraces', 'packet_id', 'runtime', 'timestamp', 'to_abs_mu_usec', 'to_dt_mu_usec', 'to_master_sec', 'to_mu_sec', 'to_mu_usec', 'to_start_sec', 'to_start_usec', 'tracelist', 'ts_maxticks', 'ts_pps', 'ts_ticks', 'waveform'], ['bl_intercept', 'bl_mean', 'bl_slope', 'bl_std', 'tail_slope', 'tail_std', 'wf_blsub'], ['array_id', 'array_idx', 'cumulative_length']]
<< DataFrame >>
   exp  period   run         timestamp  type  ...  hit_col_idx  tcm_tables  tcm_col_idx  evt_tables  evt_col_idx
0  l60     p01  r014  20220716T105236Z   cal  ...         None          []          [2]        None         None
1  l60     p01  r014  20220716T104550Z   cal  ...         None          []          [2]        None         None
>>> db.to_disk("file_db.lh5")
- Parameters:
config (str | dict) – dictionary or path to JSON file specifying data directories, tiers, and file name templates. Can also be path to existing LH5 file containing FileDB object serialized by to_disk().
scan (bool) – whether the file database should scan the directory containing raw files to fill its rows with file keys.
- from_disk(filename: str) None#
Fills the dataframe (and configuration dictionary) with the information from a file created by to_disk().
- scan_daq_files(daq_dir: str, daq_template: str) None#
Does the exact same thing as scan_files() but with extra configuration arguments for a DAQ directory and template instead of using the lowest tier.
- scan_files() None#
Scan the directory containing files from the lower tier and fill the dataframe.
The lower tier is defined as the first element of the tiers array. Only fills columns that can be populated with just the raw files.
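Filling the file-key columns amounts to inverting the file name template. The following sketch does this with a regular expression built from the template; it is an assumption about one possible approach, the helper template_to_regex is hypothetical, and it presumes that key values contain neither "/" nor "-".

```python
import re


def template_to_regex(template):
    """Convert a '{field}' file name template into a compiled regex (illustrative)."""
    # re.split with a capture group alternates literal text and field names.
    parts = re.split(r"\{(\w+)\}", template)
    pattern = ""
    seen = set()
    for i, part in enumerate(parts):
        if i % 2 == 0:
            pattern += re.escape(part)          # literal text
        elif part in seen:
            pattern += f"(?P={part})"           # repeated field must match again
        else:
            seen.add(part)
            pattern += f"(?P<{part}>[^/-]+)"    # first occurrence captures the value
    return re.compile(pattern)


template = "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5"
regex = template_to_regex(template)

match = regex.fullmatch("/cal/p01/r014/l60-p01-r014-cal-20220716T105236Z-tier_raw.lh5")
file_keys = match.groupdict() if match else None
```

Each entry of file_keys corresponds to one file-key column of the database row for that cycle.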
- scan_tables_columns(to_file: Optional[str] = None, override: bool = False) list[str]#
Open files in the database to read (and store) the names of the available tables (and of the columns therein).
Adds the available table names in each tier as a column in the dataframe by searching for group names that match the configured table_format and saving the associated keyword values.
Returns a list with each unique list of columns found in each table and adds a column {tier}_col_idx to the dataframe that maps to the column table.
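The relationship between the returned list and the {tier}_col_idx column can be sketched as a simple deduplication. The function index_column_lists below is a hypothetical stand-in, not the actual implementation, and sorting the column names before comparison is an assumption made so that ordering differences do not create duplicates.

```python
def index_column_lists(tables_columns):
    """Deduplicate per-table column lists and return (unique lists, index per table)."""
    unique_lists = []
    col_idx = []
    for columns in tables_columns:
        columns = sorted(columns)
        if columns not in unique_lists:
            unique_lists.append(columns)
        # Store the position of this table's column list in the unique list.
        col_idx.append(unique_lists.index(columns))
    return unique_lists, col_idx


tables_columns = [
    ["baseline", "waveform", "daqenergy"],             # first raw table
    ["daqenergy", "baseline", "waveform"],             # second raw table, same columns
    ["array_id", "array_idx", "cumulative_length"],    # TCM table
]
unique_lists, col_idx = index_column_lists(tables_columns)
```

Looking up unique_lists[col_idx[i]] then recovers the columns of table i, which mirrors the file_db.columns[{tier}_col_idx] access described for FileDB.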
- set_config(config: dict, config_path: Optional[str] = None) None#
Read in the configuration dictionary.
- set_file_sizes() None#
Add columns (for each tier) to the database containing the corresponding file size in bytes as reported by os.path.getsize().
- pygama.flow.file_db.to_datetime(key: str) datetime#
Convert LEGEND cycle key to datetime.
Assumes key is formatted as YYYYMMDDTHHMMSSZ (UTC).
- Return type:
datetime
- pygama.flow.file_db.to_unixtime(key: str) int#
Convert LEGEND cycle key to POSIX timestamp.
- Return type:
int
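Both conversions can be approximated with the standard library alone. The functions below are stand-ins written for illustration, with hypothetical names; in particular, returning a timezone-aware datetime is an assumption, and the pygama functions may behave differently in that respect.

```python
from datetime import datetime, timezone


def cycle_key_to_datetime(key):
    """Parse a YYYYMMDDTHHMMSSZ cycle key into a UTC-aware datetime (illustrative)."""
    return datetime.strptime(key, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc)


def cycle_key_to_unixtime(key):
    """Convert a cycle key to an integer POSIX timestamp (illustrative)."""
    return int(cycle_key_to_datetime(key).timestamp())


key = "20220716T130443Z"
as_datetime = cycle_key_to_datetime(key)
as_unixtime = cycle_key_to_unixtime(key)
```

Attaching the UTC timezone explicitly keeps the POSIX timestamp independent of the local timezone of the machine running the code.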