pygama.flow package
High-level data flow handling routines.
Submodules
pygama.flow.data_loader module
Routines for high-level data loading and skimming.
- class pygama.flow.data_loader.DataLoader(config, filedb=None, file_query=None)
Bases: object
Facilitate loading of processed data across several tiers.
Where possible, uses a FileDB object so that a user can quickly select a subset of cycle files of interest and access information at each processing tier.
Example JSON configuration file:
{
  "filedb": "path/to/filedb.h5",
  "levels": {
    "hit": {
      "tiers": ["raw", "dsp", "hit"]
    },
    "tcm": {
      "tiers": ["tcm"],
      "parent": "hit",
      "child": "evt",
      "tcm_cols": {
        "child_idx": "coin_idx",
        "parent_tb": "table_key",
        "parent_idx": "row_in_table"
      }
    },
    "evt": {
      "tiers": ["evt"]
    }
  }
}
Examples
>>> from pygama.flow import DataLoader
>>> dl = DataLoader("loader-config.json")
>>> dl.set_files("file_status == 26 and timestamp == '20220716T130443Z'")
>>> dl.set_datastreams([3, 6, 8], "ch")
>>> dl.set_cuts({"hit": "daqenergy > 1000 and AoE > 3", "evt": "muon_veto == False"})
>>> dl.set_output(fmt="pd.DataFrame", columns=["daqenergy", "channel"])
>>> data = dl.load()
Be careful: load() loads data in memory regardless of its size. If loading a lot of data (e.g. waveforms), you might want to do it in chunks. next() does exactly this:
>>> for chunk in dl.next():
...     run_my_processing(chunk)
Advanced Usage:
>>> from pygama.flow import DataLoader
>>> dl = DataLoader("loader-config.json", filedb="filedb-config.json")  # or any value accepted by the FileDB constructor
>>> dl.set_files("all")
>>> dl.set_datastreams([0], "ch")
>>> dl.set_cuts({"hit": "wf_max > 30000"})
>>> el = dl.build_entry_list(tcm_level="tcm", mode="any")
>>> el.query("hit_table == 20", inplace=True)
>>> dl.set_output(fmt="pd.DataFrame", columns=["daqenergy", "channel"])
>>> data = dl.load(el)
- Parameters:
config – configuration dictionary or JSON file, see above for a specification. Accepts strings in the following format:
path/to/config.json[field1/field2/...]
to specify the location of the DataLoader configuration in the config.json dictionary, if not at the first level.
filedb (str | dict | FileDB) – the loader needs a file database. It can be specified in multiple ways:
  - an instance of FileDB.
  - an LH5 file containing a FileDB (see also FileDB.to_disk()).
  - a FileDB configuration dictionary or JSON file.
If None, uses the value of the filedb key in config to instantiate a FileDB object.
file_query (str) – string query that should operate on columns of a FileDB.
Note
No data is loaded in memory at this point.
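The bracketed string format accepted by config can be illustrated with a small helper. This is only a sketch: split_config_spec and select_subconfig are hypothetical names, not pygama functions, and the parsing rules are assumed from the format string in the parameter description rather than taken from the library source.

```python
import json


def split_config_spec(spec):
    """Split 'file.json[a/b]' into ('file.json', ['a', 'b']).

    Hypothetical helper: the bracket syntax is assumed from the
    documentation text, not from the pygama implementation.
    """
    if spec.endswith("]") and "[" in spec:
        path, _, fields = spec[:-1].partition("[")
        return path, [f for f in fields.split("/") if f]
    return spec, []


def select_subconfig(config, fields):
    """Walk a nested dictionary following the given list of keys."""
    for key in fields:
        config = config[key]
    return config


# A configuration in which the loader settings are not at the first level.
full_config = json.loads(
    '{"setups": {"l200": {"filedb": "path/to/filedb.h5", "levels": {}}}}'
)
path, fields = split_config_spec("config.json[setups/l200]")
loader_config = select_subconfig(full_config, fields)
```

Here loader_config is the nested dictionary that holds the filedb and levels keys, which is the part a loader would need.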
- browse(entry_list=None, buffer_len=128, **kwargs)
Return a WaveformBrowser object for waveform inspection.
- Parameters:
entry_list (DataFrame | None) – the output of build_entry_list(). If None, builds it according to the current configuration.
buffer_len (int) – number of waveforms to keep in memory at a time.
**kwargs – keyword arguments forwarded to WaveformBrowser.
- Return type:
WaveformBrowser
See also
WaveformBrowser
- build_entry_list(tcm_level=None, tcm_table=None, mode='only', save_output_columns=False, in_memory=True, output_file=None)
Applies cuts to the tables and files of interest.
Can only load up to two levels, those joined by tcm_level.
- Parameters:
tcm_level (str | None) – the type of TCM to be used. If None, will only return information from the lowest level.
tcm_table (int | str | None) – the identifier of the table inside this TCM level that you want to use. If unspecified, there must only be one table inside a TCM file in tcm_level.
mode (str) – if any, returns every hit in the event if any hit in the event passes the cuts. If only, only returns hits that pass the cuts.
save_output_columns (bool) – if True, saves any columns needed for both the cut and the output to self.entry_list.
in_memory (bool) – if True, returns the generated entry list in memory.
output_file (str | None) – HDF5 file name to write the entry list to.
- Returns:
entries – the entry list containing columns for {parent}_idx, {parent}_table, {child}_idx and output columns if applicable. Only returned if in_memory is True.
- Return type:
Note
Does not load the column information into memory. This is done by load().
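The difference between the any and only modes can be shown on a toy data set. The sketch below does not use pygama: select_hits is a hypothetical function, and the list of (event index, hit index) pairs is an assumed stand-in for the information a TCM provides.

```python
def select_hits(hits, passes_cut, mode="only"):
    """Return the hits kept by a selection in the given mode.

    hits: list of (evt_idx, hit_idx) pairs (toy stand-in for a TCM).
    passes_cut: set of hit_idx values that satisfy the hit-level cut.
    """
    if mode == "only":
        # keep just the hits that pass the cut themselves
        return [h for h in hits if h[1] in passes_cut]
    if mode == "any":
        # keep every hit of an event that has at least one passing hit
        good_events = {evt for evt, hit in hits if hit in passes_cut}
        return [h for h in hits if h[0] in good_events]
    raise ValueError(f"unknown mode: {mode!r}")


hits = [(0, 0), (0, 1), (1, 2), (2, 3), (2, 4)]
passing = {1, 4}
only_hits = select_hits(hits, passing, mode="only")
any_hits = select_hits(hits, passing, mode="any")
```

With this input, only keeps the two passing hits, while any also keeps the other hits of events 0 and 2 and drops event 1 entirely.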
- build_hit_entries(save_output_columns=False, in_memory=True, output_file=None)
Called by build_entry_list() to handle the case when tcm_level is unspecified.
Ignores any cuts set on levels above the lowest level.
- Parameters:
- Returns:
entries – the entry list containing columns for {low_level}_idx, {low_level}_table, and output columns if applicable. Only returned if in_memory is True.
- Return type:
- get_file_list()
Returns a copy of the file database with its dataframe pared down to the current file list.
- Return type:
- get_tiers_for_col(columns, merge_files=None)
For each column given, get the tiers and tables in that tier where that column can be found.
- Parameters:
- Returns:
col_tiers – col_tiers[file]["tables"][tier] gives a list of tables in tier that contain a column of interest. col_tiers[file]["columns"][column] gives the tier that column can be found in. If self.merge_files is True, then col_tiers[tier] is a list of tables in tier that contain a column of interest.
- Return type:
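The nested structure of col_tiers in the per-file case can be made concrete with a toy example. The function tiers_for_columns and the file_layout input are hypothetical and exist only for illustration; the sketch mirrors the shape described above, not the pygama implementation.

```python
def tiers_for_columns(file_layout, columns):
    """Build a col_tiers-like dictionary.

    file_layout: {file: {tier: {table: [column, ...]}}} (assumed toy input).
    columns: the column names of interest.
    """
    col_tiers = {}
    for file, tiers in file_layout.items():
        entry = {"tables": {}, "columns": {}}
        for tier, tables in tiers.items():
            for table, table_cols in tables.items():
                found = [c for c in columns if c in table_cols]
                if found:
                    # remember which tables in this tier hold a requested column
                    entry["tables"].setdefault(tier, []).append(table)
                    for c in found:
                        entry["columns"][c] = tier
        col_tiers[file] = entry
    return col_tiers


layout = {
    0: {
        "raw": {0: ["daqenergy", "waveform"], 1: ["daqenergy", "waveform"]},
        "dsp": {0: ["trapEmax", "AoE"], 1: ["trapEmax", "AoE"]},
    }
}
col_tiers = tiers_for_columns(layout, ["daqenergy", "AoE"])
```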
- load(entry_list=None, in_memory=True, output_file=None, orientation='hit', tcm_level=None)
Loads the requested data from disk.
Loads the requested columns in self.output_columns for the entries in the given entry_list.
- Parameters:
entry_list (DataFrame | None) – the output of build_entry_list(). If None, builds it according to the current configuration.
in_memory (bool) – if True, returns the loaded data in memory and stores it in self.data.
output_file (str | None) – if not None, writes the loaded data to the specified file.
orientation (str) – specifies the orientation of the output table. Can be hit or evt.
tcm_level (str | None) – which TCM was used to create the entry_list.
- Returns:
data – the data loaded from disk, as specified by self.output_format, self.output_columns, and self.merge_files. Only returned if in_memory is True.
- Return type:
- load_cal_pars(query)
Access the cal_pars parameter database, run a query, and return some tables.
- load_detector(det_id)
Special version of load() designed to retrieve all files, tables, column names, and potentially calibration/DSP parameters relevant to a single detector.
- load_dsp_pars(query)
Access the dsp_pars parameter database (probably JSON format), run a query to retrieve parameters of interest for the file list, and return some tables.
- load_evts(entry_list=None, in_memory=False, output_file=None, tcm_level=None)
Called by load() when orientation is evt.
- load_hits(entry_list, in_memory=False, output_file=None, tcm_level=None)
Called by load() when orientation is hit.
- load_iterator(entry_list=None, tcm_level=None, buffer_len=3200)
Creates an LH5Iterator that will load the requested columns in self.output_columns for the entries in the given entry_list in chunks. This is more memory efficient than filling a whole table and is recommended for use when loading waveforms.
- Parameters:
entry_list (DataFrame | None) – the output of build_entry_list(). If None, builds it according to the current configuration.
tcm_level (str | None) – which TCM was used to create the entry_list.
buffer_len (int) – how many entries to load in a single chunk.
- Returns:
data – LH5 iterator, which yields (LH5 table, entry, n_entries) when iterated over.
- Return type:
- load_settings()
Get metadata stored in raw files, usually from a DAQ machine.
- next(entry_list=None, chunk_size=10000, **kwargs)
Loads the requested data from disk in chunks.
This method should be used instead of load() to handle large data sets.
Note
It is the user's responsibility to optimize the chunk size in order to achieve the best performance.
- Parameters:
- Returns:
data – see load().
- Return type:
Examples
>>> for chunk in dl.next():
...     # 'chunk' has the same type as the output of dl.load()
...     run_my_processing(chunk)
See also
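The idea of chunked loading can be sketched without pygama as a generator that walks an entry list in fixed-size slices. The function iter_chunks is a hypothetical illustration of the pattern, not the way next() is implemented; only the chunk_size name and its default are borrowed from the signature above.

```python
def iter_chunks(entries, chunk_size=10000):
    """Yield consecutive slices of at most chunk_size entries."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(entries), chunk_size):
        # only one slice is materialized at a time
        yield entries[start:start + chunk_size]


entry_list = list(range(25))  # stand-in for the rows of an entry list
chunk_lengths = [len(chunk) for chunk in iter_chunks(entry_list, chunk_size=10)]
```

A larger chunk_size means fewer iterations but more memory per chunk, which is the trade-off the note above asks the user to tune.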
- reset()
Resets all fields to their default values, as if this were a newly created data loader.
- set_config(config)
Load configuration dictionary.
$_ expands to the config file location, if possible, otherwise the current working directory.
- set_cuts(cuts, append=False)
Apply a selection on columns in the data tables.
- Parameters:
cuts – the cuts on the columns of the data table, e.g. trapEftp_cal > 1000. If passing a dictionary, the dictionary should be structured as dict[tier] = cut_expr. If passing a list, each item in the list should be applicable to one level of tables. The cuts at different levels will be joined with an AND.
append (bool) – if True, appends cuts to the existing cuts instead of overwriting.
Example
>>> dl.set_cuts({"raw": "daqenergy > 1000", "hit": "AoE > 3"})
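One plausible reading of the append flag is sketched below on plain dictionaries of cut expressions. This is an assumption, not the documented behavior: update_cuts is a hypothetical function, and joining an existing and a new expression for the same level with "and" is a guess at how appended cuts could combine.

```python
def update_cuts(existing, new, append=False):
    """Return the cut dictionary after applying new cuts.

    Assumed semantics: without append the new cuts replace the old ones;
    with append, expressions for the same level are joined with 'and'.
    """
    if not append:
        return dict(new)
    merged = dict(existing)
    for level, expr in new.items():
        if level in merged:
            merged[level] = f"({merged[level]}) and ({expr})"
        else:
            merged[level] = expr
    return merged


cuts = update_cuts({}, {"raw": "daqenergy > 1000"})
cuts = update_cuts(cuts, {"raw": "baseline < 500", "hit": "AoE > 3"}, append=True)
```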
- set_datastreams(ds, word, append=False)
Apply selection on data streams (or channels).
Sets self.table_list.
- Parameters:
ds (list | tuple | ndarray) – identifies the detectors of interest. Can be a list of detector names, serial numbers, or channels, or a list of subsystems of interest, e.g. ged.
word (str) – the type of identifier used in ds. Should be a key in the given channel map or a word defined in the configuration file.
append (bool) – if True, appends datastreams to the existing self.table_list instead of overwriting.
Example
>>> dl.set_datastreams(np.arange(40, 45), "ch")
- set_files(query, append=False)
Apply a file selection.
Sets self.file_list, which is a list of indices corresponding to the rows in the file database.
- Parameters:
query (str | list[str]) – if a single string, defines an operation on the file database columns supported by pandas.DataFrame.query(). In addition, the all keyword is supported to select all files in the database. If a list of strings, it is interpreted as a list of keys (cycle timestamps).
append (bool) – if True, appends files to the existing self.file_list instead of overwriting.
Note
Call this function before any other operation. A second call to set_files() does not replace the current file list, which is instead merged with the new list. Use reset() to reset the file query.
Example
>>> dl.set_files("file_status == 26 and timestamp == '20220716T130443Z'")
- set_output(fmt=None, merge_files=None, columns=None, aoesa_to_vov=None)
Set the parameters for the output format of load().
- Parameters:
fmt (str | None) – lgdo.Table or pd.DataFrame.
merge_files (bool | None) – if True, information from multiple files will be merged into one table.
columns (list | None) – the columns that should be copied into the output.
aoesa_to_vov (bool | None) – output ArrayOfEqualSizedArrays as VectorOfVectors.
Example
>>> dl.set_output(
...     fmt="pd.DataFrame",
...     merge_files=False,
...     columns=["daqenergy", "trapEmax", "channel"]
... )
- skim_waveforms(mode='hit', hit_list=None, evt_list=None)
Handled separately because waveforms can easily fill up memory.
- pygama.flow.data_loader.iskeyword()
x.__contains__(y) <==> y in x.
pygama.flow.file_db module
Utilities for LH5 file inventory.
- class pygama.flow.file_db.FileDB(config, scan=True)
Bases: object
LH5 file database.
A class containing a pandas.DataFrame that has additional functions to scan the data directory, fill the dataframe's columns with information about each file, and read or write to disk in an LGDO format.
The database contains the following columns:
  - file keys: the fields specified in the configuration file's file_format that are required to generate a file name, e.g. run, type, timestamp etc.
  - {tier}_file: generated file name for the tier.
  - {tier}_size: size of file on disk, if applicable.
  - file_status: contains a bit corresponding to whether or not a file for each tier exists for a given cycle, e.g. if we have tiers raw, dsp, and hit, but only the raw file has been produced, file_status would be 0b100.
  - {tier}_tables: available data streams (channels) in the tier.
  - {tier}_col_idx: file_db.columns[{tier}_col_idx] will return the list of columns available in the tier's file.
The database must be configured by a JSON file (or corresponding dictionary), which defines the data file names, paths and LH5 layout. For example:
{
  "data_dir": "prod-ref-l200/generated/tier",
  "tier_dirs": {
    "raw": "/raw",
    "dsp": "/dsp",
    "hit": "/hit",
    "tcm": "/tcm",
    "evt": "/evt"
  },
  "file_format": {
    "raw": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5",
    "dsp": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_dsp.lh5",
    "hit": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_hit.lh5",
    "evt": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_evt.lh5",
    "tcm": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_tcm.lh5"
  },
  "table_format": {
    "raw": "ch{ch:03d}/raw",
    "dsp": "ch{ch:03d}/dsp",
    "hit": "{ch}/hit",
    "evt": "{grp}/evt",
    "tcm": "hardware_tcm"
  },
  "tables": {
    "raw": [0, 1, 2, 4, 5, 6, 7],
    "dsp": [0, 1, 2, 4, 5, 6, 7],
    "hit": [0, 1, 2, 4, 5, 6, 7],
    "tcm": [""],
    "evt": [""]
  },
  "columns": {
    "raw": ["baseline", "waveform", "daqenergy"],
    "dsp": ["trapEftp", "AoE", "trapEmax"],
    "hit": ["trapEftp_cal", "trapEmax_cal"],
    "tcm": ["table_key", "row_in_table"],
    "evt": ["lar_veto", "muon_veto", "ge_mult"]
  }
}
FileDB objects can also be stored on disk and read in at a later time.
Examples
>>> from pygama.flow import FileDB
>>> db = FileDB("./filedb_config.json")
>>> db.scan_tables_columns()  # also read in table column names
>>> print(db)
<< Columns >>
[['baseline', 'card', 'ch_orca', 'channel', 'crate', 'daqenergy', 'deadtime', 'dr_maxticks', 'dr_start_pps', 'dr_start_ticks', 'dr_stop_pps', 'dr_stop_ticks', 'eventnumber', 'fcid', 'numtraces', 'packet_id', 'runtime', 'timestamp', 'to_abs_mu_usec', 'to_dt_mu_usec', 'to_master_sec', 'to_mu_sec', 'to_mu_usec', 'to_start_sec', 'to_start_usec', 'tracelist', 'ts_maxticks', 'ts_pps', 'ts_ticks', 'waveform'], ['bl_intercept', 'bl_mean', 'bl_slope', 'bl_std', 'tail_slope', 'tail_std', 'wf_blsub'], ['table_key', 'row_in_table', 'cumulative_length']]
<< DataFrame >>
   exp period   run         timestamp type  ... hit_col_idx tcm_tables tcm_col_idx evt_tables evt_col_idx
0  l60    p01  r014  20220716T105236Z  cal  ...        None         []         [2]       None        None
1  l60    p01  r014  20220716T104550Z  cal  ...        None         []         [2]       None        None
>>> db.to_disk("file_db.lh5")
- Parameters:
config (str | dict | list[str]) – dictionary or path to JSON file specifying data directories, tiers, and file name templates. Can also be a path (or a list of paths or a regular expression) to an existing LH5 file containing a FileDB object serialized by to_disk().
scan (bool) – whether the file database should scan the directory containing raw files to fill its rows with file keys.
- from_disk(path)
Read FileDBs from disk.
Overrides the dataframe, configuration dictionary and columns with the information from a file created by to_disk().
- get_table_columns(table, tier, ifile=0)
Return list of columns in table table, tier tier.
Assumes that the table contents do not change across data files. If desired, ifile (default is 0) can be used to select a different file.
- get_table_name(tier, tb)
Get the table name for a tier given its table identifier.
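How a table identifier could be turned into a table name from the table_format templates shown in the FileDB configuration example is sketched below. The function table_name is a hypothetical stand-in that uses Python's standard string formatting; it assumes each template contains at most one field and is not taken from the pygama source.

```python
import string

# table_format entries copied from the example FileDB configuration
table_format = {
    "raw": "ch{ch:03d}/raw",
    "dsp": "ch{ch:03d}/dsp",
    "hit": "{ch}/hit",
    "tcm": "hardware_tcm",
}


def table_name(tier, tb):
    """Fill the tier's template with the table identifier tb."""
    template = table_format[tier]
    # collect the replacement field names used in the template
    fields = [name for _, name, _, _ in string.Formatter().parse(template) if name]
    if not fields:
        # templates without fields (e.g. the TCM one) are used verbatim
        return template
    return template.format(**{fields[0]: tb})


raw_name = table_name("raw", 3)
hit_name = table_name("hit", 3)
tcm_name = table_name("tcm", "")
```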
- scan_daq_files(daq_dir, daq_template)
Does the exact same thing as scan_files() but with extra configuration arguments for a DAQ directory and template instead of using the lowest tier.
- scan_files(dirs=None)
Scan the directory containing files from the lowest tier and fill the dataframe.
The lowest tier is defined as the first element of the tiers array. Only fills columns that can be populated with just these files.
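One way file keys could be recovered from a file name is to turn the file_format template into a regular expression. The sketch below is an assumption about the general technique, not a description of what scan_files() does internally; template_to_regex and extract_file_keys are hypothetical names, and the rule that keys contain neither "/" nor "-" is assumed for the example template.

```python
import re
import string


def template_to_regex(template):
    """Convert a str.format-style template into a compiled regex."""
    parts = []
    seen = set()
    for literal, field, _spec, _conv in string.Formatter().parse(template):
        parts.append(re.escape(literal))
        if field is None:
            continue
        if field in seen:
            # a repeated field must match the value captured earlier
            parts.append(f"(?P={field})")
        else:
            seen.add(field)
            parts.append(f"(?P<{field}>[^/-]+)")
    return re.compile("".join(parts))


def extract_file_keys(template, path):
    """Return the file keys found in path, or None if it does not match."""
    match = template_to_regex(template).fullmatch(path)
    return match.groupdict() if match else None


raw_template = "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5"
keys = extract_file_keys(
    raw_template,
    "/cal/p01/r014/l60-p01-r014-cal-20220716T105236Z-tier_raw.lh5",
)
```

Each dictionary of keys obtained this way would correspond to one row of file keys in the dataframe.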
- scan_tables_columns(to_file=None, override=False, dir_files_conform=False)
Open files to read (and store) the names of the available tables (and of the columns therein).
Adds the available table names in each tier as a column in the dataframe by searching for group names that match the configured table_format and saving the associated keyword values.
Returns a list with each unique list of columns found in each table and adds a column {tier}_col_idx to the dataframe that maps to the column table.
- Parameters:
to_file (str | None) – optionally write the column table to an LH5 file (as a VectorOfVectors).
override (bool) – if the FileDB already has a columns field, the scan will not run unless this parameter is set to True.
dir_files_conform (bool) – if True, assume that all files in a directory contain tables with the same columns (i.e. all file contents conform to the same format) and scan only the first file. Significantly reduces processing time.
- Return type:
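The relation between the list of unique column lists and the {tier}_col_idx values can be illustrated in a few lines. The function index_column_lists is a hypothetical sketch of the deduplication idea only; it does not read LH5 files, and the input lists are made up for the example.

```python
def index_column_lists(tables_columns):
    """Deduplicate column lists and index each table into the result.

    tables_columns: one list of column names per table.
    Returns (unique_lists, col_idx), where col_idx[i] is the position
    in unique_lists of the column list of table i.
    """
    unique_lists = []
    col_idx = []
    for cols in tables_columns:
        if cols not in unique_lists:
            unique_lists.append(cols)
        col_idx.append(unique_lists.index(cols))
    return unique_lists, col_idx


raw_cols = ["baseline", "daqenergy", "waveform"]
dsp_cols = ["bl_mean", "bl_std"]
unique_lists, col_idx = index_column_lists([raw_cols, raw_cols, dsp_cols, raw_cols])
```

Storing an index per table instead of the full column list keeps the dataframe small when many tables share the same layout.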
- set_config(config, config_path=None)
Read in the configuration dictionary.
- set_file_sizes()
Add columns for each tier containing the corresponding file size in bytes, as reported by os.path.getsize().
- set_file_status()
Add a column with a bit corresponding to whether each tier's file exists.
For example, if we have tiers raw, dsp, and hit, but only the raw file has been produced, file_status would be 4 (0b100 in binary representation).
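The bit layout in the example above, where the first tier occupies the most significant bit, can be reproduced with a short sketch. The functions file_status and tiers_present are hypothetical helpers written for illustration; they take a set of existing tiers instead of checking the file system.

```python
def file_status(tiers, existing):
    """Pack tier availability into an integer, first tier as highest bit."""
    status = 0
    for tier in tiers:
        status = (status << 1) | int(tier in existing)
    return status


def tiers_present(tiers, status):
    """Decode a status word back into the list of available tiers."""
    n = len(tiers)
    return [t for i, t in enumerate(tiers) if (status >> (n - 1 - i)) & 1]


tiers = ["raw", "dsp", "hit"]
only_raw = file_status(tiers, {"raw"})
raw_and_hit = file_status(tiers, {"raw", "hit"})
decoded = tiers_present(tiers, raw_and_hit)
```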
pygama.flow.utils module
Utility functions.
- pygama.flow.utils.dict_to_table(col_dict, attr_dict)
- pygama.flow.utils.fill_col_dict(tier_table, col_dict, attr_dict, tcm_idx, table_length, aoesa_to_vov)
- pygama.flow.utils.inplace_sort(df, by)
- pygama.flow.utils.to_datetime(key)
Convert LEGEND cycle key to datetime.
Assumes key is formatted as YYYYMMDDTHHMMSSZ (UTC).
- Return type:
datetime
- pygama.flow.utils.to_unixtime(key)
Convert LEGEND cycle key to POSIX timestamp.
- Return type:
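Both conversions can be sketched with the standard library alone. The functions key_to_datetime and key_to_unixtime are hypothetical equivalents written for illustration; attaching an explicit UTC time zone is an assumption of this sketch, and the pygama functions may return a naive datetime instead.

```python
from datetime import datetime, timezone


def key_to_datetime(key):
    """Parse a cycle key formatted as YYYYMMDDTHHMMSSZ into a UTC datetime."""
    parsed = datetime.strptime(key, "%Y%m%dT%H%M%SZ")
    # the trailing Z marks UTC, so make the time zone explicit (assumption)
    return parsed.replace(tzinfo=timezone.utc)


def key_to_unixtime(key):
    """Convert a cycle key into seconds since the POSIX epoch."""
    return key_to_datetime(key).timestamp()


dt = key_to_datetime("20220716T130443Z")
ts = key_to_unixtime("20220716T130443Z")
```

Making the datetime time zone aware before calling timestamp() avoids the local-time interpretation that Python applies to naive datetimes.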