API documentation

Pipeline-specific modules

abstract_step

Classes AbstractStep and AbstractSourceStep are defined here.

All processing step classes must inherit from AbstractStep, and all source step classes must inherit from AbstractSourceStep.

Processing steps generate output files from input files, whereas source steps only provide output files. Both step types may generate tasks, but only source steps can introduce files from outside the destination path into the pipeline.

class abstract_step.AbstractStep(pipeline)[source]
add_connection(connection, constraints=None)[source]

Add a connection, which must start with ‘in/’ or ‘out/’.
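
For example (the connection names are illustrative):

    self.add_connection('in/reads')
    self.add_connection('out/reads')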

add_dependency(parent)[source]

Add a parent step to this step's dependencies.

parent – parent step this step depends on

add_input_connection(connection, constraints=None)[source]

Add an input connection to this step

add_option(key, *option_types, **kwargs)[source]

Add an option. Multiple types may be specified.
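
A sketch of declaring options in a step constructor; the option names are made up, and the optional/default keyword arguments are assumptions inferred from the descriptions of is_option_set_in_config() and set_options() below:

    # One accepted type; 'optional' and 'default' below are assumed kwargs.
    self.add_option('genome', str)
    # Multiple accepted types may be specified.
    self.add_option('threads', int, str, optional=True, default=1)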

add_output_connection(connection, constraints=None)[source]

Add an output connection to this step

declare_run(run_id)[source]

Declare a run. Use it like this:

with self.declare_run(run_id) as run:
    # add output files and information to the run here
dependencies = None

All steps this step depends on.

finalize()[source]

Finalizes the step.

The intention is to make further changes to the step impossible; however, this is currently not enforced anywhere.

find_upstream_info_for_input_paths(input_paths, key)[source]

Find a piece of public information in all upstream steps. If the information is not found, or if it is defined in more than one upstream step, this will crash.

generate_one_report()[source]

Gathers the output files for each outgoing connection and calls self.reports() to do the job of creating a report.

generate_report(run_id)[source]

Gathers the output files for each outgoing connection and calls self.reports() to do the job of creating a report.

get_cores()[source]

Returns the number of cores used in this step.

get_in_connections()[source]

Return all in-connections for this step

get_input_runs()[source]

Return a dict which contains all runs, grouped by parent step.

get_module_loads()[source]

Return dictionary with module load commands to execute before starting any other command of this step

get_module_unloads()[source]

Return dictionary with module unload commands to execute before starting any other command of this step

get_option(key)[source]

Query an option.

get_options()[source]

Returns a dictionary of all given options

get_out_connections()[source]

Return all out-connections for this step

get_post_commands()[source]

Return dictionary with commands to execute after finishing any other command of this step

get_pre_commands()[source]

Return dictionary with commands to execute before starting any other command of this step

get_run(run_id)[source]

Returns a single run object for run_id or None.

get_run_ids()[source]

Returns a sorted list of the run IDs generated by this step.

get_run_ids_in_connections_input_files()[source]

Return a dictionary with all run IDs from parent steps, the in connections they provide data for, and the names of the files:

run_id_1:
    in_connection_1: [input_path_1, input_path_2, ...]
    in_connection_2: ...
run_id_2: ...

Format of in_connection: in/<connection>. Input paths are absolute.
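
Within runs(), this structure can be traversed like so (a sketch; the connection name 'in/reads' is made up):

    for run_id, connections in run_ids_connections_files.items():
        for in_path in connections.get('in/reads', []):
            # in_path is an absolute path provided by a parent step
            pass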

get_run_ids_out_connections_output_files()[source]

Return a dictionary with all run IDs of the current step, their out connections, and the files that belong to them:

run_id_1:
    out_connection_1: [output_path_1, output_path_2, ...]
    out_connection_2: ...
run_id_2: ...

Format of out_connection: out/<connection>. Output paths are absolute.

get_run_state(run_id)[source]

Returns run state of a run.

Determine the extended run state of a run (as opposed to the basic run state), building on the value returned by get_run_state_basic().

If a run is ready, this will:
  • return executing if an up-to-date executing ping file is found
  • otherwise return queued if a queued ping file is found
If a run is waiting, this will:
  • return queued if a queued ping file is found

Otherwise, it will just return the value obtained from get_run_state_basic().

Attention: The status indicators executing and queued may be temporarily wrong due to the possibility of out-of-date ping files lying around.

get_run_state_basic(run_id)[source]

Determines basic run state of a run.

Determine the basic run state of a run, which is, at any time, one of waiting, ready, or finished.

These states are determined from the current configuration and the timestamps of result files present in the file system. In addition to these three basic states, there are two additional states which are less reliable (see get_run_state()).

get_runs()[source]

Getter method for runs of this step.

If no runs exist when this method is called, they are created here.

classmethod get_step_class_for_key(key)[source]

Returns a step (or source step) class for a given key which corresponds to the name of the module the class is defined in. Pass ‘cutadapt’ and you will get the cutadapt.Cutadapt class which you may then instantiate.
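
For example (following the description above):

    cls = AbstractStep.get_step_class_for_key('cutadapt')
    step = cls(pipeline)  # an instance of cutadapt.Cutadapt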

get_step_name()[source]

Returns this step's name.

Returns the step name which is initially equal to the step type (== module name) but can be changed via set_step_name() or via the YAML configuration.

get_step_type()[source]

Returns the original step name (== module name).

get_tool(key)[source]

Return full path to a configured tool.

is_option_set_in_config(key)[source]

Determine whether an optional option (that is, a non-required option) has been set in the configuration.

reports(run_id, out_connection_output_files)[source]

Abstract method which must be implemented by the actual step.

Raise NotImplementedError if subclass does not override this method.

require_tool(tool)[source]

Declare that this step requires an external tool. Query it later with get_tool().

run(run_id)[source]

Create a temporary output directory and execute a run. After the run has finished, it is checked that all output files are in place and the output files are moved to the final output location. Finally, YAML annotations are written.

runs(run_ids_connections_files)[source]

Abstract method which must be implemented by the actual step.

Raise NotImplementedError if subclass does not override this method.
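
A minimal sketch of a step overriding runs(); the connection, tool, and file names are all illustrative, and the tag passed to add_output_file() follows the description of that method below:

    class MyStep(AbstractStep):

        def __init__(self, pipeline):
            super(MyStep, self).__init__(pipeline)
            self.set_cores(4)
            self.add_connection('in/reads')
            self.add_connection('out/reads')
            self.require_tool('pigz')

        def runs(self, run_ids_connections_files):
            for run_id, connections in run_ids_connections_files.items():
                in_paths = connections['in/reads']
                with self.declare_run(run_id) as run:
                    # Output file names must be unique across all runs,
                    # hence the run_id in the file name.
                    run.add_output_file('reads',
                                        '%s-processed.fastq.gz' % run_id,
                                        in_paths)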

set_cores(cores)[source]

Specify the number of CPU cores this step will use.

set_options(options)[source]

Checks and stores step options.

The options are either set to the values given in the YAML config or to the default values set in self.add_option().

set_step_name(step_name)[source]

Change the step name.

The step name is initially set to the module name. This method is used in case we need multiple steps of the same kind.

class abstract_step.AbstractSourceStep(pipeline)[source]

A subclass all source steps inherit from, which distinguishes source steps from real processing steps: source steps do not yield any tasks, because their “output files” are in fact files which are already there.

Note that the name might be a bit misleading, because this class only applies to source steps which ‘serve’ existing files. A step which has no input but produces input data for other steps, and actually has to do something for it, would be a normal AbstractStep subclass, because it produces tasks.

pipeline

exception pipeline.ConfigurationException(value)[source]
class pipeline.Pipeline(**kwargs)[source]

The Pipeline class represents the entire processing pipeline which is defined and configured via the configuration file config.yaml.

Individual steps may be defined in a tree, and their combination with samples, as generated by one or more sources, leads to an array of tasks.

all_tasks_topologically_sorted = None

List of all tasks in topological order.

check_tools()[source]

Checks whether all tools referenced by the configuration are available and records their versions as determined by [tool] --version etc.

cluster_type = None

The cluster type to be used (must be one of the keys specified in cluster_config).

config = None

Dictionary representation of configuration YAML file.

file_dependencies = None

This dict stores file dependencies within this pipeline, regardless of step, output file tag, or run ID. It has, for every output file generated by the pipeline, the set of input files that output file depends on.

file_dependencies_reverse = None

This dict stores file dependencies within this pipeline, regardless of step, output file tag, or run ID. It has, for every input file required by the pipeline, the set of output files which are generated using this input file.

input_files_for_task_id = None

This dict stores a set of input files for every task id in the pipeline.

output_files_for_task_id = None

This dict stores a set of output files for every task id in the pipeline.

states = Enum(['READY', 'EXECUTING', 'WAITING', 'QUEUED', 'FINISHED'])

Possible states a task can be in.

steps = None

This dict stores step objects by their name. Each step knows its dependencies.

task_for_task_id = None

This dict stores task objects by task IDs.

task_id_for_output_file = None

This dict stores a task ID for every output file created by the pipeline.

task_ids_for_input_file = None

This dict stores a set of task IDs for every input file used in the pipeline.

topological_step_order = None

List with topologically ordered steps.

run

class run.Run(step, run_id)[source]

The Run class is a helper class which represents a run in a step. Declare runs inside AbstractStep.runs() via:

with self.new_run(run_id) as run:
    # declare output files, private and public info here

After that, use the available methods to configure the run. A run typically has no information about input connections, only about input files.

add_empty_output_connection(tag)[source]

An empty output connection has ‘None’ as output file and ‘None’ as input file.

add_output_file(tag, out_path, in_paths)[source]

Add an output file to this run. Output file names must be unique across all runs defined by a step, so it may be a good idea to include the run_id in the output file name.

  • tag: The connection annotation, which must have been previously declared via AbstractStep.add_connection(“out/…”). This does not have to happen in the step constructor; it may also be done in declare_runs(), right before this method is called.
  • out_path: The output file path, without a directory. The pipeline assigns directories for you (this parameter must not contain a slash).
  • in_paths: A list of input files this output file depends on. It is crucial to get this right, so that the pipeline can determine which steps are up-to-date at any given time. You have to specify absolute paths here, including a directory, and you can obtain them via AbstractStep.run_ids_and_input_files_for_connection and related functions.
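
Taken together, inside a declared run (a hedged sketch; the tag and file names are illustrative):

    with self.declare_run(run_id) as run:
        run.add_output_file('alignments',            # declared as 'out/alignments'
                            '%s-sorted.bam' % run_id,
                            [in_path_1, in_path_2])  # absolute upstream paths
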
add_private_info(key, value)[source]

Add private information to a run. Use this to store data which you will need when the run is executed. As opposed to public information, private information is not visible to subsequent steps.

You can store paths to input files here, but not paths to output files as their expected location is not defined until we’re in AbstractStep.execute (hint: they get written to a temporary directory inside execute()).

add_public_info(key, value)[source]

Add public information to a run. For example, a FASTQ reader may store the index barcode here for subsequent steps to query via AbstractStep.find_upstream_info().

add_temporary_directory(prefix='', suffix='', designation=None)[source]

Convenience method for the creation of temporary directories. Basically, this just calls self.add_temporary_file(). The magic happens in ProcessPool.__exit__().

get_basic_state()[source]

Determines basic run state of a run.

Determine the basic run state of a run, which is, at any time, one of waiting, ready, or finished.

These states are determined from the current configuration and the timestamps of result files present in the file system. In addition to these three basic states, there are two additional states which are less reliable (see get_run_state()).

get_execution_hashtag()[source]

Creates a hash tag based on the commands to be executed.

This causes runs to be marked for rerunning if the commands to be executed change.

get_input_files_for_output_file(out_path)[source]

Return all input files a given output file depends on.

get_output_directory()[source]

Returns the final output directory.

get_output_directory_du_jour()[source]

Returns the state-dependent output directory of the step.

Returns this step's output directory according to its current state:

  • if we are currently calling a step's declare_runs() method, this will return None
  • if we are currently calling a step's execute() method, this will return the temporary directory
  • otherwise, it will return the real output directory

get_output_directory_du_jour_placeholder()[source]

Returns a placeholder for the temporary output directory, which needs to be replaced by the actual temp directory inside the abstract_step.execute() method

get_output_files_abspath()[source]

Return a dictionary of all defined output files, grouped by connection annotation:

annotation_1:
    out_path_1: [in_path_1, in_path_2, ...]
    out_path_2: ...
annotation_2: ...

The out_path consists of the output directory du jour and the output file name.

get_output_files_for_annotation_and_tags(annotation, tags)[source]

Retrieve a set of output files of the given annotation, assigned to the same number of specified tags. If you have two ‘alignment’ output files and they are called out-a.txt and out-b.txt, you can use this function like this:

  • tags: [‘a’, ‘b’]
  • result: {‘a’: ‘out-a.txt’, ‘b’: ‘out-b.txt’}
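
In code, the example above reads (a sketch):

    files = run.get_output_files_for_annotation_and_tags('alignment',
                                                         ['a', 'b'])
    # files == {'a': 'out-a.txt', 'b': 'out-b.txt'}
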
get_private_info(key)[source]

Query private information which must have been previously stored via add_private_info().

get_public_info(key)[source]

Query public information which must have been previously stored via add_public_info().

get_single_output_file_for_annotation(annotation)[source]

Retrieve exactly one output file of the given annotation, and crash if there isn’t exactly one.

get_temp_output_directory()[source]

Returns the temporary output directory of a run.

has_private_info(key)[source]

Query whether a piece of private information has been defined.

has_public_info(key)[source]

Query whether a piece of public information has been defined.

remove_temporary_paths()[source]

Everything stored in self._temp_paths is examined and deleted if possible. The list elements are removed in LIFO order. Also, the ‘type’ info in self._known_paths is updated here. NOTE: includes additional stat checks to detect FIFOs as well as other special files.

update_public_info(key, value)[source]

Update public information already existing in a run. For example, all steps which handle FASTQ files want to know how to distinguish between files of read 1 and files of read 2. So each step that provides FASTQ files should update this information if the file names are altered. The stored information can be acquired via AbstractStep.find_upstream_info().

write_annotation_file(path)[source]

Write the YAML annotation after a successful or failed run. The annotation can later be used to render the process graph.

task

class task.Task(pipeline, step, run_id, run_index)[source]

A task represents a certain run of a certain step.

get_parent_tasks()[source]

Returns a list of parent tasks which this task depends on.

get_pipeline()[source]

Returns the pipeline this task belongs to.

get_run()[source]

Returns the run object for this task.

get_step()[source]

Returns the step of this task.

get_task_state()[source]

Proxy method for step.get_run_state().

get_task_state_basic()[source]

Proxy method for step.get_run_state_basic().

input_files()[source]

Return a list of input files required by this task.

output_files()[source]

Return a list of output files produced by this task.

run()[source]

Run the task. Skip if it’s already finished. Raise Exception if it’s not ready.

Miscellaneous modules

process_pool

This module can be used to launch child processes and wait for them. Processes may either run on their own, or they can be chained together into pipelines.

class process_pool.ProcessPool(run)[source]

The process pool provides an environment for launching and monitoring processes. You can launch any number of unrelated processes plus any number of pipelines in which several processes are chained together.

Use it like this:

with process_pool.ProcessPool(self) as pool:
    # launch processes or create pipelines here

When the scope opened by the with statement is left, all processes are launched and watched. The process pool then waits until all processes have finished. You cannot launch a process pool within another process pool, but you can launch multiple pipelines and independent processes within a single process pool. Also, you can launch several process pools sequentially.
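
For example, launching a single watched process (a hedged sketch; the command and paths are made up):

    with process_pool.ProcessPool(self) as pool:
        pool.launch(['sort', '/tmp/unsorted.txt'],
                    stdout_path='/tmp/sorted.txt')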

COPY_BLOCK_SIZE = 4194304

When stdout or stderr streams should be written to output files, this is the buffer size which is used for writing.

class Pipeline(pool)[source]

This class can be used to chain multiple processes together.

Use it like this:

with pool.Pipeline(pool) as pipeline:
    # append processes to the pipeline here

append(args, stdout_path=None, stderr_path=None, hints={})[source]

Append a process to the pipeline. Parameters get stored and are passed to ProcessPool.launch() later, so the same behaviour applies.
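
For example, chaining two processes so that the stdout of the first becomes the stdin of the second (a hedged sketch; the tool names and paths are made up):

    with pool.Pipeline(pool) as pipeline:
        pipeline.append(['cat', 'reads.fastq'])
        pipeline.append(['pigz', '--stdout'],
                        stdout_path='reads.fastq.gz')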

SIGTERM_TIMEOUT = 10

After a SIGTERM signal is issued, wait this many seconds before going postal.

TAIL_LENGTH = 1024

Size of the tail which gets recorded from both stdout and stderr streams of every process launched with this class, in bytes.

get_log()[source]

Return the log as a dictionary.

classmethod kill()[source]

Kills all user-launched processes. After that, the remaining process will end and a report will be written.

classmethod kill_all_child_processes()[source]

Kill all child processes of this process by sending a SIGTERM to each of them. This includes all children which were not launched by this module, and their children etc.

launch(args, stdout_path=None, stderr_path=None, hints={})[source]

Launch a process. Arguments, including the program itself, are passed in args. If the program is not a binary but a script which cannot be invoked directly from the command line, the first element of args must be a list like this: [‘python’, ‘script.py’].

Use stdout_path and stderr_path to redirect stdout and stderr streams to files. In any case, the output of both streams gets watched, the process pool calculates SHA1 checksums automatically and also keeps the last 1024 bytes of every stream. This may be useful if a process crashes and writes error messages to stderr in which case you can see them even if you didn’t redirect stderr to a log file.

Hints can be specified but are not essential. They help to determine the direction of arrows for the run annotation graphs rendered by GraphViz (sometimes, it’s not clear from the command line whether a certain file is an input or output file to a given process).

log(message)[source]

Append a message to the pipeline log.

exception process_pool.TimeoutException[source]

fscache

class fscache.FSCache[source]

Use this class if you expect to make the same os.path.* calls many times during a short time. The first time you call a method with certain arguments, the call is made, but all subsequent calls are served from a cache.

Usage example:

# Instantiate a new file system cache.
fsc = FSCache()

# This call will stat the file system.
print(fsc.exists('/home'))

# This call will leave the file system alone; the cached result is returned.
print(fsc.exists('/home'))

You may call any method which is available in os.path.

misc

class misc.Enum(_list)[source]
misc.append_suffix_to_path(path, suffix)[source]

Append a suffix to a path, for example:

  • path: /home/michael/chocolate-cookies.txt.gz
  • suffix: done right
  • result: /home/michael/chocolate-cookies-done-right.txt.gz

misc.assign_strings(paths, tags)[source]

Assign N strings (path names, for example) to N tags. Example:

  • paths = [‘RIB0000794-cutadapt-R1.fastq.gz’, ‘RIB0000794-cutadapt-R2.fastq.gz’]
  • tags = [‘R1’, ‘R2’]
  • result = { ‘R1’: ‘RIB0000794-cutadapt-R1.fastq.gz’, ‘R2’: ‘RIB0000794-cutadapt-R2.fastq.gz’ }

If this is not possible without ambiguities, a StandardError is thrown. Attention: The number of paths must equal the number of tags; a 1:1 relation is returned if possible.

misc.bytes_to_str(num)[source]

Convert a number representing a number of bytes into a human-readable string such as “4.7 GB”

misc.duration_to_str(duration, long=False)[source]

Minor adjustment to Python's duration-to-string conversion: removes microsecond accuracy and replaces 'days' with 'd'.

misc.natsorted(l)[source]

Return a ‘naturally sorted’ permutation of l.

Credits: http://www.codinghorror.com/blog/2007/12/sorting-for-humans-natural-sort-order.html
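
For example, numeric parts are compared by value rather than character by character:

    print(natsorted(['sample10', 'sample2', 'sample1']))
    # ['sample1', 'sample2', 'sample10']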