Add New Functionality¶
Implement New Steps¶
uap can be easily extended by implementing new source or processing steps. This requires basic python programming skills. New steps are added to uap by placing a single Python file into one of these folders in the uap installation directory:
include/sources
- Place source step files here
include/steps
- Place processing step files here
Let’s talk about how to implement such uap steps.
Step 1: Import Statements and Logger¶
At the beginning of every step please import the required modules and create a logger object.
# First import standard libraries
import os
from logging import getLogger
# Secondly import third party libraries
import yaml
# Thirdly import local application files
from abstract_step import AbstractStep # or AbstractSourceStep
# Get application wide logger
logger = getLogger("uap_logger")
Essential imports are the from logging import getLogger
and
from abstract_step import ...
.
The former is necessary to get access to the application wide logger and
the latter to be able to inherit either from AbstractStep
or
AbstractSourceStep
.
Step 2: Class Definition¶
Now you need to define a class (which inherits either from AbstractStep
or
AbstractSourceStep
) and its __init__
method.
class ConcatenateFiles(AbstractStep):
# Overwrite initialisation
def __init__(self, pipeline):
# Call super classes initialisation
super(ConcatenateFiles, self).__init__(pipeline)
..
The new class needs to be derived from either AbstractStep
, for processing
steps, or AbstractSourceStep
, for source steps.
Step 3: __init__
Method¶
The __init__
method is the place where you should declare:
- Tools via
self.require_tool('tool_name')
: - Steps usually require tools to perform their task.
Each tool that is going to be used by a step needs to be requested via the
method
require_tool('tool_name')
. uap tests the existence of the required tools whenever it constructs the directed acyclic graph (DAG) of the analysis. The test is based on the information provided in the tools section of the analysis configuration. An entry fortool_name
has to exist and to provide information to verify the tools accessibility. - Connections via
add_connection(...)
: Connections are defined by the method
add_connection(...)
. They are used to transfer data from one step to another. If a step defines an output connectionout/something
and a subsequent step defines an input connection namedin/something
, then the files beloging toout/something
will be available via the connectionin/something
.Please name connection in a way that they describe the data itself and NOT the data type. For instance, use
in/genome
overin/fasta
. The data type of the received input data should be checked by the steps to make sure to execute the correct commands.TODO: Reanimate the constraints feature. It would often save some lines of code to be able to define constraints on the connections.
- Options via
self.add_option()
: Options allow to influence the commands executed by a step. It is advisable to provide as many meaningful options as possible to keep steps flexible. Steps can have any number of options. Options are defined via the method
add_option()
.The
add_option()
method allows to specify various information about the option. The method parameters are these:key
- name of the option (if possible include the name of the tool
this option influences e.g.
dd-blocksize
to setdd
blocksize)
option_type
- The option type has to be at least one of
int
,float
,str
,bool
,list
, ordict
.
optional
(Boolean)- Defines if the option is mandatory (
False
) or optional (True
).
choices
- List of valid values for the option.
default
- Defines the default value for the option.
description
- The description of the functionality of the option.
..
# Define connections
self.add_connection('in/text')
self.add_connection('out/text')
# Request tools
self.require_tool('cat')
# Options for workflow
self.add_option('concatenate_all_files', bool, optional=False,
default=False, description="Concatenate all files from "
"all runs, if 'True'.")
# Options for 'cat' (see manpage)
self.add_option('show-all', bool, optional=True,
description="Show all characters")
self.add_option('number-nonblank', int, optional=True,
description="number nonempty output lines, "
"overrides --number")
self.add_option('show-ends', bool, optional=True,
description="display $ at end of each line")
self.add_option("number", int, optional=True,
description="number all output lines")
self.add_option("squeeze-blank", bool, optional=True,
description="suppress repeated empty output lines")
self.add_option("show-tabs", bool, optional=True,
description="display TAB characters as ^I")
self.add_option("show-nonprinting", bool, optional=True,
description="use ^ and M- notation, except for "
"LFD and TAB")
..
Step 4: runs
Method¶
The runs
method is where all the work is done.
This method gets handed over a dictionary of dictionaries.
The keys of the first dictionary are the run IDs (often resembling the samples).
The values of the first dictionary is another dictionary.
The keys of that second dictionary are the connections e.g. “in/text” and the
values are the corresponding files belonging to that connection.
Let’s inspect all the run IDs, connections, and input files we got from our upstream steps. And let’s tore all files we received in a list for later use.
..
def runs(self, run_ids_connections_files):
all_files = list()
# Let's inspect the run_ids_connections_files data structure
for run_id in run_ids_connections_files.keys():
logger.info("Run ID: %s" % run_id)
for connection in run_ids_connections_files[run_id].keys():
logger.info("Connection: %s" % connection)
for in_file in run_ids_connections_files[run_id][connection]:
logger.info("Input file: %s" % in_file)
# Collect all files
all_files.append(in_file)
..
It comes in handy to assemble a list with all options for cat
here.
..
# List with options for 'cat'
cat_options = ['show-all', 'number-nonblank', 'show-ends', 'number',
'squeeze-blank', 'show-tabs', 'show-nonprinting']
# Get all options which were set
set_options = [option for option in cat_options if \
self.is_option_set_in_config(option)]
# Compile the list of options
cat_option_list = list()
for option in set_options:
# bool options look different than ...
if isinstance(self.get_option(option), bool):
if self.get_option(option):
cat_option_list.append('--%s' % option)
# ... the rest ...
else:
cat_option_list.append('--%s' % option)
# ... make sure to cast the values to string
cat_option_list.append(str(self.get_option(option)))
..
What should happen if we are told to concatenate all files from all input runs?
We have to create a single run with a new run ID ‘all_files’.
The run consists of a exec_group
that runs the cat
command.
Note
An exec_group
is a list of commands which are executed in one go.
You might create multiple exec_group
’s if you need to make sure a set of
commands finished before another set is started.
An exec_group
can contain commands and pipelines.
They can be added like this:
# Add a single command
exec_group.add_command(...)
# Add a pipeline to an exec_group
with exec_group.add_pipeline as pipe:
...
# Add a command to a pipeline
pipe.add_command(...)
The result of the concatenation is written to an output file. The run object needs to know about each output file that is going to be created.
Note
An output file is announced via the run objects
add_output_file(tag, out_path, in_paths)
method.
The method parameters are:
tag
: The name of the out connection e.g. ‘text’ for ‘out/text’out_path
: The name of the output file (best practice is to add the run ID to the file name)in_paths
: The input files this output file is based on
..
# Okay let's concatenate all files we get
if self.get_option('concatenate_all_files'):
run_id = 'all_files'
# New run named 'all_files' is created here
with self.declare_run(run_id) as run:
# Create an exec
with run.new_exec_group() as exec_group:
# Assemble the cat command
cat = [ self.get_tool('cat') ]
# Add the options to the command
cat.extend( cat_option_list )
cat.extend( all_files )
# Now add the command to the execution group
exec_group.add_command(
cat,
stdout_path = run.add_output_file(
'text',
"%s_concatenated.txt" % run_id,
all_files)
)
..
What should happen if all files of an input run have to be concatenated? We create a new run for each input run and concatenate all files that belong to the input run.
# Concatenate all files from a runs 'in/text' connection
else:
# iterate over all run IDs ...
for run_id in run_ids_connections_files.keys():
input_paths = run_ids_connections_files[run_id]['in/text']
# ... and declare a new run for each of them.
with self.declare_run(run_id) as run:
with run.new_exec_group() as exec_group:
# Assemble the cat command
cat = [ self.get_tool('cat') ]
# Add the options to the command
cat.extend( cat_option_list )
cat.extend( input_paths )
# Now add the command to the execution group
exec_group.add_command(
cat,
stdout_path = run.add_output_file(
'text',
"%s_concatenated.txt" % run_id,
input_paths)
)
That’s it. You created your first uap processing step.
Step 5: Add the new step to uap¶
You have to make the new step known to uap.
Save the complete file into uap’s include/steps
folder.
Processing step files are located at uap’s include/steps/
folder
and source step files at uap’s include/sources/
folder.
You can control that your step is correctly “installed” if its included in the list of all source and processing steps:
$ ls -la $(dirname $(which uap))/include/sources
... Lists all available source step files
$ ls -la $(dirname $(which uap))/include/steps
... Lists all available processing step files
You can also use uap’s steps subcommand to get information about installed steps.
If the step file exists at the correct location that step can be used in an analysis configuration file.
A potential example YAML file named test.yaml
could look like this:
destination_path: example-out/test/
steps:
##################
## Source steps ##
##################
raw_file_source:
pattern: example-data/text-files/*.txt
group: (.*).txt
######################
## Processing steps ##
######################
cat:
_depends: raw_file_source
_connect:
in/text:
- raw_file_source/raw
concatenate_all_files: False
tools:
cat:
path: cat
get_version: '--version'
exit_code: 0
You need to create the destination path and some text files matching the
pattern example-data/text-files/*.txt
.
Also you see the work of the _connect
keyword in play.
Check the status of the configured analysis:
$ uap test.yaml status
Ready runs
----------
[r] cat/Hello_america
[r] cat/Hello_asia
[r] cat/Hello_europe
[r] cat/Hello_world
runs: 4 total, 4 ready
Best Practices¶
There are a couple of things you should keep in mind while implementing new steps or modifying existing ones:
- NEVER remove files! If files need to be removed report the issue and exit uap or force the user to call a specific subcommand. Never delete files without permission by the user.
- Make sure errors already show up in when the steps
runs()
method is called the first time. So, look out for things that may fail inruns
. Stick to fail early, fail often. That way errors show up before submitting jobs to the cluster and wasting precious cluster waiting time is avoided. - Make sure that all tools which you request inside the
runs()
method are also required by the step viaself.require_tool()
. Use the__init__()
method to request tools. - Make sure your disk access is as cluster-friendly as possible (which
primarily means using large block sizes and preferably no seek operations).
If possible, use pipelines to wrap your commands in
pigz
ordd
commands. Make the used block size configurable. Although this is not possible in every case (for example when seeking in files is involved), it is straightforward with tools that read a continuous stream fromstdin
and write a continuous stream tostdout
. - Always use
os.path.join(...)
to handle paths. - Use bash commands like
mkfifo
over python library equivalents likeos.mkfifo()
. Themkfifo
command is hashed while anos.mkfifo()
is not. - Keep your steps as flexible as possible. You don’t know what other user might need, so let them decide.
Usage of dd
and mkfifo
¶
uap relies often on dd
and FIFOs to process data with fewer
disk read-write operations.
Please provide a step option to adjust the dd
blocksize (this option
is usually called dd-blocksize
).
Create your steps in a way that they perform the least filesystem operations.
Some systems might be very sensitive to huge numbers of read-write operations.