pygama.flow package#

Subpackage description

Submodules#

pygama.flow.data_loader module#

Routines for high-level data loading and skimming.

class pygama.flow.data_loader.DataLoader(config: str | dict, filedb: str | dict | pygama.flow.file_db.FileDB, file_query: Optional[str] = None)#

Bases: object

Facilitate loading of processed data across several tiers.

Where possible, uses a FileDB object so that a user can quickly select a subset of cycle files of interest and access information at each processing tier.

Example JSON configuration file:

{
    "levels": {
        "hit": {
            "tiers": ["raw", "dsp", "hit"]
        },
        "tcm": {
            "tiers": ["tcm"],
            "parent": "hit",
            "child": "evt",
            "tcm_cols": {
                "child_idx": "coin_idx",
                "parent_tb": "array_id",
                "parent_idx": "array_idx"
            }
        },
        "evt": {
            "tiers": ["evt"]
        }
    },
    "channel_map": {}
}
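Because config is typed as str | dict, the same configuration can presumably be written as a Python dictionary. The sketch below mirrors the JSON above using only the standard library. The helper check_levels is hypothetical and not part of pygama: it only verifies that every parent or child named by a level refers to a level that exists.

```python
import json

# Python-dictionary equivalent of the JSON configuration shown above.
loader_config = {
    "levels": {
        "hit": {"tiers": ["raw", "dsp", "hit"]},
        "tcm": {
            "tiers": ["tcm"],
            "parent": "hit",
            "child": "evt",
            "tcm_cols": {
                "child_idx": "coin_idx",
                "parent_tb": "array_id",
                "parent_idx": "array_idx",
            },
        },
        "evt": {"tiers": ["evt"]},
    },
    "channel_map": {},
}


def check_levels(config):
    """Hypothetical helper: list parent/child references to unknown levels."""
    levels = config["levels"]
    dangling = []
    for name, level in levels.items():
        for role in ("parent", "child"):
            target = level.get(role)
            if target is not None and target not in levels:
                dangling.append(f"{name}.{role} -> {target}")
    return dangling


problems = check_levels(loader_config)
# Serialize to the JSON text that could be saved as a configuration file.
config_text = json.dumps(loader_config, indent=4)
```

An empty problems list means that the "tcm" level joins two levels that are both defined.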

Examples

>>> from pygama.flow import DataLoader
>>> dl = DataLoader("loader-config.json", "filedb-config.json")
>>> dl.set_files("file_status == 26 and timestamp == '20220716T130443Z'")
>>> dl.set_datastreams([3, 6, 8], "ch")
>>> dl.set_cuts({"hit": "daqenergy > 1000 and AoE > 3", "evt": "muon_veto == False"})
>>> dl.set_output(fmt="pd.DataFrame", columns=["daqenergy", "channel"])
>>> data = dl.load()

Advanced Usage:

>>> from pygama.flow import DataLoader
>>> dl = DataLoader("loader-config.json", "filedb-config.json")
>>> dl.set_files("all")
>>> dl.set_datastreams([0], "ch")
>>> dl.set_cuts({"hit": "wf_max > 30000"})
>>> el = dl.build_entry_list(tcm_level="tcm", mode="any")
>>> el.query("hit_table == 20", inplace=True)
>>> dl.set_output(fmt="pd.DataFrame", columns=["daqenergy", "channel"])
>>> data = dl.load(el)
Parameters:
  • config (str | dict) – configuration dictionary or JSON file, see above for specifications.

  • filedb (str | dict | FileDB) –

    the loader needs a file database. As the signature indicates, it can be given as a string, a dictionary, or a FileDB instance.

  • file_query (str) – string query that should operate on columns of a FileDB.

Note

No data is loaded in memory at this point.

browse(query, dsp_config=None)#

Interface between DataLoader and WaveformBrowser.

build_entry_list(tcm_level: Optional[str] = None, tcm_table: Optional[Union[int, str]] = None, mode: str = 'only', save_output_columns: bool = False, in_memory: bool = True, output_file: Optional[str] = None) dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None#

Applies cuts to the tables and files of interest.

Can only load up to two levels, those joined by tcm_level.

Parameters:
  • tcm_level (Optional[str]) – the type of TCM to be used. If None, will only return information from the lowest level.

  • tcm_table (Optional[Union[int, str]]) – the identifier of the table inside this TCM level that you want to use. If unspecified, there must only be one table inside a TCM file in tcm_level.

  • mode (str) – if any, returns every hit in the event if any hit in the event passes the cuts. If only, only returns hits that pass the cuts.

  • save_output_columns (bool) – if True, saves any columns needed for both the cut and the output to self.entry_list.

  • in_memory (bool) – if True, returns the generated entry list in memory.

  • output_file (Optional[str]) – HDF5 file name to write the entry list to.

Returns:

entries – the entry list containing columns for {parent}_idx, {parent}_table, {child}_idx and output columns if applicable. Only returned if in_memory is True.

Return type:

dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None

Note

Does not load the column information into memory. This is done by load().
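The difference between the two mode values can be illustrated with a toy entry list in plain Python. This is a sketch of the documented semantics only, not of pygama's implementation: the rows, the daqenergy values and the helpers passes_cut and select are made up for illustration, and the column names follow the {parent}_idx, {parent}_table, {child}_idx pattern with parent "hit" and child "evt".

```python
from collections import defaultdict

# Toy entry list: one row per hit. Energies are invented for illustration.
entries = [
    {"evt_idx": 0, "hit_table": 3, "hit_idx": 0, "daqenergy": 1500},
    {"evt_idx": 0, "hit_table": 6, "hit_idx": 0, "daqenergy": 200},
    {"evt_idx": 1, "hit_table": 3, "hit_idx": 1, "daqenergy": 300},
    {"evt_idx": 2, "hit_table": 8, "hit_idx": 0, "daqenergy": 4000},
]


def passes_cut(row):
    """Stand-in for a hit-level cut such as "daqenergy > 1000"."""
    return row["daqenergy"] > 1000


def select(rows, mode):
    """Apply the cut with the semantics described for mode="only" and mode="any"."""
    if mode == "only":
        # Keep only the hits that pass the cut themselves.
        return [r for r in rows if passes_cut(r)]
    if mode == "any":
        # Group hits by event, then keep every hit of an event in which
        # at least one hit passes the cut.
        events = defaultdict(list)
        for r in rows:
            events[r["evt_idx"]].append(r)
        return [r for r in rows if any(passes_cut(h) for h in events[r["evt_idx"]])]
    raise ValueError(f"unknown mode: {mode!r}")


only_hits = select(entries, "only")
any_hits = select(entries, "any")
```

In this toy data, "only" keeps the two hits above threshold, while "any" additionally keeps the low-energy hit that shares event 0 with a passing hit.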

build_hit_entries(save_output_columns: bool = False, in_memory: bool = True, output_file: Optional[str] = None) dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None#

Called by build_entry_list() to handle the case when tcm_level is unspecified.

Ignores any cuts set on levels above the lowest level.

Parameters:
  • save_output_columns (bool) – If True, saves any columns needed for both the cut and the output to the entry list.

  • in_memory (bool) – If True, returns the generated entry list in memory.

  • output_file (Optional[str]) – HDF5 file name to write the entry list to.

Returns:

entries – the entry list containing columns for {low_level}_idx, {low_level}_table, and output columns if applicable. Only returned if in_memory is True.

Return type:

dict[int, pandas.core.frame.DataFrame] | pandas.core.frame.DataFrame | None

get_file_list() DataFrame#

Returns a copy of the file database with its dataframe pared down to the current file list.

Return type:

DataFrame

get_table_name(tier: str, tb: str) str#

Get the table name for a tier given its table identifier.

Parameters:
  • tier (str) – specify the tier whose table format will be used.

  • tb (str) – the table identifier that will be passed to the table format.

Returns:

table_name – the name of the table in tier with table identifier tb

Return type:

str
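A minimal sketch of how a table identifier could be turned into a table name, assuming the table_format templates of the FileDB configuration are ordinary Python format strings. The helper format_table_name is hypothetical and is not pygama's implementation. In particular, converting digit-only identifiers to int so that format specifications such as :03d work is an assumption.

```python
from string import Formatter


def format_table_name(template, tb):
    """Hypothetical helper: fill a table_format template with one identifier."""
    # Collect the replacement-field names used by the template, e.g. ["ch"].
    fields = [name for _, name, _, _ in Formatter().parse(template) if name]
    # Digit-only identifiers become integers so that ":03d" can be applied.
    value = int(tb) if str(tb).isdigit() else tb
    return template.format(**{name: value for name in fields})


raw_name = format_table_name("ch{ch:03d}/raw", "3")
hit_name = format_table_name("{ch}/hit", 3)
evt_name = format_table_name("{grp}/evt", "geds")  # "geds" is a made-up group name
tcm_name = format_table_name("hardware_tcm", "")   # template without any field
```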

get_tiers_for_col(columns: list | numpy.ndarray, merge_files: Optional[bool] = None) dict#

For each column given, get the tiers and tables in that tier where that column can be found.

Parameters:

columns (list | numpy.ndarray) – the columns to look for.

Returns:

col_tiers – col_tiers[file]["tables"][tier] gives a list of tables in tier that contain a column of interest. col_tiers[file]["columns"][column] gives the tier that column can be found in. If self.merge_files is true, then col_tiers[tier] is a list of tables in tier that contain a column of interest.

Return type:

dict

load(entry_list: Optional[DataFrame] = None, in_memory: bool = True, output_file: Optional[str] = None, orientation: str = 'hit', tcm_level: Optional[str] = None) None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame#

Loads the requested columns in self.output_columns for the entries in the given entry_list.

Parameters:
  • entry_list (Optional[DataFrame]) – the output of build_entry_list().

  • in_memory (bool) – if True, returns the loaded data in memory and stores it in self.data.

  • output_file (Optional[str]) – if not None, writes the loaded data to the specified file.

  • orientation (str) – specifies the orientation of the output table. Can be hit or evt.

  • tcm_level (Optional[str]) – which TCM was used to create the entry_list.

Returns:

data – The data loaded from disk, as specified by self.output_format, self.output_columns, and self.merge_files. Only returned if in_memory is True.

Return type:

None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame

load_cal_pars(query)#

Access the cal_pars parameter database, run a query, and return some tables.

load_detector(det_id)#

Special version of load designed to retrieve all files, tables, column names, and potentially calibration/DSP parameters relevant to a single detector.

load_dsp_pars(query)#

Access the dsp_pars parameter database (probably JSON format), run a query to retrieve the parameters of interest for our file list, and return some tables.

load_evts(entry_list: Optional[DataFrame] = None, in_memory: bool = False, output_file: Optional[str] = None, tcm_level: Optional[str] = None) None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame#

Called by load() when orientation is evt.

Return type:

None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame

load_hits(entry_list: DataFrame, in_memory: bool = False, output_file: Optional[str] = None, tcm_level: Optional[str] = None) None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame#

Called by load() when orientation is hit.

Return type:

None | pygama.lgdo.table.Table | pygama.lgdo.struct.Struct | pandas.core.frame.DataFrame

load_settings()#

Get metadata stored in raw files, usually from a DAQ machine.

reset()#

Resets all fields to their default values, as if this is a newly created data loader.

set_config(config: dict) None#

Load configuration dictionary.

set_cuts(cuts: dict | list) None#

Apply a selection on columns in the data tables.

Parameters:

cuts (dict | list) – the cuts on the columns of the data table, e.g. trapEftp_cal > 1000. If passing a dictionary, it should be structured as dict[tier] = cut_expr. If passing a list, each item in the list should be applicable to one level of tables. The cuts at different levels will be joined with an AND.

Example

>>> dl.set_cuts({"raw": "daqenergy > 1000", "hit": "AoE > 3"})
set_datastreams(ds: list | tuple | numpy.ndarray, word: str) None#

Apply selection on data streams (or channels).

Sets self.table_list.

Parameters:
  • ds (list | tuple | numpy.ndarray) – identifies the detectors of interest. Can be a list of detector names, serial numbers, or channels, or a list of subsystems of interest, e.g. ged.

  • word (str) – the type of identifier used in ds. Should be a key in the given channel map or a word defined in the configuration file.

Example

>>> dl.set_datastreams(np.arange(40, 45), "ch")
set_files(query: str | list[str]) None#

Apply a file selection.

Sets self.file_list, which is a list of indices corresponding to the rows in the file database.

Parameters:

query (str | list[str]) – if a single string, defines an operation on the file database columns supported by pandas.DataFrame.query(). In addition, the all keyword is supported to select all files in the database. If a list of strings, it is interpreted as a list of keys (cycle timestamps).

Note

Call this function before any other operation. A second call to set_files() does not replace the current file list; the new list is added to it instead. Use reset() to reset the file query.

Example

>>> dl.set_files("file_status == 26 and timestamp == '20220716T130443Z'")
set_output(fmt: Optional[str] = None, merge_files: Optional[bool] = None, columns: Optional[list] = None) None#

Set the parameters for the output format of load().

Parameters:
  • fmt (Optional[str]) – lgdo.Table or pd.DataFrame.

  • merge_files (Optional[bool]) – If True, information from multiple files will be merged into one table.

  • columns (Optional[list]) – The columns that should be copied into the output.

Example

>>> dl.set_output(fmt="pd.DataFrame", merge_files=False, columns=["daqenergy", "trapEmax", "channel"])
skim_waveforms(mode: str = 'hit', hit_list=None, evt_list=None)#

Handle this one separately because waveforms can easily fill up memory.

pygama.flow.data_loader.iskeyword()#

x.__contains__(y) <==> y in x.

pygama.flow.datagroup module#

class pygama.flow.datagroup.DataGroup(config=None, nfiles=None, load=False)#

Bases: object

A class to create an in-memory or on-disk set of files, according to the LEGEND data convention. Typically requires a JSON config file with:

  • path to DAQ and LH5 directories

  • format strings for daq/lh5 files

  • partitions for the LH5 data directory

Reference: https://docs.legend-exp.org/index.php/apps/files/?dir=/LEGEND%20Documents/Technical%20Documents/Analysis&fileid=9140#pdfviewer

DOCME

get_lh5_cols()#

Compute the LH5 file names.

This needs to generate the file names and then figure out which folder to store them in. It is probably best to separate these tasks.

lh5_dir_setup(user_dir=False)#

Generate paths to LH5 data directories using self.lh5_dir. If user_dir is True, create them in self.lh5_user instead.

load_df(fname=None)#

DOCME

load_keys(fname=None)#

Load a list of file keys and parse the data into columns according to the format string.

save_df(fname=None)#

Save the current self.fileDB dataframe. If we’ve added extra columns specific to an experiment (outside this class), this will preserve them.

save_keys(fname=None)#

By default, save the unique_key and the relative path to the DAQ file as a CSV file. This will probably change in the future, but at least this way we can:

  • easily get a list of available DAQ files

  • regenerate the DataFrame from scan_daq_dir by parsing format string

scan_daq_dir(verbose=False)#

Scan the DAQ directory and build a DataFrame of file keys. Don’t make any experiment-specific choices here.

set_config(config)#

DOCME

pygama.flow.file_db module#

Utilities for LH5 file inventory.

class pygama.flow.file_db.FileDB(config: str | dict, scan: bool = True)#

Bases: object

LH5 file database.

A class containing a pandas.DataFrame that has additional functions to scan the data directory, fill the dataframe’s columns with information about each file, and read or write to disk in an LGDO format.

The database contains the following columns:

  • file keys: the fields specified in the configuration file’s file_format that are required to generate a file name e.g. run, type, timestamp etc.

  • {tier}_file: generated file name for the tier.

  • {tier}_size: size of file on disk, if applicable.

  • file_status: contains a bit corresponding to whether or not a file for each tier exists for a given cycle, e.g. if we have tiers raw, dsp, and hit, but only the raw file has been produced, file_status would be 0b100.

  • {tier}_tables: available data streams (channels) in the tier.

  • {tier}_col_idx: file_db.columns[{tier}_col_idx] will return the list of columns available in the tier’s file.

The database must be configured by a JSON file (or corresponding dictionary), which defines the data file names, paths and LH5 layout. For example:

{
    "data_dir": "prod-ref-l200/generated/tier",
    "tier_dirs": {
        "raw": "/raw",
        "dsp": "/dsp",
        "hit": "/hit",
        "tcm": "/tcm",
        "evt": "/evt"
    },
    "file_format": {
        "raw": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5",
        "dsp": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_dsp.lh5",
        "hit": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_hit.lh5",
        "evt": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_evt.lh5",
        "tcm": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_tcm.lh5"
    },
    "table_format": {
        "raw": "ch{ch:03d}/raw",
        "dsp": "ch{ch:03d}/dsp",
        "hit": "{ch}/hit",
        "evt": "{grp}/evt",
        "tcm": "hardware_tcm"
    },
    "tables": {
        "raw": [0, 1, 2, 4, 5, 6, 7],
        "dsp": [0, 1, 2, 4, 5, 6, 7],
        "hit": [0, 1, 2, 4, 5, 6, 7],
        "tcm": [""],
        "evt": [""]
    },
    "columns": {
        "raw": ["baseline", "waveform", "daqenergy"],
        "dsp": ["trapEftp", "AoE", "trapEmax"],
        "hit": ["trapEftp_cal", "trapEmax_cal"],
        "tcm": ["cumulative_length", "array_id", "array_idx"],
        "evt": ["lar_veto", "muon_veto", "ge_mult"]
    }
}
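A sketch of how the entries of this configuration might combine into a file path for one cycle. The plain concatenation data_dir + tier_dirs[tier] + file_format[tier] is an assumption suggested by the leading slashes in the example, not a statement about how FileDB builds paths. The helper tier_file_path is hypothetical, and the file keys are taken from the first row of the example dataframe on this page.

```python
# Reduced copy of the configuration above (two tiers are enough here).
filedb_config = {
    "data_dir": "prod-ref-l200/generated/tier",
    "tier_dirs": {"raw": "/raw", "dsp": "/dsp"},
    "file_format": {
        "raw": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_raw.lh5",
        "dsp": "/{type}/{period}/{run}/{exp}-{period}-{run}-{type}-{timestamp}-tier_dsp.lh5",
    },
}

# File keys of one cycle, i.e. the fields required by file_format.
file_keys = {
    "exp": "l60",
    "period": "p01",
    "run": "r014",
    "type": "cal",
    "timestamp": "20220716T105236Z",
}


def tier_file_path(config, tier, keys):
    """Hypothetical helper: assumes data_dir + tier_dir + formatted file name."""
    relative = config["file_format"][tier].format(**keys)
    return config["data_dir"] + config["tier_dirs"][tier] + relative


raw_path = tier_file_path(filedb_config, "raw", file_keys)
dsp_path = tier_file_path(filedb_config, "dsp", file_keys)
```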

FileDB objects can also be stored on disk and read in at a later time.

Examples

>>> from pygama.flow import FileDB
>>> db = FileDB("./filedb_config.json")
>>> db.scan_tables_columns()  # also read in the table column names
>>> print(db)
<< Columns >>
[['baseline', 'card', 'ch_orca', 'channel', 'crate', 'daqenergy', 'deadtime', 'dr_maxticks', 'dr_start_pps', 'dr_start_ticks', 'dr_stop_pps', 'dr_stop_ticks', 'eventnumber', 'fcid', 'numtraces', 'packet_id', 'runtime', 'timestamp', 'to_abs_mu_usec', 'to_dt_mu_usec', 'to_master_sec', 'to_mu_sec', 'to_mu_usec', 'to_start_sec', 'to_start_usec', 'tracelist', 'ts_maxticks', 'ts_pps', 'ts_ticks', 'waveform'], ['bl_intercept', 'bl_mean', 'bl_slope', 'bl_std', 'tail_slope', 'tail_std', 'wf_blsub'], ['array_id', 'array_idx', 'cumulative_length']]
<< DataFrame >>
   exp period   run         timestamp type  ... hit_col_idx tcm_tables tcm_col_idx evt_tables evt_col_idx
0  l60    p01  r014  20220716T105236Z  cal  ...        None         []         [2]       None        None
1  l60    p01  r014  20220716T104550Z  cal  ...        None         []         [2]       None        None
>>> db.to_disk("file_db.lh5")
Parameters:
  • config (str | dict) – dictionary or path to JSON file specifying data directories, tiers, and file name templates. Can also be path to existing LH5 file containing FileDB object serialized by to_disk().

  • scan (bool) – whether the file database should scan the directory containing raw files to fill its rows with file keys.

from_disk(filename: str) None#

Fills the dataframe (and configuration dictionary) with the information from a file created by to_disk().

scan_daq_files(daq_dir: str, daq_template: str) None#

Does the exact same thing as scan_files() but with extra configuration arguments for a DAQ directory and template instead of using the lowest tier.

scan_files() None#

Scan the directory containing files from the lowest tier and fill the dataframe.

The lowest tier is defined as the first element of the tiers array. Only fills columns that can be populated with just the raw files.

scan_tables_columns(to_file: Optional[str] = None, override: bool = False) list[str]#

Open files in the database to read (and store) the names of the available tables and of the columns therein.

Adds the available table names in each tier as a column in the dataframe by searching for group names that match the configured table_format and saving the associated keyword values.

Returns a list with each unique list of columns found in each table and adds a column {tier}_col_idx to the dataframe that maps to the column table.

Parameters:
  • to_file (Optional[str]) – optionally write the column table to an LH5 file (as a VectorOfVectors).

  • override (bool) – if the FileDB already has a columns field, the scan will not run unless this parameter is set to True.

Return type:

list[str]
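The relationship between the list of unique column lists and the {tier}_col_idx values can be sketched in plain Python. The helper index_column_lists is hypothetical, and the sorting of column names is an assumption suggested by the printed example on this page. The real method reads the names from LH5 files, which this sketch does not attempt.

```python
def index_column_lists(tables_columns):
    """Hypothetical helper: deduplicate column lists and index into them."""
    unique = []   # plays the role of the column table
    indices = []  # plays the role of a {tier}_col_idx entry
    for columns in tables_columns:
        columns = sorted(columns)
        if columns not in unique:
            unique.append(columns)
        indices.append(unique.index(columns))
    return unique, indices


# Column names found in three tables. The first and last have the same set.
found = [
    ["waveform", "baseline"],
    ["bl_mean", "bl_std"],
    ["baseline", "waveform"],
]
column_table, col_idx = index_column_lists(found)
```

Looking up column_table[col_idx[i]] then recovers the columns of table i without storing the same list more than once.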

set_config(config: dict, config_path: Optional[str] = None) None#

Read in the configuration dictionary.

set_file_sizes() None#

Add columns (for each tier) to the database containing the corresponding file size in bytes as reported by os.path.getsize().

set_file_status() None#

Add a column to the dataframe with a bit corresponding to whether each tier’s file exists.

For example, if we have tiers raw, dsp, and hit, but only the raw file has been produced, file_status would be 4 (0b100 in binary representation).
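The bit layout implied by this example, where the first tier occupies the most significant bit, can be sketched as follows. The helpers encode_status and decode_status are hypothetical illustrations and not pygama functions.

```python
def encode_status(tiers, existing):
    """Pack one existence bit per tier, first tier in the highest bit."""
    status = 0
    for tier in tiers:
        status = (status << 1) | int(tier in existing)
    return status


def decode_status(tiers, status):
    """Unpack a status integer into a tier -> exists mapping."""
    n = len(tiers)
    return {tier: bool((status >> (n - 1 - i)) & 1) for i, tier in enumerate(tiers)}


tiers = ["raw", "dsp", "hit"]
only_raw = encode_status(tiers, {"raw"})
raw_and_dsp = encode_status(tiers, {"raw", "dsp"})
decoded = decode_status(tiers, only_raw)
```

With this layout, a query such as file_status == 4 would select cycles for which only the raw file exists, assuming exactly these three tiers are configured.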

to_disk(filename: str, wo_mode='write_safe') None#

Serializes database to disk.

Parameters:
  • filename (str) – output LH5 file name.

  • wo_mode – passed to write_object().

pygama.flow.file_db.to_datetime(key: str) datetime#

Convert LEGEND cycle key to datetime.

Assumes key is formatted as YYYYMMDDTHHMMSSZ (UTC).

Return type:

datetime

pygama.flow.file_db.to_unixtime(key: str) int#

Convert LEGEND cycle key to POSIX timestamp.

Return type:

int
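Both key conversions can be approximated with the standard library. This is a sketch under the assumption that the key is exactly of the form YYYYMMDDTHHMMSSZ and should be interpreted as UTC. The function names key_to_datetime and key_to_unixtime are hypothetical, and pygama's own functions may handle time zones differently.

```python
from datetime import datetime, timezone


def key_to_datetime(key):
    """Parse a YYYYMMDDTHHMMSSZ cycle key into a timezone-aware UTC datetime."""
    return datetime.strptime(key, "%Y%m%dT%H%M%SZ").replace(tzinfo=timezone.utc)


def key_to_unixtime(key):
    """Convert a cycle key to integer seconds since the POSIX epoch."""
    return int(key_to_datetime(key).timestamp())


moment = key_to_datetime("20220716T130443Z")
seconds = key_to_unixtime("20220716T130443Z")
```

Attaching timezone.utc explicitly keeps the resulting timestamp independent of the local time zone of the machine running the code.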