pygama.flow package

Routines for performing structured queries on LEGEND data.

Queries selectively access LEGEND data using simple boolean selection expressions, drawing on multiple data sources, including metadata and parameter databases and many tiers of data production. The queries return the data in tabular formats (ak, pd, np) that can be used for further analysis.

They are also designed to access data efficiently and at scale, by accessing (as much as possible) only what is necessary to complete the query, and by breaking the query into chunks that fit in memory and can be processed in parallel.

Several query commands exist to access data at different levels:

  • query_runs() accesses all runs in a data production, using information from the cycle names

  • query_meta() accesses channel and run data, grabbing information from the metadata repository, the parameters databases, and the run information from cycle names

  • query_data() accesses event data for select channels and runs, grabbing information from all data production tiers, parameter databases, and run information from cycle names

  • query_hist() accesses event data, as above, and returns a histogram

  • query_evt() accesses evt tier data for a selection of runs; note that this cannot make use of other data tiers or of metadata (yet…)

  • build_iterator() creates an LH5Iterator object to load a selection of data fields and metadata for a selection of channels and runs. This can be used to perform more advanced queries if needed.

The information about the data production required to perform these queries can be accessed from its dataflow-config.yaml file. The information needed comes from the paths used for dataflow, and a set of query parameters (which are arguments of the above set of functions). Note that the dataflow-config.yaml need not come from an existing production; one can be modified to point towards additional paths as needed!

A template for a minimal dataflow-config.yaml file:
paths:
    metadata: $_/inputs # path to metadata root dir; usually $REFPROD/inputs

    tier_raw: $_/generated/tier/raw # path to raw tier root dir
    tier_tla: $_/generated/tier/... # path to root dir for tier named TLA
    ...

    par_tla: $_/generated/par/... # path to root dir for pargen database
    ...

query:
    cycle_def: experiment-period-run-datatype-starttime # REQUIRED: hyphen-separated-list-of-fields-in-cycle-name; these will be columns of run db
    metadata: LegendMetadata # REQUIRED: name of metadata class (e.g. LegendMetadata)
    ignored_cycles: dataprod/config/ignored_cycles # path in metadata to list of cycles to skip; by default do not ignore any
    tiers: ["raw", "dsp", "hit", "evt"] # list of tier TLAs to use from paths for parameter and data queries; default use all
    chan_db: # path in metadata to list of channels for a given run. Use format string syntax, which may refer to any values in the run DB (i.e. cycle_def fields, cycle name, and relative path). If not provided, call "metadata.channelmap(on = starttime)", where "starttime" is drawn from the run db
    par_db: # optional info for navigating par dbs. If missing use "par_db.on(starttime)[@chan.name]"
        cycle_entry: # sub-path to entry for cycle, using format string syntax, which may include values from run db; if missing or falsey, call .on
        chan_entry: "@chan.name" # sub-sub-path to entry for channel, using format string syntax, which may include values from run or chan dbs; if missing or falsey, assume same values for all channels in a cycle (useful if only one channel per cycle...)

    meta_dbs: # optional list of metadata DBs
        name: # @name of db for access in query_meta
            path: # metadata path to database
            cycle_entry: # sub-path to entry for cycle, using format string syntax, which may include values from run db; if missing or falsey, call .on
            chan_entry: # sub-sub-path to entry for channel, using format string syntax, which may include values from run or chan dbs; if missing or falsey, use same value for all chans
            # based on these, we will search metadata[path][.on(starttime) | /cycle_entry][/chan_entry]
        ...

    tables:
        raw: ch{@chan.daq.rawid:07d}/raw
        evt: evt
        tla: # path to table for channel in tier_tla lh5 files. Use format string syntax, which can refer to values from the run_db and chan_db; e.g. "ch{@chan.daq.rawid}/raw"
        ...

These dataflow-config.yaml files should rarely need to be created, since one will be provided with most dataprods! If you set the environment variable $REFPROD, the file will automatically be read from the referenced directory!
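The cycle_def entry names the hyphen-separated fields of a cycle name, and each field becomes a run-DB column. The sketch below illustrates that mapping under stated assumptions: parse_cycle is a hypothetical helper rather than part of pygama.flow, and the extra cycle column holding the full name is assumed from the default sort_by='cycle' of query_runs().

```python
def parse_cycle(cycle: str, cycle_def: str) -> dict[str, str]:
    """Split a cycle name into run-DB columns named by cycle_def (hypothetical helper)."""
    fields = cycle_def.split("-")
    values = cycle.split("-")
    if len(values) != len(fields):
        raise ValueError(f"{cycle!r} does not match cycle_def {cycle_def!r}")
    row = dict(zip(fields, values))
    row["cycle"] = cycle  # assumed column holding the full cycle name
    return row


# L200-style cycle name with the template's cycle_def
print(parse_cycle("l200-p03-r001-cal-19720101T000000Z",
                  "experiment-period-run-datatype-starttime"))
```

Field names must not contain hyphens, since the hyphen is the separator; underscores inside a field (as in th_HS2_lat_psa) are fine.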

Submodules

pygama.flow.build_iterator module

pygama.flow.build_iterator.build_iterator(fields, runs, channels, *, dataflow_config='$REFPROD/dataflow-config.yaml', tiers=None, tables=None, return_alias_map=False, progress=True, **query_meta_kwargs)

Build an LH5Iterator to access data across multiple tiers and databases.

Parameters:
  • fields (Collection[str]) – fields to include (across all tiers)

  • runs (str | Array | Mapping[str, ndarray] | DataFrame) –

    Python boolean expression for selecting runs, using column names defined in cycle_def as variables. See query_runs()

    Examples:
    • "period>='p06' and period<='p08' and datatype=='cal'" selects calibration data from periods 6, 7 and 8 (assuming default cycle names)

    • "det in ['V01234A', 'V06789B'] and datatype=='th_HS2_lat_psa'" selects runs for detectors V01234A and V06789B from Th calibration data (using Hades data cycle name experiment-det-datatype-run-starttime)

  • channels (str | Array | Mapping[str, ndarray] | DataFrame) –

    expression used to select channels for each run. Expression can access values from all databases, as well as the run table.

    Examples:
    • "@chan.system=='geds' and @chan.type=='icpc' and @chan.analysis.usability=='on'" selects all ICPC detectors for each run that are marked as usable

    • "@chan.name=='S010' and @chan.analysis.processible" selects SiPM channel 10, including only runs where it can be processed

    Note: if a parameter does not exist for a channel, it will evaluate to None. If this causes an error to be thrown, the expression will evaluate to False, excluding the channel. If a parameter always evaluates to False, an Exception will be raised.

  • dataflow_config (Path | str | Mapping) – config file of the reference production. If not provided, use the file dataflow-config.yaml in the directory given by the environment variable $REFPROD

  • tiers (Collection[str]) – tiers to include

  • tables (Mapping[str, str]) – mapping of tiers to format strings to access tables. Format strings may reference values from run or channel DBs. If no channel-wise information is included in the string the same table will be accessed for each channel (may be useful for evt tier). If None, read from dataflow_config. This is required.

  • return_alias_map (bool) – if True, return the pair (table, alias_map) where table is the normal output of this function and alias_map is a mapping from alias names to database paths

  • progress (Status | Console | bool) – if True draw progress spinner; can also provide a rich.Status or rich.Console

  • query_meta_kwargs – additional keyword arguments for query_meta() and query_runs()

pygama.flow.query_data module

pygama.flow.query_data.query_data(fields, runs, channels, entries, *, dataflow_config='$REFPROD/dataflow-config.yaml', return_query_vals=False, return_alias_map=False, processes=None, executor=None, library=None, progress=True, **kwargs)

Query data from multiple tiers and metadata. Return a table containing one entry for each hit corresponding to the selected runs, channels, and data cuts, with columns for the requested data fields. Selections may be based on data fields in any tier of data file, in select metadata tables, in the parameters databases, or in the run descriptions. Values will be returned in a tabular format denoted by library (default awkward.Array). Values from metadata and parameters databases are accessed using (see query_meta()):

[alias]@db_name.par_path

In addition, parameters from data tables may be optionally aliased using:

[alias]:nested.par_name

If no alias is provided, then the on-disk name will be used; if a parameter is nested, then _ will be used to separate levels (in the above example, the default alias would be nested_par_name)
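The default naming rule can be sketched in a few lines. This is an illustration written from the description above, not pygama's code; the helper name column_name is hypothetical. It also covers the @db_name form, for which the default column name replaces periods in db_name.par_path with underscores (see query_meta()).

```python
def column_name(field: str) -> str:
    """Return the output column name for a query field (illustrative sketch)."""
    for sep in ("@", ":"):
        if sep in field:
            alias, path = field.split(sep, 1)
            break
    else:  # plain on-disk name, no alias
        alias, path = "", field
    # Without an alias, nested levels are joined with "_"
    return alias or path.replace(".", "_")


for f in ["nested.par_name", "e:nested.par_name", "@chan.name", "rid@chan.daq.rawid"]:
    print(f, "->", column_name(f))
```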

Parameters:
  • fields (Collection[str]) – list of fields to include in the table. May include fields accessible with query_runs(), query_meta(), and fields in any data tier accessible by this method. See above for aliasing rules.

  • runs (str | Array | DataFrame) –

    Python boolean expression for selecting runs, using column names defined in cycle_def as variables. See query_runs()

    Examples:

    • select calibration data from periods 6, 7 and 8 (assuming default cycle names):

      "period>='p06' and period<='p08' and datatype=='cal'"
      
    • select runs for detectors V01234A and V06789B from Th calibration data (using Hades data cycle name experiment-det-datatype-run-starttime):

      "det in ['V01234A', 'V06789B'] and datatype=='th_HS2_lat_psa'"
      

  • channels (str) –

    expression used to select channels for each run. Expression can access values from all databases, as well as the run table.

    Examples:

    • select all ICPC detectors for each run that are marked as usable:

      "@chan.system=='geds' and @chan.type=='icpc' and @chan.analysis.usability=='on'"

    • select SiPM channel 10, including only runs where it can be processed:

      "@chan.name=='S010' and @chan.analysis.processible"
      

    Note: if a parameter does not exist for a channel, it will evaluate to None. If this causes an error to be thrown, the expression will evaluate to False, excluding the channel. If a parameter always evaluates to False, an Exception will be raised.

  • entries (str) –

    expression used to select data entries for each run/channel. Expression can access values from any data tier, from all databases, and from run table. Parameters with aliases can be accessed using their on-disk field name or their alias.

    Examples:

    • select events with >100 keV of energy, with various event-level cuts applied:

      "(energy > 100) & (~coincident.puls) & (~coincident.spms) & (geds.multiplicity==1) & ak.all(geds.quality.is_bb_like, axis=-1)"
      
    • select hits with >500 keV of energy and manually apply the low A/E cut:

      "(cuspEmax_ctc_cal > 500) & (AoE_classifyer > @pars.pars.operations.AoE_Low_Cut.parameters.a)"
      

  • dataflow_config (Path | str | Mapping) – config file of the reference production. If not provided, use the file dataflow-config.yaml in the directory given by the environment variable $REFPROD

  • return_query_vals (bool) – if True, return values found in query as columns; else only return those in fields

  • return_alias_map (bool) – if True, return the pair (table, alias_map) where table is the normal output of this function and alias_map is a mapping from alias names to database paths

  • processes (Executor | int) – number of processes. If None, use the number of threads available to executor (if provided), or else do not parallelize

  • executor (Executor) – concurrent.futures.Executor object for managing parallelism. If None, create a concurrent.futures.ProcessPoolExecutor with number of processes equal to processes.

  • library (str) – format of returned table. Can be ak (default), pd or np

  • progress (Status | Console | bool) – if True draw progress bar; can also provide a rich.Status or rich.Console

  • kwargs – see build_iterator(), query_meta() and query_runs()
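A hedged usage sketch combining the examples above. It assumes a production reachable through $REFPROD whose hit tier has a cuspEmax_ctc_cal field and whose channel map has the @chan fields used here; field and channel names differ between productions. The wrapper name load_icpc_cal_hits is hypothetical, and pygama is imported inside the function so that defining it does not require pygama or any data.

```python
def load_icpc_cal_hits(dataflow_config="$REFPROD/dataflow-config.yaml"):
    """Hypothetical wrapper: usable ICPC calibration hits above 500 keV, periods 6 to 8."""
    # Imported here so the sketch can be defined without pygama installed
    from pygama.flow.query_data import query_data

    return query_data(
        ["@chan.name", "energy:cuspEmax_ctc_cal"],  # alias the energy column
        runs="period>='p06' and period<='p08' and datatype=='cal'",
        channels=(
            "@chan.system=='geds' and @chan.type=='icpc' "
            "and @chan.analysis.usability=='on'"
        ),
        entries="cuspEmax_ctc_cal > 500",  # on-disk name or alias may be used
        dataflow_config=dataflow_config,
        library="pd",  # return a pandas DataFrame
        processes=4,
    )
```

The returned table would have one row per selected hit, with the columns chan_name and energy.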

pygama.flow.query_evt module

pygama.flow.query_evt.query_evt(fields, runs, events, *, dataflow_config='$REFPROD/dataflow-config.yaml', tiers=None, tables=None, return_query_vals=False, processes=None, executor=None, library=None, progress=True, **kwargs)

Query evt tier data. Return a table containing one entry for each event corresponding to the selected runs and data cuts, with columns for the requested data fields. Selections may be based on data fields in the evt tier or in the run descriptions. Values will be returned in a tabular format denoted by library (default awkward.Array). Parameters may be optionally aliased using:

[alias]:nested.par_name

If no alias is provided, then the on-disk name will be used.

Parameters:
  • fields (Collection[str]) – list of fields to include in the table. May include fields accessible with query_runs() and fields in the evt tier. See above for aliasing rules.

  • runs (str | Array | Mapping[str, ndarray] | DataFrame | None) –

    Python boolean expression for selecting runs, using column names defined in cycle_def as variables. See query_runs()

    Examples:

    • select calibration data from periods 6, 7 and 8 (assuming default cycle names):

      "period>='p06' and period<='p08' and datatype=='cal'"
      
    • select runs for detectors V01234A and V06789B from Th calibration data (using Hades data cycle name experiment-det-datatype-run-starttime):

      "det in ['V01234A', 'V06789B'] and datatype=='th_HS2_lat_psa'"
      

  • events (str) –

    expression used to select events for each run. Expression can access values from the event tier. Parameters with aliases can be accessed using their on-disk field name or their alias. Awkward is available as ak.

    Examples:

    • select events with >100 keV of energy, with various event-level cuts applied:

      "(energy > 100) & (~coincident.puls) & (~coincident.spms) & (geds.multiplicity==1) & ak.all(geds.quality.is_bb_like, axis=-1)"
      
    • select hits with >500 keV of energy and manually apply the low A/E cut:

      "(cuspEmax_ctc_cal > 500) & (AoE_classifyer > @pars.pars.operations.AoE_Low_Cut.parameters.a)"
      

  • dataflow_config (Path | str | Mapping) – config file of the reference production. If not provided, use the file dataflow-config.yaml in the directory given by the environment variable $REFPROD

  • tiers (Collection[str]) – tiers to include

  • tables (Mapping[str, str]) – mapping of tiers to format strings to access tables. Format strings may reference values from run or channel DBs. If no channel-wise information is included in the string the same table will be accessed for each channel (may be useful for evt tier). If None, read from dataflow_config. This is required.

  • return_query_vals (bool) – if True, return values found in query as columns; else only return those in fields

  • processes (Executor | int) – number of processes. If None, use the number of threads available to executor (if provided), or else do not parallelize

  • executor (Executor) – concurrent.futures.Executor object for managing parallelism. If None, create a concurrent.futures.ProcessPoolExecutor with number of processes equal to processes.

  • library (str) – format of returned table. Can be ak (default), pd or np

  • progress (Status | Console | bool) – if True draw progress bar; can also provide a rich.Status or rich.Console

  • kwargs – see query_runs()
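The selection examples wrap every comparison in parentheses. Assuming the expressions follow Python operator precedence (they are written in Python syntax and use & and ~ for elementwise logic, as NumPy and Awkward arrays do), this matters because & binds more tightly than comparisons. The standard ast module shows the difference; top_level_node is a hypothetical helper.

```python
import ast


def top_level_node(expr: str) -> str:
    """Name of the outermost AST node of a Python expression."""
    return type(ast.parse(expr, mode="eval").body).__name__


# Parenthesized: an elementwise AND (BinOp) of two comparisons
print(top_level_node("(energy > 100) & (geds.multiplicity == 1)"))  # BinOp
# Unparenthesized: a chained comparison, energy > (100 & geds.multiplicity) == 1
print(top_level_node("energy > 100 & geds.multiplicity == 1"))  # Compare
```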

pygama.flow.query_hist module

pygama.flow.query_hist.query_hist(axes, runs, channels, entries, *, dataflow_config='$REFPROD/dataflow-config.yaml', processes=None, executor=None, progress=True, **kwargs)

Query data from multiple tiers and metadata. Return a Hist filled with data from hits from selected runs, channels, and data cuts. Values from metadata and parameters databases are accessed using (see query_meta()):

[alias]@db_name.par_path

In addition, parameters from data tables may be optionally aliased using:

[alias]:nested.par_name

If no alias is provided, then the on-disk name will be used; if a parameter is nested, then _ will be used to separate levels (in the above example, the default alias would be nested_par_name)

Parameters:
  • axes (Collection[hist.axis] | Mapping[str, hist.axis]) –

    axis, list of axes, or mapping from data field to axis to use for the histogram. If an axis or list of axes is given, axis.name is used as the field. May include fields accessible with query_runs(), query_meta(), and fields in any data tier accessible by this method. See above for aliasing rules; if an axis has no label, the alias will be used

    Examples:

    • Energy histogram (300 bins ranging from 0 to 3000):

      axis.Regular(300, 0, 3000, name="cuspEmax_ctc_cal", label="Energy (keV)")
      
    • 2-D histogram with energy on x-axis, and detector name on y-axis:

      {
          "cuspEmax_ctc_cal": axis.Regular(300, 0, 3000, label="Energy (keV)"),
          "@chan.name": axis.StrCategory([], label="Detector", growth=True)
      }
      

  • runs (str | Array | DataFrame) –

    Python boolean expression for selecting runs, using column names defined in cycle_def as variables. See query_runs()

    Examples:

    • select calibration data from periods 6, 7 and 8 (assuming default cycle names):

      "period>='p06' and period<='p08' and datatype=='cal'"
      
    • select runs for detectors V01234A and V06789B from Th calibration data (using Hades data cycle name experiment-det-datatype-run-starttime):

      "det in ['V01234A', 'V06789B'] and datatype=='th_HS2_lat_psa'"
      

  • channels (str) –

    expression used to select channels for each run. Expression can access values from all databases, as well as the run table.

    Examples:

    • select all ICPC detectors for each run that are marked as usable:

      "@chan.system=='geds' and @chan.type=='icpc' and @chan.analysis.usability=='on'"

    • select SiPM channel 10, including only runs where it can be processed:

      "@chan.name=='S010' and @chan.analysis.processible"
      

    Note: if a parameter does not exist for a channel, it will evaluate to None. If this causes an error to be thrown, the expression will evaluate to False, excluding the channel. If a parameter always evaluates to False, an Exception will be raised.

  • entries (str) –

    expression used to select data entries for each run/channel. Expression can access values from any data tier, from all databases, and from run table. Parameters with aliases can be accessed using their on-disk field name or their alias.

    Examples:

    • select events with >100 keV of energy, with various event-level cuts applied:

      "(energy > 100) & (~coincident.puls) & (~coincident.spms) & (geds.multiplicity==1) & ak.all(geds.quality.is_bb_like, axis=-1)"
      
    • select hits with >500 keV of energy and manually apply the low A/E cut:

      "(cuspEmax_ctc_cal > 500) & (AoE_classifyer > @pars.pars.operations.AoE_Low_Cut.parameters.a)"
      

  • dataflow_config (Path | str | Mapping) – config file of the reference production. If not provided, use the file dataflow-config.yaml in the directory given by the environment variable $REFPROD

  • processes (Executor | int) – number of processes. If None, use the number of threads available to executor (if provided), or else do not parallelize

  • executor (Executor) – concurrent.futures.Executor object for managing parallelism. If None, create a concurrent.futures.ProcessPoolExecutor with number of processes equal to processes.

  • progress (Status | Console | bool) – if True draw progress bar; can also provide a rich.Status or rich.Console

  • kwargs – see build_iterator(), query_meta(), query_runs(), and hist.Hist
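A hedged sketch of the 2-D example above, using the hist package directly. Note that hist.axis.StrCategory takes a list of categories as its first argument, which may be empty when growth=True. The wrapper name, the selection strings, and the 100 keV cut are illustrative assumptions; both hist and pygama are imported inside the function so that it can be defined without them.

```python
def energy_by_detector(dataflow_config="$REFPROD/dataflow-config.yaml"):
    """Hypothetical wrapper: 2-D energy vs. detector histogram of ICPC calibration hits."""
    # Imported here so the sketch can be defined without hist or pygama installed
    import hist
    from pygama.flow.query_hist import query_hist

    axes = {
        "cuspEmax_ctc_cal": hist.axis.Regular(300, 0, 3000, label="Energy (keV)"),
        # Start with no categories and let detector names be added as they appear
        "@chan.name": hist.axis.StrCategory([], growth=True, label="Detector"),
    }
    return query_hist(
        axes,
        runs="period>='p06' and period<='p08' and datatype=='cal'",
        channels="@chan.system=='geds' and @chan.type=='icpc'",
        entries="cuspEmax_ctc_cal > 100",  # illustrative cut
        dataflow_config=dataflow_config,
    )
```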

pygama.flow.query_meta module

pygama.flow.query_meta._query_loop(run_records, col_list, channels, meta, chan_db, db_list, col_name_map, group_chans)
pygama.flow.query_meta.query_meta(fields, runs, channels, *, dataflow_config='$REFPROD/dataflow-config.yaml', group_chans=False, tiers=None, metadata=None, chan_db=None, meta_dbs=None, return_query_vals=False, return_alias_map=False, processes=None, executor=None, library='ak', progress=True, **query_run_kwargs)

Query the metadata and pars databases, returning a table containing one entry for each run/channel with the requested data fields. A boolean expression can also be provided to select cycles based on data from the runs table, and another to select run/channel combinations based on information found in the metadata and parameter databases.

Values from databases are referenced using:

[alias]@db_name.par_path

where:

  • alias: optional alias to use as column name in returned table. If not provided, column name will be db_name_par_path, replacing periods with underscores

  • @db_name: name of data source. Data sources are found on disk using information in the dataflow config file (see dataflow_config):

    • @chan: channel database from metadata.channelmap()

    • @par[_tier]: parameter database from specified tier.

    • Additional data sources defined using the meta_dbs arg or meta_dbs entry in dataflow_config

  • par_path: path in database to par, using periods to separate fields

Examples:

  • @chan.name: name of channel; will be aliased to chan_name

  • rid@chan.daq.rawid: DAQ id of channel; aliased to rid

  • lt@run.livetime_in_s: livetime from runinfo; aliased to lt

  • aoe_lo@par_hit.pars.operations.AoE_Low_Cut.parameters.a: cut value for the low A/E cut from the hit tier; aliased to aoe_lo

Parameters:
  • fields (Collection[str]) –

    list of fields to include in the table. See above for description of syntax for naming data sources from metadata and parameter databases.

    Example:

    ["@chan.daq.rawid", "@run.livetime", "aoe_low_cut@par_hit.pars.operations.AoE_Low_Cut.parameters.a"]
    

  • runs (str | Array | DataFrame) –

    boolean Python expression for selecting runs, using column names defined in cycle_def as variables. See query_runs()

    Examples:

    • select calibration data from periods 6, 7 and 8 (assuming l200-style cycle names):

      "period>='p06' and period<='p08' and datatype=='cal'"
      
    • select runs for detectors V01234A and V06789B from Th calibration data (using Hades data cycle name experiment-det-datatype-run-starttime):

      "det in ['V01234A', 'V06789B'] and datatype=='th_HS2_lat_psa'"
      

  • channels (str) –

    expression used to select channels for each run. Expression can access values from channel, metadata, and parameter databases (with the channel database @chan likely being the most useful)

    Examples:

    • select all ICPC channels for each run that are marked as usable:

      "@chan.type=='icpc' and @chan.analysis.usability=='on'"
      
    • select SiPM channel 10, including only runs where it can be processed:

      "@chan.name=='S010' and @chan.analysis.processible"
      

    Note: if a parameter does not exist for a channel, it will evaluate to None. If this causes an error to be thrown, the expression will evaluate to False, excluding the channel. If a parameter always evaluates to None, an Exception will be raised.

  • dataflow_config (Path | str | Mapping) – config file of the reference production. If not provided, use the file dataflow-config.yaml in the directory given by the environment variable $REFPROD

  • tiers (Collection[str] | None) –

    search only provided tiers for pars. If None search all found tiers. By default, get from dataflow-config.

    Examples: ["raw", "dsp", "hit"] or ["raw", "psp", "pht", "evt"]

  • metadata (str | type | MetadataRepository) – class or name of class to use to construct metadata

  • chan_db (str | None) – format string for path in metadata to list of channels for a given cycle. Format string may reference values from the run DB. By default, get from dataflow-config or call LegendMetadata.channelmap(starttime).

  • meta_dbs (Mapping | None) –

    mapping from database name (i.e. the thing after @) to mapping of configuration parameters. Parameters are as follows:

    • path: path to root of database relative to root of metadata

    • cycle_entry [optional]: sub-path to entry for a given cycle. May be a format string with references to run DB fields. If not provided, use dbetto.TextDB.on(starttime) to find the cycle entry

    • channel_entry [optional]: sub-sub-path to entry for a given channel. May be a format string with references to run DB and channel DB fields. If not provided and group_chans is False, use @chan.name

    Examples:

    meta_dbs = {
        "runinfo": {
            "path": "path/to/runinfo",
            "cycle_entry": "{period}/{run}/{datatype}",
        },
        "chaninfo": {
            "path": "path/to/chaninfo",
            "cycle_entry": None, # use chaninfo.on(starttime)
            "channel_entry": "@chan.name"
        },
        # ...
    }
    

  • group_chans (bool) – if True, return one entry for each run or group of runs, with channel data nested as ragged arrays. Else, return one entry for each channel/run.

  • return_query_vals (bool) – if True, return values found in query as columns; else only return those in fields

  • return_alias_map (bool) – if True, return the pair (table, alias_map) where table is the normal output of this function and alias_map is a mapping from alias names to database paths

  • processes (int | None) – number of processes. If None, use the number of threads available to executor (if provided), or else do not parallelize

  • executor (Executor | None) – concurrent.futures.Executor object for managing parallelism. If None, create a concurrent.futures.ProcessPoolExecutor with number of processes equal to processes.

  • library (str) – format of returned table. Can be ak (default), pd or np

  • progress (Status | Console | bool) – if True draw progress spinner; can also provide a rich.Status or rich.Console

  • query_run_kwargs – see query_runs()
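When cycle_entry is given, the lookup described for meta_dbs amounts to formatting cycle_entry with values from a run-DB row and appending it to path. The sketch below uses a toy nested dict in place of a real metadata repository; cycle_entry_path and lookup are hypothetical helpers, and neither the .on(starttime) fallback nor channel_entry is covered.

```python
def cycle_entry_path(db_cfg: dict, run_row: dict) -> str:
    """Join a meta_dbs 'path' with its cycle_entry, formatted from a run-DB row."""
    return f"{db_cfg['path']}/{db_cfg['cycle_entry'].format(**run_row)}"


def lookup(metadata: dict, path: str):
    """Walk a '/'-separated path through nested dicts standing in for metadata."""
    node = metadata
    for key in path.split("/"):
        node = node[key]
    return node


# Toy configuration and metadata, following the runinfo example above
runinfo_cfg = {"path": "path/to/runinfo", "cycle_entry": "{period}/{run}/{datatype}"}
run_row = {"period": "p03", "run": "r001", "datatype": "cal"}
metadata = {"path": {"to": {"runinfo": {"p03": {"r001": {"cal": {"livetime_in_s": 3600}}}}}}}

entry = lookup(metadata, cycle_entry_path(runinfo_cfg, run_row))
print(entry["livetime_in_s"])
```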

pygama.flow.query_runs module

pygama.flow.query_runs._get_run_records_loop(files, relpath, col_names, tiers, removed, runs)
pygama.flow.query_runs.query_runs(runs=None, *, dataflow_config='$REFPROD/dataflow-config.yaml', group_by=None, sort_by='cycle', cycle_def=None, tiers=None, ignored_cycles=None, processes=None, executor=None, library='ak', progress=True)

Query runs and return a table containing one entry for each cycle, with data extracted from cycle names. Optionally, select the runs to include using the boolean expression runs.

The run DB is built by recursively walking through the directories of one of the data tiers (using a list of excluded files from the metadata). The fields are parsed from the hyphen-separated elements of cycle names, as defined by the cycle_def argument below.

Parameters:
  • runs (str | None) –

    boolean Python expression for selecting runs, using column names defined in cycle_def as variables.

    Examples:

    • select calibration data from periods 6, 7 and 8 (assuming l200-style cycle names):

      "period>='p06' and period<='p08' and datatype=='cal'"
      
    • select runs for detectors V01234A and V06789B from Th calibration data (using Hades data cycle name experiment-det-datatype-run-starttime):

      "det in ['V01234A', 'V06789B'] and datatype=='th_HS2_lat_psa'"
      

  • dataflow_config (Path | str | Mapping) – config file of the reference production. If not provided, use the file dataflow-config.yaml in the directory given by the environment variable $REFPROD

  • group_by (str | Collection[str] | None) – if None (default), return a flat array with all cycles. If one or more fields are provided, group entries by these fields (using ak.run_lengths(), so consecutive equal values are grouped; this is done after sorting, so be careful if sorting changes the order!). Fields that vary within groups will be un-flattened into 2-D ragged arrays. Note that the runs expression selects individual cycles; it cannot act collectively on grouped cycles.

  • sort_by (str | Collection[str]) – field by which to sort table, or list of fields in order by priority

  • cycle_def (str | None) –

    hyphen-separated names of fields in cycle names; names will be used for columns. By default get from dataflow-config.

    Examples:

    • experiment-period-run-datatype-starttime for an L200 cycle, e.g. l200-p03-r001-cal-19720101T000000Z

    • experiment-det-datatype-run-starttime for a Hades cycle, e.g. char_data-V05268A-th_HS2_lat_psa-r001-20201008T122118Z

  • tiers (str | Collection[str] | Mapping[str, str] | None) – tiers used to find files. The first tier in the list is used to walk through directories to populate the run DB. The remaining tiers are checked for the presence of cycles; a cycle is only added if it exists for each tier. The relative path of each tier's file is added as a column called tier_[t]. Can provide:

    • Mapping from tier name to path to root of tier

    • List of tier names/single tier name. Paths will be found in dataflow_config["paths"]

    • None: read from dataflow_config; if tiers entry not found, use "raw"

  • ignored_cycles (str | Collection[str] | None) – path(s) in metadata to list(s) of ignored cycles. By default get from dataflow-config, or else do not skip any cycles.

  • processes (int | None) – number of processes. If None, use the number of threads available to executor (if provided), or else do not parallelize

  • executor (Executor | None) – concurrent.futures.Executor object for managing parallelism. If None, create a concurrent.futures.ProcessPoolExecutor with number of processes equal to processes.

  • library (str) – format of returned table. Can be ak (default), pd or np

  • progress (Status | Console | bool) – if True draw progress spinner; can also provide a rich.Status or rich.Console
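A pure-Python sketch of the run-table behaviour described above: one row per cycle, a runs selection expression evaluated with the cycle_def fields as variables, sorting by sort_by, and group_by-style grouping of consecutive equal values (itertools.groupby stands in for ak.run_lengths). It is not pygama's implementation, which returns ak/pd/np tables; the helper names and toy cycle names are hypothetical.

```python
from itertools import groupby

CYCLE_DEF = "experiment-period-run-datatype-starttime"


def run_table(cycles):
    """One dict per cycle, with a column for each cycle_def field plus 'cycle'."""
    fields = CYCLE_DEF.split("-")
    return [dict(zip(fields, c.split("-")), cycle=c) for c in cycles]


def select_runs(rows, expr, sort_by="cycle"):
    """Keep rows for which expr is true, then sort by one or more fields."""
    keys = [sort_by] if isinstance(sort_by, str) else list(sort_by)
    # eval() is for trusted input only; column names become the expression's variables
    kept = [r for r in rows if eval(expr, {"__builtins__": {}}, dict(r))]
    return sorted(kept, key=lambda r: tuple(r[k] for k in keys))


def group_consecutive(rows, field):
    """Group runs of consecutive equal values, mirroring the group_by note above."""
    return [(key, list(grp)) for key, grp in groupby(rows, key=lambda r: r[field])]


cycles = [  # toy cycle names
    "l200-p07-r001-cal-20230801T000000Z",
    "l200-p06-r000-cal-20230701T000000Z",
    "l200-p06-r000-phy-20230702T000000Z",
    "l200-p03-r001-cal-20230301T000000Z",
    "l200-p06-r001-cal-20230710T000000Z",
]
table = run_table(cycles)
cal = select_runs(table, "period>='p06' and period<='p08' and datatype=='cal'")
for period, group in group_consecutive(cal, "period"):
    print(period, [r["run"] for r in group])
```

Because grouping only merges neighbours, sorting first (here by cycle name) determines which cycles end up in the same group.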

pygama.flow.utils module

pygama.flow.utils.format_vars(fstring)

Helper to get the list of variables referenced in a format string
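One way to implement this with the standard library is string.Formatter.parse, which splits a format string into (literal_text, field_name, format_spec, conversion) tuples. The sketch is an assumption about the approach, not pygama's code, and format_vars_sketch is a hypothetical name.

```python
from string import Formatter


def format_vars_sketch(fstring: str) -> list[str]:
    """Field names referenced in a str.format-style string (illustrative)."""
    return [
        field
        for _, field, _, _ in Formatter().parse(fstring)
        if field is not None  # None marks a chunk of literal text with no field
    ]


print(format_vars_sketch("ch{@chan.daq.rawid:07d}/raw"))
print(format_vars_sketch("{period}/{run}/{datatype}"))
```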

pygama.flow.utils.get_recursive(db, path)

Helper to recursively access values from nested dict-likes
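A minimal sketch of such a helper, assuming dotted paths like those used in query fields. The real function's handling of missing keys may differ (the query functions document that missing parameters evaluate to None); this sketch simply raises KeyError. The name get_recursive_sketch and the channel record are hypothetical toy examples.

```python
def get_recursive_sketch(db, path):
    """Follow a dotted path (or a sequence of keys) through nested dict-likes."""
    keys = path.split(".") if isinstance(path, str) else path
    for key in keys:
        db = db[key]  # raises KeyError if a level is missing
    return db


# Toy channel record
chan = {"name": "S010", "daq": {"rawid": 1104000}, "analysis": {"processible": True}}
print(get_recursive_sketch(chan, "daq.rawid"))
```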

pygama.flow.utils.parse_query_paths(expr, fullmatch=False)

Parse input string for variable names of the form:

[alias][@ or :][par.path]

and return a list of each matching 3-tuple of the form:

(full_match, alias, path)

Aliases and names in paths must be legal Python names (i.e. alphanumeric or underscore, not starting with a digit). If @ is used to separate the alias and path, it is left in the path (to denote a metadata location); if : is used, it is omitted. Note that function names (i.e. a valid name followed by "(") are excluded. Values inside of [...], {...}, "...", and '...' are also excluded.

If fullmatch is True, expect full string to match pattern and return single tuple. Otherwise return a list of tuples, for each match found.

Return type:

list[tuple[str, str | None, str]] | tuple[str, str | None, str]
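A simplified regular-expression sketch of the rules above, for illustration only. It does not handle nested brackets, and, as an assumption of this sketch, it also skips bare Python keywords (and, in, not, ...) so that run expressions yield only field names. Keyword-argument names such as axis in ak.all(x, axis=-1) are still reported. The function name parse_query_paths_sketch is hypothetical.

```python
import keyword
import re

_NAME = r"[A-Za-z_]\w*"  # legal Python name
_PATTERN = re.compile(
    r"(?<![\w.@:])"                            # not inside another name or path
    rf"(?:(?P<alias>{_NAME})?(?P<sep>[@:]))?"  # optional "[alias]@" or "[alias]:"
    rf"(?P<path>{_NAME}(?:\.{_NAME})*)"        # dotted parameter path
    r"(?![\w.]*\()"                            # not a function call like ak.all(
)
# Quoted strings and [...] / {...} groups (non-nested) are ignored
_EXCLUDED = re.compile(r"\"[^\"]*\"|'[^']*'|\[[^\]]*\]|\{[^}]*\}")


def _as_tuple(m):
    path = m.group("path")
    if m.group("sep") == "@":
        path = "@" + path  # "@" marks a metadata location, so keep it
    return m.group(0), m.group("alias"), path


def parse_query_paths_sketch(expr, fullmatch=False):
    if fullmatch:
        m = _PATTERN.fullmatch(expr)
        if m is None:
            raise ValueError(f"{expr!r} is not a query path")
        return _as_tuple(m)
    # Blank out excluded spans with spaces so match positions are unchanged
    blanked = _EXCLUDED.sub(lambda m: " " * len(m.group(0)), expr)
    return [
        _as_tuple(m)
        for m in _PATTERN.finditer(blanked)
        if m.group("sep") or not keyword.iskeyword(m.group("path"))
    ]


print(parse_query_paths_sketch("rid@chan.daq.rawid", fullmatch=True))
print(parse_query_paths_sketch("(energy > 100) & ak.all(geds.is_bb_like)"))
```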