--- a
+++ b/qiita_db/processing_job.py
@@ -0,0 +1,2541 @@
+# -----------------------------------------------------------------------------
+# Copyright (c) 2014--, The Qiita Development Team.
+#
+# Distributed under the terms of the BSD 3-clause License.
+#
+# The full license is in the file LICENSE, distributed with this software.
+# -----------------------------------------------------------------------------
+
+import networkx as nx
+import qiita_db as qdb
+import pandas as pd
+from numpy import log as nlog # noqa
+
+from collections import defaultdict
+from collections.abc import Iterable
+from datetime import datetime, timedelta
+from itertools import chain
+from json import dumps, loads
+from multiprocessing import Process, Queue, Event
+from re import search, findall
+from subprocess import Popen, PIPE
+from time import sleep
+from uuid import UUID
+from os.path import join
+from humanize import naturalsize
+from os import environ
+
+from qiita_core.qiita_settings import qiita_config
+from qiita_db.util import create_nested_path
+
+
+class Watcher(Process):
+    # TODO: Qiita will need a proper mapping of these states to Qiita states
+    # Currently, these strings are being inserted directly into Qiita's status
+    # table. Qiita will be unfamiliar with many of these. We will need at least
+    # one additional job type for 'Held': A job waiting for another to complete
+    # before it can run.
+    #
+    # Note that the main Qiita script instantiates an object of this class in
+    # a separate thread, so it can periodically update the database w/metadata
+    # from Watcher's queue. Qiita's script also calls qdb.complete() so there
+    # are no circular references. TODO: replace w/a REST call.
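+
+    # A minimal usage sketch (hypothetical; the actual wiring lives in the
+    # main Qiita script and may differ):
+    #
+    #   watcher = Watcher()
+    #   watcher.start()
+    #   while True:
+    #       item = watcher.queue.get()
+    #       if item == 'QUIT':
+    #           break
+    #       # ...update Qiita's status table with item's metadata...
+    #   watcher.stop()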
+
+    # valid Qiita states:
+    #             The current status of the job, one of {'queued', 'running',
+    #             'success', 'error', 'in_construction', 'waiting'}
+
+    # TODO: what to map in_construction to?
+    job_state_map = {'C': 'completed', 'E': 'exiting', 'H': 'held',
+                     'Q': 'queued', 'R': 'running', 'T': 'moving',
+                     'W': 'waiting', 'S': 'suspended'}
+
+    # TODO: moving, waiting, and suspended have been mapped to
+    # 'running' in Qiita, as 'waiting' in Qiita connotes that the
+    # main job itself has completed, and is waiting on validator
+    # jobs to finish, etc. Revisit
+    job_scheduler_to_qiita_state_map = {'completed': 'completed',
+                                        'held': 'queued',
+                                        'queued': 'queued',
+                                        'exiting': 'running',
+                                        'running': 'running',
+                                        'moving': 'running',
+                                        'waiting': 'running',
+                                        'suspended': 'running',
+                                        'DROPPED': 'error'}
+
+    def __init__(self):
+        super(Watcher, self).__init__()
+
+        # set self.owner to qiita, or whoever owns the processes we need
+        # to watch
+        self.owner = qiita_config.job_scheduler_owner
+
+        # Setting a polling value less than 60 seconds allows for multiple
+        # chances to catch the exit status before it disappears.
+        self.polling_value = qiita_config.job_scheduler_poll_val
+
+        # the queue is the cross-process mechanism used to communicate
+        # across process boundaries. Note that when the Watcher object
+        # runs, another process gets created and receives a copy of the
+        # Watcher object. At that point, these self.* variables become
+        # local to each process. Hence, the main process can't see the
+        # child's updates to self.processes, for example; its own copy
+        # will simply remain empty.
+        self.queue = Queue()
+        self.processes = {}
+
+        # the cross-process sentinel value to shutdown Watcher
+        self.event = Event()
+
+    def _element_extract(self, snippet, list_of_elements,
+                         list_of_optional_elements):
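+        """Extract the text values of XML elements from a qstat -x snippet
+
+        Parameters
+        ----------
+        snippet : str
+            An XML fragment describing a single job (a <Job> element)
+        list_of_elements : list of str
+            Names of elements that must be present in `snippet`
+        list_of_optional_elements : list of str
+            Names of elements to extract only if present
+
+        Returns
+        -------
+        dict
+            Maps each found element name to its text value
+
+        Raises
+        ------
+        AssertionError
+            If any element in `list_of_elements` is missing
+        """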
+        results = {}
+        missing_elements = []
+
+        for element in list_of_elements:
+            value = search('<%s>(.*?)</%s>' % (element, element), snippet)
+            if value:
+                results[element] = value.group(1)
+            else:
+                missing_elements.append(element)
+
+        if missing_elements:
+            raise AssertionError("The following elements were not found: %s"
+                                 % ', '.join(missing_elements))
+
+        for element in list_of_optional_elements:
+            value = search('<%s>(.*?)</%s>' % (element, element), snippet)
+            if value:
+                results[element] = value.group(1)
+
+        return results
+
+    def _process_dependent_jobs(self, results):
+        # when a job has its status changed, check to see if the job completed
+        # with an error. If so, check to see if it had any jobs that were being
+        # 'held' on this job's successful completion. If we are maintaining
+        # state on any of these jobs, mark them as 'DROPPED', because they will
+        # no longer appear in qstat output.
+        if results['job_state'] == 'completed':
+            if results['exit_status'] == '0':
+                return
+
+            if 'depend' in results:
+                tmp = results['depend'].split(':')
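+                # a typical (hypothetical) value looks like
+                # 'beforeok:123.host.domain.org:124.host.domain.org'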
+                if tmp[0] == 'beforeok':
+                    tmp.pop(0)
+                    for child_job_id in tmp:
+                        # jobs in 'beforeok' are labeled with the complete
+                        # job id and what looks to be the server name doing
+                        # the work. For now, simply remove the
+                        # '@host.domain.org' (server) component.
+                        child_job_id = child_job_id.split('@')[0]
+                        self.processes[child_job_id]['job_state'] = 'DROPPED'
+                        self.queue.put(self.processes[child_job_id])
+
+    def run(self):
+        # check to see if qstat is available. If not, exit immediately.
+        proc = Popen("qstat -x", shell=True, stdout=PIPE, stderr=PIPE)
+        proc.wait()
+        if proc.returncode != 0:
+            # inform any process expecting data from Watcher
+            self.queue.put('QUIT')
+            self.event.set()
+
+        while not self.event.is_set():
+            proc = Popen("qstat -x", shell=True, stdout=PIPE, stderr=PIPE)
+            stdout, stderr = proc.communicate()
+            if proc.returncode == 0:
+                # qstat returned successfully with metadata on processes
+                # break up metadata into individual <Job></Job> elements
+                # for processing.
+                m = findall('<Job>(.*?)</Job>', stdout.decode('ascii'))
+                for item in m:
+                    # filter out jobs that don't belong to owner
+                    if search('<Job_Owner>%s</Job_Owner>' % self.owner, item):
+                        # extract the metadata we want.
+                        # if a job has completed, an exit_status element will
+                        # be present. We also want that.
+                        results = self._element_extract(item, ['Job_Id',
+                                                               'Job_Name',
+                                                               'job_state'],
+                                                              ['depend'])
+                        tmp = Watcher.job_state_map[results['job_state']]
+                        results['job_state'] = tmp
+                        if results['job_state'] == 'completed':
+                            results2 = self._element_extract(item,
+                                                             ['exit_status'],
+                                                             [])
+                            results['exit_status'] = results2['exit_status']
+
+                        # determine if anything has changed since last poll
+                        if results['Job_Id'] in self.processes:
+                            if self.processes[results['Job_Id']] != results:
+                                # metadata for existing job has changed
+                                self.processes[results['Job_Id']] = results
+                                self.queue.put(results)
+                                self._process_dependent_jobs(results)
+                        else:
+                            # metadata for new job inserted
+                            self.processes[results['Job_Id']] = results
+                            self.queue.put(results)
+            else:
+                self.queue.put('QUIT')
+                self.event.set()
+                # don't join(), since we are exiting from the main loop
+
+            sleep(self.polling_value)
+
+    def stop(self):
+        # 'poison pill' to thread/process
+        self.queue.put('QUIT')
+        # setting self.event is a safe way of communicating a boolean
+        # value across processes and threads.
+        # when this event is 'set' by the main line of execution in Qiita,
+        # (or in any other process if need be), Watcher's run loop will
+        # stop and the Watcher process will exit.
+        self.event.set()
+        # Here, it is assumed that we are running this from the main
+        # context. By joining(), we're waiting for the Watcher process to
+        # end before returning from this method.
+        self.join()
+
+
+def launch_local(env_script, start_script, url, job_id, job_dir):
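+    """Run a plugin job locally as a child process and wait for it to end
+
+    Parameters
+    ----------
+    env_script : str
+        Shell command(s) that set up the plugin's environment
+    start_script : str
+        The plugin's start script
+    url : str
+        The portal URL the plugin uses to request job information
+    job_id : str
+        Qiita's UUID for the job
+    job_dir : str
+        The job's working directory
+    """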
+
+    # launch_local() differs from launch_job_scheduler(), as no Watcher() is
+    # used.
+    # each launch_local() process will execute the cmd as a child process,
+    # wait, and update the database once cmd has completed.
+    #
+    # As processes are lighter weight than jobs, this should be fine.
+    # This is how the current job model works locally.
+    cmd = [start_script, url, job_id, job_dir]
+    print("ENV_SCRIPT: %s" % env_script)
+    print("START_SCRIPT: %s" % start_script)
+    print("URL: %s" % url)
+    print("JOB ID: %s" % job_id)
+    print("JOB DIR: %s" % job_dir)
+
+    # When Popen() executes, the shell is not in interactive mode,
+    # so it does not source any of the bash configuration files.
+    # We explicitly run env_script so the plugin's environment is set up.
+    cmd = "bash -c '%s; %s'" % (env_script, ' '.join(cmd))
+    print("CMD STRING: %s" % cmd)
+
+    # Popen() may also need universal_newlines=True
+    proc = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
+
+    # Communicate pulls all stdout/stderr from the PIPEs
+    # This call waits until cmd is done
+    stdout, stderr = proc.communicate()
+    print("STDOUT: %s" % stdout)
+    print("STDERR: %s" % stderr)
+
+    # proc.returncode will be equal to None if the process hasn't finished
+    # yet. If cmd was terminated by a SIGNAL, it will be a negative value.
+    # (*nix platforms only)
+    error = None
+    print("RETURN CODE: %s" % proc.returncode)
+    print("JOB ID: %s" % job_id)
+
+    if proc.returncode != 0:
+        error = "error from launch_local when launching cmd='%s'" % cmd
+        error = "%s\n%s\n%s" % (error, stdout, stderr)
+
+        # Forcing the creation of a new connection
+        qdb.sql_connection.create_new_transaction()
+        ProcessingJob(job_id).complete(False, error=error)
+
+
+def launch_job_scheduler(env_script, start_script, url, job_id, job_dir,
+                         dependent_job_id, resource_params):
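+    """Build and submit a Slurm batch script for the job via sbatch
+
+    Parameters
+    ----------
+    env_script : str
+        Shell command(s) that set up the plugin's environment
+    start_script : str
+        The plugin's start script
+    url : str
+        The portal URL the plugin uses to request job information
+    job_id : str
+        Qiita's UUID for the job (not a job scheduler job ID)
+    job_dir : str
+        The job's working directory; the submission script is written here
+    dependent_job_id : str or None
+        If not None, submit with '-d afterok:<dependent_job_id>' so this
+        job only runs after the given job succeeds
+    resource_params : str
+        Resource allocation parameters passed to sbatch
+
+    Returns
+    -------
+    str
+        The job id assigned by the scheduler
+    """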
+
+    # note that job_id is Qiita's UUID, not a job_scheduler job ID
+    cmd = [start_script, url, job_id, job_dir]
+
+    lines = [
+        '#!/bin/bash',
+        f'#SBATCH --error {job_dir}/slurm-error.txt',
+        f'#SBATCH --output {job_dir}/slurm-output.txt']
+
+    # Slurm only honors #SBATCH directives that appear before the first
+    # executable line, so the optional epilogue must be added here
+    epilogue = environ.get('QIITA_JOB_SCHEDULER_EPILOGUE', '')
+    if epilogue:
+        lines.append(f"#SBATCH --epilog {epilogue}")
+
+    lines.append("echo $SLURM_JOBID")
+    lines.append("source ~/.bash_profile")
+    lines.append(env_script)
+    lines.append(' '.join(cmd))
+
+    # writing the script file
+    create_nested_path(job_dir)
+
+    fp = join(job_dir, '%s.txt' % job_id)
+
+    with open(fp, 'w') as job_file:
+        job_file.write("\n".join(lines))
+
+    sbatch_cmd = ['sbatch']
+
+    if dependent_job_id:
+        # note that a dependent job should be submitted before the
+        # 'parent' job ends
+        sbatch_cmd.append("-d")
+        sbatch_cmd.append("afterok:%s" % dependent_job_id)
+
+    sbatch_cmd.append(resource_params)
+    sbatch_cmd.append(fp)
+
+    stdout, stderr, return_value = _system_call(' '.join(sbatch_cmd))
+
+    if return_value != 0:
+        raise AssertionError(f'Error submitting job: {sbatch_cmd} :: {stderr}')
+
+    job_id = stdout.strip('\n').split(" ")[-1]
+
+    return job_id
+
+
+def _system_call(cmd):
+    """Execute the command `cmd`
+
+    Parameters
+    ----------
+    cmd : str
+        The string containing the command to be run.
+
+    Returns
+    -------
+    tuple of (str, str, int)
+        The standard output, standard error and exit status of the
+        executed command
+
+    Notes
+    -----
+    This function is ported from QIIME (http://www.qiime.org), previously named
+    qiime_system_call. QIIME is a GPL project, but we obtained permission from
+    the authors of this function to port it to Qiita and keep it under BSD
+    license.
+    """
+    proc = Popen(cmd, universal_newlines=True, shell=True, stdout=PIPE,
+                 stderr=PIPE)
+    # Communicate pulls all stdout/stderr from the PIPEs
+    # This call blocks until the command is done
+    stdout, stderr = proc.communicate()
+    return_value = proc.returncode
+    return stdout, stderr, return_value
+
+
+class ProcessingJob(qdb.base.QiitaObject):
+    r"""Models a job that executes a command in a set of artifacts
+
+    Attributes
+    ----------
+    user
+    command
+    parameters
+    status
+    log
+    heartbeat
+    step
+
+    Methods
+    -------
+    exists
+    create
+    """
+    _table = 'processing_job'
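+    # maps the configured qiita_config.plugin_launcher value to the launcher
+    # function and whether it is called directly in this process (True) or
+    # spawned as a separate process (False)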
+    _launch_map = {'qiita-plugin-launcher':
+                   {'function': launch_local,
+                    'execute_in_process': False},
+                   'qiita-plugin-launcher-slurm':
+                   {'function': launch_job_scheduler,
+                    'execute_in_process': True}}
+
+    @classmethod
+    def exists(cls, job_id):
+        """Check if the job `job_id` exists
+
+        Parameters
+        ----------
+        job_id : str
+            The job id
+
+        Returns
+        -------
+        bool
+            True if the job `job_id` exists. False otherwise.
+        """
+        try:
+            UUID(job_id)
+        except ValueError:
+            return False
+
+        with qdb.sql_connection.TRN:
+            sql = """SELECT EXISTS(SELECT *
+                                   FROM qiita.processing_job
+                                   WHERE processing_job_id = %s)"""
+            qdb.sql_connection.TRN.add(sql, [job_id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+
+    @classmethod
+    def by_external_id(cls, external_id):
+        """Return Qiita Job UUID associated with external_id
+
+        Parameters
+        ----------
+        external_id : str
+            An external id (e.g. job scheduler Job ID)
+
+        Returns
+        -------
+        str
+            Qiita Job UUID, if found, otherwise None
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT processing_job_id FROM qiita.processing_job
+                     WHERE external_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [external_id])
+            return cls(qdb.sql_connection.TRN.execute_fetchlast())
+
+    @property
+    def resource_allocation_info(self):
+        """Return resource allocation defined for this job. For
+        external computational resources only.
+
+        Returns
+        -------
+        str
+            A resource allocation string useful to the external resource
+        """
+        with qdb.sql_connection.TRN:
+            analysis = None
+            if self.command.name == 'complete_job':
+                jtype = 'COMPLETE_JOBS_RESOURCE_PARAM'
+                params = self.parameters.values
+                v = loads(params['payload'])
+                # assume an empty string for name is preferable to None
+                name = ''
+                if v['artifacts'] is not None:
+                    an_element = list(v['artifacts'].keys())[0]
+                    name = v['artifacts'][an_element]['artifact_type']
+                # for analysis we have two options, either use the
+                # input_artifacts or use the parameter 'analysis' of the job
+                # to complete
+                job = ProcessingJob(params['job_id'])
+                params = job.parameters.values
+                ia = job.input_artifacts
+                if 'analysis' in params and params['analysis'] is not None:
+                    analysis = qdb.analysis.Analysis(params['analysis'])
+                elif ia:
+                    analysis = ia[0].analysis
+            elif self.command.name == 'release_validators':
+                jtype = 'RELEASE_VALIDATORS_RESOURCE_PARAM'
+                tmp = ProcessingJob(self.parameters.values['job'])
+                name = tmp.parameters.command.name
+                if tmp.input_artifacts:
+                    analysis = tmp.input_artifacts[0].analysis
+            elif self.command.name == 'Validate':
+                jtype = 'VALIDATOR'
+                vals = self.parameters.values
+                name = vals['artifact_type']
+                if vals['analysis'] is not None:
+                    analysis = qdb.analysis.Analysis(vals['analysis'])
+            elif self.id == 'register':
+                jtype = 'REGISTER'
+                name = 'REGISTER'
+            else:
+                # assume anything else is a command
+                jtype = 'RESOURCE_PARAMS_COMMAND'
+                name = self.command.name
+                # for analysis we have two options, either use the
+                # input_artifacts or use the parameter 'analysis' of self
+                params = self.parameters.values
+                ia = self.input_artifacts
+                if 'analysis' in params and params['analysis'] is not None:
+                    analysis = qdb.analysis.Analysis(params['analysis'])
+                elif ia:
+                    analysis = ia[0].analysis
+
+            # first, query for resources matching name and type
+            sql = """SELECT allocation FROM
+                     qiita.processing_job_resource_allocation
+                     WHERE name = %s and job_type = %s"""
+            qdb.sql_connection.TRN.add(sql, [name, jtype])
+
+            result = qdb.sql_connection.TRN.execute_fetchflatten()
+
+            # if no matches for both type and name were found, query the
+            # 'default' value for the type
+
+            if not result:
+                sql = """SELECT allocation FROM
+                         qiita.processing_job_resource_allocation WHERE
+                         name = %s and job_type = %s"""
+                qdb.sql_connection.TRN.add(sql, ['default', jtype])
+
+                result = qdb.sql_connection.TRN.execute_fetchflatten()
+                if not result:
+                    raise AssertionError(
+                        "Could not match %s to a resource allocation!" % name)
+
+            allocation = result[0]
+            # adding user_level extra parameters
+            allocation = f'{allocation} {self.user.slurm_parameters}'.strip()
+            # adding analysis reservation
+            if analysis is not None:
+                sr = analysis.slurm_reservation
+                if sr is not None:
+                    allocation = f'{allocation} --reservation {sr}'
+
+            if ('{samples}' in allocation or '{columns}' in allocation or
+                    '{input_size}' in allocation):
+                samples, columns, input_size = self.shape
+                parts = []
+                error_msg = ('Obviously incorrect allocation. Please '
+                             'contact %s' % qiita_config.help_email)
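+                # the allocation may embed formulas evaluated against the
+                # job's shape, e.g. (hypothetical):
+                #   '-p qiita --mem {samples}*1000000 --time {samples}*120'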
+                for part in allocation.split('--'):
+                    param = ''
+                    if part.startswith('time '):
+                        param = 'time '
+                    elif part.startswith('mem '):
+                        param = 'mem '
+                    else:
+                        # if parts is empty, this is the first part so no --
+                        if parts:
+                            parts.append(f'--{part.strip()}')
+                        else:
+                            parts.append(part.strip())
+                        continue
+
+                    part = part[len(param):]
+                    if ('{samples}' in part or '{columns}' in part or
+                            '{input_size}' in part):
+                        # to make sure that the formula is correct and avoid
+                        # possible issues with conversions, we will check that
+                        # all the variables {samples}/{columns}/{input_size}
+                        # present in the formula are not None; if any is None
+                        # we will set the job's error (which stops it) and the
+                        # message will be shown to the user within the job
+                        if (('{samples}' in part and samples is None) or
+                                ('{columns}' in part and columns is None) or
+                                ('{input_size}' in part and input_size is
+                                 None)):
+                            self._set_error(error_msg)
+                            return 'Not valid'
+
+                        try:
+                            # if eval has something that can't be processed
+                            # it will raise a NameError
+                            value = eval(part.format(
+                                samples=samples, columns=columns,
+                                input_size=input_size))
+                        except NameError:
+                            self._set_error(error_msg)
+                            return 'Not valid'
+                        else:
+                            if value <= 0:
+                                self._set_error(error_msg)
+                                return 'Not valid'
+
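+                            # format the evaluated value for Slurm:
+                            # --time gets [days-]H:MM:SS (microseconds
+                            # dropped); --mem gets a human-readable size
+                            # via naturalsize()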
+                            if param == 'time ':
+                                td = timedelta(seconds=value)
+                                if td.days > 0:
+                                    days = td.days
+                                    td = td - timedelta(days=days)
+                                    part = f'{days}-{str(td)}'
+                                else:
+                                    part = str(td)
+                                part = part.split('.')[0]
+                            else:
+                                part = naturalsize(
+                                    value, gnu=True, format='%.0f')
+                    parts.append(f'--{param}{part}'.strip())
+
+                allocation = ' '.join(parts)
+
+            return allocation
+
+    @classmethod
+    def create(cls, user, parameters, force=False):
+        """Creates a new job in the system
+
+        Parameters
+        ----------
+        user : qiita_db.user.User
+            The user executing the job
+        parameters : qiita_db.software.Parameters
+            The parameters of the job being executed
+        force : bool
+            Force creation on duplicated parameters
+
+        Returns
+        -------
+        qiita_db.processing_job.ProcessingJob
+            The newly created job
+
+        Notes
+        -----
+        If force is True, the job will be created even if another job
+        exists with the same parameters
+        """
+        TTRN = qdb.sql_connection.TRN
+        with TTRN:
+            command = parameters.command
+            if not force:
+                # check if a job with the same parameters already exists
+                sql = """SELECT processing_job_id, email,
+                        processing_job_status, COUNT(aopj.artifact_id)
+                     FROM qiita.processing_job
+                     LEFT JOIN qiita.processing_job_status
+                        USING (processing_job_status_id)
+                     LEFT JOIN qiita.artifact_output_processing_job aopj
+                        USING (processing_job_id)
+                     WHERE command_id = %s AND processing_job_status IN (
+                        'success', 'waiting', 'running', 'in_construction') {0}
+                     GROUP BY processing_job_id, email,
+                        processing_job_status"""
+
+                # we need to use ILIKE for booleans because they can be
+                # stored as 'false' or 'False'
+                params = []
+                for k, v in parameters.values.items():
+                    # expand Iterable values into individual entries; note
+                    # that strings are also Iterable, so they are excluded
+                    if isinstance(v, Iterable) and not isinstance(v, str):
+                        for vv in v:
+                            params.extend([k, str(vv)])
+                    else:
+                        params.extend([k, str(v)])
+
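+                # params is now a flat list of alternating keys and
+                # stringified values: [k1, v1, k2, v2, ...]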
+                if params:
+                    # divided by 2 as we have key-value pairs
+                    len_params = int(len(params)/2)
+                    sql = sql.format(' AND ' + ' AND '.join(
+                        ["command_parameters->>%s ILIKE %s"] * len_params))
+                    params = [command.id] + params
+                    TTRN.add(sql, params)
+                else:
+                    # the sql variable expects the list of parameters but if
+                    # there is no param we need to replace the {0} with an
+                    # empty string
+                    TTRN.add(sql.format(""), [command.id])
+
+                # checking that if the job status is success, it has children
+                # [2] status, [3] children count
+                existing_jobs = [r for r in TTRN.execute_fetchindex()
+                                 if r[2] != 'success' or r[3] > 0]
+                if existing_jobs:
+                    raise ValueError(
+                        'Cannot create job because the parameters are the '
+                        'same as jobs that are queued, running or already '
+                        'have succeeded:\n%s' % '\n'.join(
+                            ["%s: %s" % (jid, status)
+                             for jid, _, status, _ in existing_jobs]))
+
+            sql = """INSERT INTO qiita.processing_job
+                        (email, command_id, command_parameters,
+                         processing_job_status_id)
+                     VALUES (%s, %s, %s, %s)
+                     RETURNING processing_job_id"""
+            status = qdb.util.convert_to_id(
+                "in_construction", "processing_job_status")
+            sql_args = [user.id, command.id,
+                        parameters.dump(), status]
+            TTRN.add(sql, sql_args)
+            job_id = TTRN.execute_fetchlast()
+
+            # Link the job with the input artifacts
+            sql = """INSERT INTO qiita.artifact_processing_job
+                        (artifact_id, processing_job_id)
+                     VALUES (%s, %s)"""
+            pending = defaultdict(dict)
+            for pname, vals in command.parameters.items():
+                if vals[0] == 'artifact':
+                    artifact_info = parameters.values[pname]
+                    # If the artifact_info is a list, then the artifact
+                    # still doesn't exist because the current job is part
+                    # of a workflow, so we can't link
+                    if not isinstance(artifact_info, list):
+                        TTRN.add(sql, [artifact_info, job_id])
+                    else:
+                        pending[artifact_info[0]][pname] = artifact_info[1]
+                elif pname == 'artifact':
+                    TTRN.add(sql, [parameters.values[pname], job_id])
+
+            if pending:
+                sql = """UPDATE qiita.processing_job
+                         SET pending = %s
+                         WHERE processing_job_id = %s"""
+                TTRN.add(sql, [dumps(pending), job_id])
+
+            TTRN.execute()
+
+            return cls(job_id)
+
+    @property
+    def user(self):
+        """The user that launched the job
+
+        Returns
+        -------
+        qiita_db.user.User
+            The user that launched the job
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT email
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            email = qdb.sql_connection.TRN.execute_fetchlast()
+            return qdb.user.User(email)
+
+    @property
+    def command(self):
+        """The command that the job executes
+
+        Returns
+        -------
+        qiita_db.software.Command
+            The command that the job executes
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT command_id
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            cmd_id = qdb.sql_connection.TRN.execute_fetchlast()
+            return qdb.software.Command(cmd_id)
+
+    @property
+    def parameters(self):
+        """The parameters used in the job's command
+
+        Returns
+        -------
+        qiita_db.software.Parameters
+            The parameters used in the job's command
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT command_id, command_parameters
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            res = qdb.sql_connection.TRN.execute_fetchindex()[0]
+            return qdb.software.Parameters.load(
+                qdb.software.Command(res[0]), values_dict=res[1])
+
+    @property
+    def input_artifacts(self):
+        """The artifacts used as input in the job
+
+        Returns
+        -------
+        list of qiita_db.artifact.Artifact
+            The artifacts used as input in the job
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT artifact_id
+                     FROM qiita.artifact_processing_job
+                     WHERE processing_job_id = %s
+                     ORDER BY artifact_id"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return [qdb.artifact.Artifact(aid)
+                    for aid in qdb.sql_connection.TRN.execute_fetchflatten()]
+
+    @property
+    def status(self):
+        """The status of the job
+
+        Returns
+        -------
+        str
+            The current status of the job, one of {'queued', 'running',
+            'success', 'error', 'in_construction', 'waiting'}
+
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT processing_job_status
+                     FROM qiita.processing_job_status
+                        JOIN qiita.processing_job
+                            USING (processing_job_status_id)
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+
+    def _generate_notification_message(self, value, error_msg):
+        ignored_software = ('artifact definition',)
+        ignored_commands = ('Validate', 'complete_job', 'release_validators')
+
+        # early-return conditions (no email notification is sent). We
+        # tentatively accept the overhead of a function call, even when a
+        # notification isn't sent, just to keep the logic clean and
+        # centralized.
+
+        if value == 'waiting':
+            # notification not needed.
+            return None
+
+        if not self.user.info['receive_processing_job_emails']:
+            # notification not needed.
+            return None
+
+        if self.command.software.name in ignored_software:
+            # notification not needed.
+            return None
+
+        if self.command.name in ignored_commands:
+            # notification not needed.
+            return None
+
+        # generate subject line
+        subject = (f'{self.command.name}: {value}, {self.id} '
+                   f'[{self.external_id}]')
+
+        # generate message line
+        message = ''
+
+        input_artifacts = self.input_artifacts
+        if input_artifacts is None:
+            # this is an admin job. display command name and parameters
+            message = (f'Admin Job {self.command.name} '
+                       f'{self.command.parameters}')
+        else:
+            for artifact in input_artifacts:
+                if artifact.prep_templates:
+                    # this is a processing job. display the study id as link,
+                    # prep ids, data_type, and command name.
+                    study_ids = [x.study_id for x in artifact.prep_templates]
+                    prep_ids = [x.id for x in artifact.prep_templates]
+                    data_types = [x.data_type() for x in
+                                  artifact.prep_templates]
+
+                    # there should only be one study id
+                    study_ids = set(study_ids)
+                    if len(study_ids) > 1:
+                        raise qdb.exceptions.QiitaError("More than one Study "
+                                                        "ID was found: "
+                                                        f"{study_ids}")
+                    study_id = study_ids.pop()
+
+                    # there should be at least one prep_id and probably more.
+                    prep_ids = list(set(prep_ids))
+                    if len(prep_ids) == 0:
+                        raise qdb.exceptions.QiitaError("No Prep IDs were "
+                                                        "found")
+                    if len(prep_ids) == 1:
+                        study_url = (f'{qiita_config.base_url}/study/'
+                                     f'description/{study_id}?prep_id='
+                                     f'{prep_ids[0]}')
+                    else:
+                        study_url = (f'{qiita_config.base_url}/study/'
+                                     f'description/{study_id}')
+                    # convert into a string for presentation.
+                    prep_ids = [str(x) for x in prep_ids]
+                    prep_ids = ', '.join(prep_ids)
+
+                    # there should be only one data type.
+                    data_types = set(data_types)
+                    if len(data_types) > 1:
+                        raise qdb.exceptions.QiitaError("More than one data "
+                                                        "type was found: "
+                                                        f"{data_types}")
+                    data_type = data_types.pop()
+
+                    message = f'{self.command.name}\n'
+                    message += f'Prep IDs: {prep_ids}\n'
+                    message += f'{study_url}\n'
+                    message += f'Data Type: {data_type}\n'
+                elif artifact.analysis:
+                    # this is an analysis job. display analysis id as link and
+                    # the command name.
+                    message = f'Analysis Job {self.command.name}\n'
+                    message += f'{qiita_config.base_url}/analysis/'
+                    message += f'description/{artifact.analysis.id}/\n'
+                else:
+                    raise qdb.exceptions.QiitaError("Unknown Condition")
+
+        # append legacy message line
+        message += 'New status: %s' % (value)
+
+        if value == 'error' and error_msg is not None:
+            message += f'\n\nError:\n{error_msg}'
+
+        return {'subject': subject, 'message': message}
+
+    def _set_status(self, value, error_msg=None):
+        """Sets the status of the job
+
+        Parameters
+        ----------
+        value : str, {'queued', 'running', 'success', 'error',
+                      'in_construction', 'waiting'}
+            The new status of the job
+        error_msg : str, optional
+            If not None this is the message that is going to be sent to the
+            user when the value is 'error'
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBStatusError
+            - If the current status of the job is 'success'
+            - If the current status of the job is 'running' and `value` is
+            'queued'
+        """
+        with qdb.sql_connection.TRN:
+            current_status = self.status
+            if current_status == 'success':
+                raise qdb.exceptions.QiitaDBStatusError(
+                    "Cannot change the status of a 'success' job")
+            elif current_status == 'running' and value == 'queued':
+                raise qdb.exceptions.QiitaDBStatusError(
+                    "Cannot revert the status of a 'running' job to 'queued'")
+
+            new_status = qdb.util.convert_to_id(
+                value, "processing_job_status")
+
+            msg = self._generate_notification_message(value, error_msg)
+            if msg is not None:
+                # send email
+                qdb.util.send_email(self.user.email, msg['subject'],
+                                    msg['message'])
+                # also email the sys-admin when an admin user's job errors
+                if self.user.level in {'admin', 'wet-lab admin'}:
+                    if value == 'error':
+                        qdb.util.send_email(
+                            qiita_config.sysadmin_email, msg['subject'],
+                            msg['message'])
+
+            sql = """UPDATE qiita.processing_job
+                     SET processing_job_status_id = %s
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [new_status, self.id])
+            qdb.sql_connection.TRN.execute()
+
+    @property
+    def external_id(self):
+        """Retrieves the external id"""
+        with qdb.sql_connection.TRN:
+            sql = """SELECT external_job_id
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            result = qdb.sql_connection.TRN.execute_fetchlast()
+            if result is None:
+                result = 'Not Available'
+            return result
+
+    @external_id.setter
+    def external_id(self, value):
+        """Sets the external job id of the job
+
+        Parameters
+        ----------
+        value : str, {'queued', 'running', 'success', 'error',
+                      'in_construction', 'waiting'}
+            The job's new status
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBStatusError
+            - If the current status of the job is 'success'
+            - If the current status of the job is 'running' and `value` is
+            'queued'
+        """
+        sql = """UPDATE qiita.processing_job
+                 SET external_job_id = %s
+                 WHERE processing_job_id = %s"""
+        qdb.sql_connection.perform_as_transaction(sql, [value, self.id])
+
+    @property
+    def release_validator_job(self):
+        """Retrieves the release validator job
+
+        Returns
+        -------
+        qiita_db.processing_job.ProcessingJob or None
+            The release validator job of this job
+        """
+        rvalidator = None
+        with qdb.sql_connection.TRN:
+            sql = """SELECT processing_job_id
+                     FROM qiita.processing_job
+                     WHERE command_id in (
+                         SELECT command_id
+                         FROM qiita.software_command
+                         WHERE name = 'release_validators')
+                             AND command_parameters->>'job' = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            results = qdb.sql_connection.TRN.execute_fetchflatten()
+            if results:
+                rvalidator = ProcessingJob(results[0])
+
+        return rvalidator
+
+    def submit(self, parent_job_id=None, dependent_jobs_list=None):
+        """Submits the job to execution
+        This method has the ability to submit itself, as well as a list of
+        other ProcessingJob objects. If a list of ProcessingJob objects is
+        supplied, they will be submitted conditionally on the successful
+        execution of this object.
+
+        Users of this method don't need to set parent_job_id. It is used
+        internally by submit() for subsequent submit() calls for dependents.
+
+        Raises
+        ------
+        QiitaDBOperationNotPermittedError
+            If the job is not in 'waiting' or 'in_construction' status
+        """
+        with qdb.sql_connection.TRN:
+            status = self.status
+            if status not in {'in_construction', 'waiting'}:
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    "Can't submit job, not in 'in_construction' or "
+                    "'waiting' status. Current status: %s" % status)
+            self._set_status('queued')
+            # At this point we are going to involve other processes. We need
+            # to commit the changes to the DB or the other processes will not
+            # see these changes
+            qdb.sql_connection.TRN.commit()
+
+        job_dir = join(qdb.util.get_work_base_dir(), self.id)
+        command = self.command
+        software = command.software
+        cname = command.name
+        plugin_start_script = software.start_script
+        plugin_env_script = software.environment_script
+
+        # Appending the portal URL so the job requests the information from the
+        # portal server that submitted the job
+        url = "%s%s" % (qiita_config.base_url, qiita_config.portal_dir)
+
+        # if the word ENVIRONMENT is in the plugin_env_script we have a special
+        # case where we are going to execute some command and then wait for the
+        # plugin to return their own id (first implemented for
+        # fast-bowtie2+woltka)
+        #
+        # This is the hardcoded lines described in issue:
+        # https://github.com/qiita-spots/qiita/issues/3340
+        # the idea is that in the future we shouldn't check specific command
+        # names to know if it should be executed differently and the
+        # plugin should let Qiita know that a specific command should be run
+        # as job array or not
+        cnames_to_skip = {'Calculate Cell Counts', 'Calculate RNA Copy Counts'}
+        if 'ENVIRONMENT' in plugin_env_script and cname not in cnames_to_skip:
+            # the job has to be in running state so the plugin can change
+            # its status
+            with qdb.sql_connection.TRN:
+                self._set_status('running')
+                qdb.sql_connection.TRN.commit()
+
+            create_nested_path(job_dir)
+            cmd = (f'{plugin_env_script}; {plugin_start_script} '
+                   f'{url} {self.id} {job_dir}')
+            stdout, stderr, return_value = _system_call(cmd)
+            if return_value != 0 or stderr != '':
+                self._set_error(stderr)
+            job_id = stdout
+        # note that dependent jobs, such as m validator jobs marshalled into
+        # n 'queues' require the job_id returned by an external scheduler such
+        # as Torque's MOAB, rather than a job name that can be defined within
+        # Qiita. Hence, this method must be able to handle the case where a job
+        # requires metadata from a late-defined and time-sensitive source.
+        elif qiita_config.plugin_launcher in ProcessingJob._launch_map:
+            launcher = ProcessingJob._launch_map[qiita_config.plugin_launcher]
+            if launcher['execute_in_process']:
+                # run this launcher function within this process.
+                # usually this is done if the launcher spawns other processes
+                # before returning immediately, usually with a job ID that can
+                # be used to monitor the job's progress.
+
+                try:
+                    resource_params = self.resource_allocation_info
+                except qdb.exceptions.QiitaDBUnknownIDError as e:
+                    # this propagates the error to the job and using str(e)
+                    # should be fine as we just want the last calculation
+                    # error
+                    self._set_error(str(e))
+                    return
+
+                # note that parent_job_id is being passed transparently from
+                # submit declaration to the launcher.
+                # TODO: in-process launches should throw exceptions that are
+                # handled by this code. Out-of-process launches will need to
+                # handle exceptions by catching them and returning an error
+                # code.
+                job_id = launcher['function'](plugin_env_script,
+                                              plugin_start_script,
+                                              url,
+                                              self.id,
+                                              job_dir,
+                                              parent_job_id, resource_params)
+
+                if dependent_jobs_list:
+                    # a dependent_jobs_list will always have at least one
+                    # job
+                    next_job = dependent_jobs_list.pop(0)
+
+                    if not dependent_jobs_list:
+                        # dependent_jobs_list is now empty
+                        dependent_jobs_list = None
+
+                    # The idea here is that a list of jobs is considered a
+                    # chain. Each job in the chain is submitted with the job
+                    # id of job submitted before it; a job will only run if
+                    # 'parent_job' ran successfully. Each iteration of submit()
+                    # launches a job, pulls the next job from the list, and
+                    # submits it. The remainder of the list is also passed to
+                    # continue the process.
+                    next_job.submit(parent_job_id=job_id,
+                                    dependent_jobs_list=dependent_jobs_list)
+
+            elif not launcher['execute_in_process']:
+                # run this launcher function as a new process.
+                # usually this is done if the launcher performs work that takes
+                # an especially long time, or waits for children who perform
+                # such work.
+                p = Process(target=launcher['function'],
+                            args=(plugin_env_script,
+                                  plugin_start_script,
+                                  url,
+                                  self.id,
+                                  job_dir))
+
+                p.start()
+
+                job_id = p.pid
+
+                if dependent_jobs_list:
+                    # for now, treat dependents as independent when
+                    # running locally. This means they will not be
+                    # organized into n 'queues' or 'chains', and
+                    # will all run simultaneously.
+                    for dependent in dependent_jobs_list:
+                        # register dependent job as queued to make qiita
+                        # aware of this child process
+                        dependent._set_status('queued')
+
+                        dep_software = dependent.command.software
+                        dep_job_dir = join(qdb.util.get_work_base_dir(),
+                                           dependent.id)
+                        p = Process(target=launcher['function'],
+                                    args=(dep_software.environment_script,
+                                          dep_software.start_script,
+                                          url,
+                                          dependent.id,
+                                          dep_job_dir))
+                        p.start()
+                        # assign the child process ID as external id to
+                        # the dependent
+                        dependent.external_id = p.pid
+            else:
+                error = ("execute_in_process must be defined",
+                         "as either true or false")
+                raise AssertionError(error)
+        else:
+            error = "plugin_launcher should be one of two values for now"
+            raise AssertionError(error)
+
+        # note that at this point, self.id is Qiita's UUID for a Qiita
+        # job. job_id at this point is an external ID (e.g. Torque Job
+        # ID). Record the mapping between job_id and self.id using
+        # external_id.
+        if job_id is not None:
+            self.external_id = job_id
+
+    def release(self):
+        """Releases the job from the waiting status and creates the artifact
+
+        Returns
+        -------
+        dict of {int: int}
+            The mapping between the job output and the artifact
+        """
+        with qdb.sql_connection.TRN:
+            if self.command.software.type != 'artifact definition':
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    "Only artifact definition jobs can be released")
+
+            # Retrieve the artifact information from the DB
+            sql = """SELECT artifact_info
+                     FROM qiita.processing_job_validator
+                     WHERE validator_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            a_info = qdb.sql_connection.TRN.execute_fetchlast()
+
+            provenance = loads(self.parameters.values['provenance'])
+            job = ProcessingJob(provenance['job'])
+            if 'data_type' in a_info:
+                # This job is resulting from a private job
+                parents = None
+                params = None
+                name = None
+                data_type = a_info['data_type']
+                pvals = job.parameters.values
+                if 'analysis' in pvals:
+                    cmd_out_id = None
+                    analysis = qdb.analysis.Analysis(
+                        job.parameters.values['analysis'])
+                else:
+                    cmd_out_id = provenance['cmd_out_id']
+                    analysis = None
+                a_info = a_info['artifact_data']
+            else:
+                # This job is resulting from a plugin job
+                parents = job.input_artifacts
+                params = job.parameters
+                cmd_out_id = provenance['cmd_out_id']
+                name = provenance['name']
+                analysis = None
+                data_type = None
+
+            # Create the artifact
+            atype = a_info['artifact_type']
+            filepaths = a_info['filepaths']
+            a = qdb.artifact.Artifact.create(
+                filepaths, atype, parents=parents,
+                processing_parameters=params,
+                analysis=analysis, data_type=data_type, name=name)
+
+            self._set_status('success')
+
+            mapping = {}
+            if cmd_out_id is not None:
+                mapping = {cmd_out_id: a.id}
+
+            return mapping
+
+    def release_validators(self):
+        """Allows all the validator job spawned by this job to complete"""
+        if self.command.software.type not in ('artifact transformation',
+                                              'private'):
+            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                "Only artifact transformation and private jobs can "
+                "release validators")
+
+        # Check if all the validators are completed. Validator jobs can be
+        # in two states when completed: 'waiting' in case of success
+        # or 'error' otherwise
+
+        validator_ids = ['%s [%s]' % (j.id, j.external_id)
+                         for j in self.validator_jobs
+                         if j.status not in ['waiting', 'error']]
+
+        # Active polling - wait until all validator jobs are completed
+        # TODO: As soon as we see one errored validator, we should kill
+        # the other jobs and exit early. Don't wait for all of the jobs
+        # to complete.
+        while validator_ids:
+            jids = ', '.join(validator_ids)
+            self.step = ("Validating outputs (%d remaining) via "
+                         "job(s) %s" % (len(validator_ids), jids))
+            sleep(10)
+            validator_ids = ['%s [%s]' % (j.id, j.external_id)
+                             for j in self.validator_jobs
+                             if j.status not in ['waiting', 'error']]
+
+        # Check if any of the validators errored
+        errored = [j for j in self.validator_jobs
+                   if j.status == 'error']
+        if errored:
+            # At least one of the validators failed, Set the rest of the
+            # validators and the current job as failed
+            waiting = [j.id for j in self.validator_jobs
+                       if j.status == 'waiting']
+
+            common_error = "\n".join(
+                ["Validator %s error message: %s" % (j.id, j.log.msg)
+                 for j in errored])
+
+            val_error = "%d sister validator jobs failed: %s" % (
+                len(errored), common_error)
+            for j in waiting:
+                ProcessingJob(j)._set_error(val_error)
+
+            self._set_error('%d validator jobs failed: %s'
+                            % (len(errored), common_error))
+        else:
+            mapping = {}
+            # Loop through all validator jobs and release them, allowing
+            # to create the artifacts. Note that if any artifact creation
+            # fails, the rollback operation will make sure that the
+            # previously created artifacts are not in there
+            for vjob in self.validator_jobs:
+                mapping.update(vjob.release())
+
+            if mapping:
+                sql = """INSERT INTO
+                            qiita.artifact_output_processing_job
+                            (artifact_id, processing_job_id,
+                            command_output_id)
+                         VALUES (%s, %s, %s)"""
+                sql_args = [[aid, self.id, outid]
+                            for outid, aid in mapping.items()]
+                with qdb.sql_connection.TRN:
+                    qdb.sql_connection.TRN.add(sql, sql_args, many=True)
+                self._update_and_launch_children(mapping)
+            self._set_status('success')
+
+    def _complete_artifact_definition(self, artifact_data):
+        """"Performs the needed steps to complete an artifact definition job
+
+        In order to complete an artifact definition job we need to create
+        the artifact, and then start all the jobs that were waiting for this
+        artifact to be created. Note that each artifact definition job creates
+        one and only one artifact.
+
+        Parameters
+        ----------
+        artifact_data : {'filepaths': list of (str, str), 'artifact_type': str}
+            Dict with the artifact information. `filepaths` contains the list
+            of filepaths and filepath types for the artifact and
+            `artifact_type` the type of the artifact
+
+        Notes
+        -----
+        The `provenance` in the job.parameters can contain a `direct_creation`
+        flag to avoid having to wait for the complete job to create a new
+        artifact, which is normally run during regular processing. Skipping is
+        fine because we are adding an artifact to an existing job outside of
+        regular processing
+        """
+        with qdb.sql_connection.TRN:
+            atype = artifact_data['artifact_type']
+            filepaths = artifact_data['filepaths']
+            # We need to differentiate if this artifact is the
+            # result of a previous job or uploading
+            job_params = self.parameters.values
+            if job_params['provenance'] is not None:
+                # The artifact is a result from a previous job
+                provenance = loads(job_params['provenance'])
+                if provenance.get('direct_creation', False):
+                    original_job = ProcessingJob(provenance['job'])
+                    artifact = qdb.artifact.Artifact.create(
+                        filepaths, atype,
+                        parents=original_job.input_artifacts,
+                        processing_parameters=original_job.parameters,
+                        analysis=job_params['analysis'],
+                        name=job_params['name'])
+
+                    sql = """
+                        INSERT INTO qiita.artifact_output_processing_job
+                            (artifact_id, processing_job_id,
+                             command_output_id)
+                         VALUES (%s, %s, %s)"""
+                    qdb.sql_connection.TRN.add(
+                        sql, [artifact.id, original_job.id,
+                              provenance['cmd_out_id']])
+                    qdb.sql_connection.TRN.execute()
+
+                    self._set_status('success')
+                else:
+                    if provenance.get('data_type') is not None:
+                        artifact_data = {'data_type': provenance['data_type'],
+                                         'artifact_data': artifact_data}
+
+                    sql = """UPDATE qiita.processing_job_validator
+                             SET artifact_info = %s
+                             WHERE validator_id = %s"""
+                    qdb.sql_connection.TRN.add(
+                        sql, [dumps(artifact_data), self.id])
+                    qdb.sql_connection.TRN.execute()
+
+                    # Can't create the artifact until all validators
+                    # are completed
+                    self._set_status('waiting')
+            else:
+                # The artifact is uploaded by the user or is the initial
+                # artifact of an analysis
+                if ('analysis' in job_params and
+                        job_params['analysis'] is not None):
+                    pt = None
+                    an = qdb.analysis.Analysis(job_params['analysis'])
+                    sql = """SELECT data_type
+                             FROM qiita.analysis_processing_job
+                             WHERE analysis_id = %s
+                                AND processing_job_id = %s"""
+                    qdb.sql_connection.TRN.add(sql, [an.id, self.id])
+                    data_type = qdb.sql_connection.TRN.execute_fetchlast()
+                elif job_params['template'] is not None:
+                    pt = qdb.metadata_template.prep_template.PrepTemplate(
+                        job_params['template'])
+                    an = None
+                    data_type = None
+                else:
+                    pt = None
+                    an = None
+                    data_type = 'Job Output Folder'
+
+                artifact = qdb.artifact.Artifact.create(
+                    filepaths, atype, prep_template=pt, analysis=an,
+                    data_type=data_type, name=job_params['name'])
+                self._set_status('success')
+
+            # we need to update the children jobs to replace their input
+            # with the newly created artifact via the validator
+            for c in self.children:
+                self._helper_update_children({atype: artifact.id})
+                c.submit()
+
+    def _complete_artifact_transformation(self, artifacts_data):
+        """Performs the needed steps to complete an artifact transformation job
+
+        In order to complete an artifact transformation job, we need to create
+        a validate job for each artifact output and submit it.
+
+        Parameters
+        ----------
+        artifacts_data : dict of dicts
+            The generated artifact information keyed by output name.
+            The format of each of the internal dictionaries must be
+            {'filepaths': list of (str, str), 'artifact_type': str}
+            where `filepaths` contains the list of filepaths and filepath types
+            for the artifact and `artifact_type` the type of the artifact
+
+        Raises
+        ------
+        QiitaDBError
+            If there is more than one prep information attached to the new
+            artifact
+        """
+        validator_jobs = []
+        with qdb.sql_connection.TRN:
+            cmd_id = self.command.id
+            for out_name, a_data in artifacts_data.items():
+                # Correct the format of the filepaths parameter so we can
+                # create a validate job
+                filepaths = defaultdict(list)
+                for fp, fptype in a_data['filepaths']:
+                    filepaths[fptype].append(fp)
+                atype = a_data['artifact_type']
+
+                # The validate job needs a prep information file. In theory,
+                # a job can be generated from more than one prep information
+                # file, so we check here if we have one or more templates. At
+                # this moment, allowing more than one template would require a
+                # fair amount of changes to the plugins, so we are going to
+                # restrict the number of templates to one. Note that at this
+                # moment there is no way of generating an artifact from 2 or
+                # more artifacts, so we can impose this limitation now and
+                # relax it later.
+                templates = set()
+                for artifact in self.input_artifacts:
+                    templates.update(pt.id for pt in artifact.prep_templates)
+                template = None
+                analysis = None
+                if len(templates) > 1:
+                    raise qdb.exceptions.QiitaDBError(
+                        "Currently only single prep template "
+                        "is allowed, found %d" % len(templates))
+                elif len(templates) == 1:
+                    template = templates.pop()
+                elif self.input_artifacts:
+                    # In this case we have 0 templates, which means that this
+                    # artifact is being generated in the analysis pipeline.
+                    # All the artifacts included in the analysis pipeline
+                    # belong to the same analysis, so we can just ask the
+                    # first artifact for the analysis that it belongs to
+                    analysis = self.input_artifacts[0].analysis.id
+
+                # Once the validate job completes, it needs to know if it has
+                # been generated from a command (and how) or if it has been
+                # uploaded. In order to differentiate these cases, we populate
+                # the provenance parameter with some information about the
+                # current job and how this artifact has been generated. This
+                # does not affect the plugins since they can ignore this
+                # parameter
+                sql = """SELECT command_output_id
+                         FROM qiita.command_output
+                         WHERE name = %s AND command_id = %s"""
+                qdb.sql_connection.TRN.add(sql, [out_name, cmd_id])
+                cmd_out_id = qdb.sql_connection.TRN.execute_fetchlast()
+                naming_params = self.command.naming_order
+                if naming_params:
+                    params = self.parameters.values
+                    art_name = "%s %s" % (
+                        out_name, ' '.join([str(params[p]).split('/')[-1]
+                                            for p in naming_params]))
+                else:
+                    art_name = out_name
+
+                provenance = {'job': self.id,
+                              'cmd_out_id': cmd_out_id,
+                              'name': art_name}
+
+                if self.command.software.type == 'private':
+                    provenance['data_type'] = 'Job Output Folder'
+
+                # Get the validator command for the current artifact type and
+                # create a new job
+                # see also release_validators()
+                cmd = qdb.software.Command.get_validator(atype)
+                values_dict = {
+                    'files': dumps(filepaths), 'artifact_type': atype,
+                    'template': template, 'provenance': dumps(provenance),
+                    'analysis': None}
+                if analysis is not None:
+                    values_dict['analysis'] = analysis
+                validate_params = qdb.software.Parameters.load(
+                    cmd, values_dict=values_dict)
+
+                validator_jobs.append(
+                    ProcessingJob.create(self.user, validate_params, True))
+
+            # Change the current step of the job
+            self.step = "Validating outputs (%d remaining) via job(s) %s" % (
+                len(validator_jobs), ', '.join(['%s [%s]' % (
+                    j.id, j.external_id) for j in validator_jobs]))
+
+            # Link all the validator jobs with the current job
+            self._set_validator_jobs(validator_jobs)
+
+            # Submit the validator jobs in chunks of at most n jobs each
+            n = qiita_config.job_scheduler_dependency_q_cnt
+            if n is None:
+                n = 2
+
+            # taken from:
+            # https://www.geeksforgeeks.org/break-list-chunks-size-n-python/
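+            # e.g. with 5 validator jobs and n = 2, the chunking below yields
+            # [[v1, v2], [v3, v4], [v5]]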
+            lists = [validator_jobs[i * n:(i + 1) * n]
+                     for i in range((len(validator_jobs) + n - 1) // n)]
+
+            for sub_list in lists:
+                # each sub_list will always have at least a lead_job
+                lead_job = sub_list.pop(0)
+                if not sub_list:
+                    # sub_list is now empty
+                    sub_list = None
+                lead_job.submit(dependent_jobs_list=sub_list)
+
+            # Submit the job that will release all the validators
+            plugin = qdb.software.Software.from_name_and_version(
+                'Qiita', 'alpha')
+            cmd = plugin.get_command('release_validators')
+            params = qdb.software.Parameters.load(
+                cmd, values_dict={'job': self.id})
+            job = ProcessingJob.create(self.user, params)
+
+        # Doing the submission outside of the transaction
+        job.submit()
+
+    def _set_validator_jobs(self, validator_jobs):
+        """Sets the validator jobs for the current job
+
+        Parameters
+        ----------
+        validator_jobs : list of ProcessingJob
+            The validator_jobs for the current job
+        """
+        with qdb.sql_connection.TRN:
+            sql = """INSERT INTO qiita.processing_job_validator
+                        (processing_job_id, validator_id)
+                     VALUES (%s, %s)"""
+            sql_args = [[self.id, j.id] for j in validator_jobs]
+            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
+            qdb.sql_connection.TRN.execute()
+
+    def complete(self, success, artifacts_data=None, error=None):
+        """Completes the job, either with a success or error status
+
+        Parameters
+        ----------
+        success : bool
+            Whether the job has completed successfully or not
+        artifacts_data : dict of dicts, optional
+            The generated artifact information keyed by output name.
+            The format of each of the internal dictionaries must be
+            {'filepaths': list of (str, str), 'artifact_type': str}
+            where `filepaths` contains the list of filepaths and filepath types
+            for the artifact and `artifact_type` the type of the artifact
+        error : str, optional
+            If the job was not successful, the error message
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the job is not in running state
+        """
+        with qdb.sql_connection.TRN:
+            if success:
+                if self.status != 'running':
+                    # If the job is not running, we only allow completing it
+                    # if it did not succeed
+                    raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                        "Can't complete job: not in a running state")
+                if artifacts_data:
+                    if self.command.software.type == 'artifact definition':
+                        # There is only one artifact created
+                        _, a_data = artifacts_data.popitem()
+                        self._complete_artifact_definition(a_data)
+                    else:
+                        self._complete_artifact_transformation(artifacts_data)
+                else:
+                    self._set_status('success')
+            else:
+                self._set_error(error)
+
+    @property
+    def log(self):
+        """The log entry attached to the job if it failed
+
+        Returns
+        -------
+        qiita_db.logger.LogEntry or None
+            If the status of the job is `error`, returns the LogEntry attached
+            to the job
+        """
+        with qdb.sql_connection.TRN:
+            res = None
+            if self.status == 'error':
+                sql = """SELECT logging_id
+                         FROM qiita.processing_job
+                         WHERE processing_job_id = %s"""
+                qdb.sql_connection.TRN.add(sql, [self.id])
+                log_id = qdb.sql_connection.TRN.execute_fetchlast()
+                res = qdb.logger.LogEntry(log_id)
+        return res
+
+    def _set_error(self, error):
+        """Attaches a log entry to the job
+
+        Parameters
+        ----------
+        error : str
+            The error message
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the status of the job is 'success'
+        """
+        with qdb.sql_connection.TRN:
+            if self.status == 'success':
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    "Can only set up the log for jobs whose status is 'error'")
+
+            log = qdb.logger.LogEntry.create('Runtime', error)
+
+            sql = """UPDATE qiita.processing_job
+                     SET logging_id = %s
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [log.id, self.id])
+            qdb.sql_connection.TRN.execute()
+
+            # All the children should be marked as failure
+            for c in self.children:
+                c.complete(False, error="Parent job '%s' failed." % self.id)
+
+            # set as error after everything is in place
+            self._set_status('error', error_msg=error)
+
+    @property
+    def heartbeat(self):
+        """The timestamp of the last heartbeat received from the job
+
+        Returns
+        -------
+        datetime
+            The last heartbeat timestamp
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT heartbeat
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+
+    def update_heartbeat_state(self):
+        """Updates the heartbeat of the job
+
+        If the job is in `queued` status, it changes the status to
+        `running`.
+
+        Raises
+        ------
+        QiitaDBOperationNotPermittedError
+            If the job is already completed
+        """
+        with qdb.sql_connection.TRN:
+            status = self.status
+            if status == 'queued':
+                self._set_status('running')
+            elif status != 'running':
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    "Can't execute heartbeat on job: already completed")
+            sql = """UPDATE qiita.processing_job
+                     SET heartbeat = %s
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [datetime.now(), self.id])
+            qdb.sql_connection.TRN.execute()
+
+    @property
+    def step(self):
+        """Returns the current step of the job
+
+        Returns
+        -------
+        str
+            The current step of the job
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT step
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+
+    @step.setter
+    def step(self, value):
+        """Sets the current step of the job
+
+        Parameters
+        ----------
+        value : str
+            The new current step of the job
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the status of the job is not 'running'
+        """
+        if self.status != 'running':
+            raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                "Cannot change the step of a job whose status is not "
+                "'running'")
+        sql = """UPDATE qiita.processing_job
+                 SET step = %s
+                 WHERE processing_job_id = %s"""
+        qdb.sql_connection.perform_as_transaction(sql, [value, self.id])
+
+    @property
+    def children(self):
+        """The children jobs
+
+        Returns
+        -------
+        generator of qiita_db.processing_job.ProcessingJob
+            The children jobs
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT child_id
+                     FROM qiita.parent_processing_job
+                     WHERE parent_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            for jid in qdb.sql_connection.TRN.execute_fetchflatten():
+                yield ProcessingJob(jid)
+
+    @property
+    def validator_jobs(self):
+        """The validators of this job
+
+        Returns
+        -------
+        generator of qiita_db.processing_job.ProcessingJob
+            The validators of this job
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT validator_id
+                     FROM qiita.processing_job_validator pjv
+                     JOIN qiita.processing_job pj
+                         ON pjv.validator_id = pj.processing_job_id
+                     JOIN qiita.processing_job_status USING (
+                        processing_job_status_id)
+                     WHERE pjv.processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            for jid in qdb.sql_connection.TRN.execute_fetchflatten():
+                yield ProcessingJob(jid)
+
+    def _helper_update_children(self, new_map):
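+        """Replaces the children's pending inputs with the new artifacts
+
+        Parameters
+        ----------
+        new_map : dict of {str: int}
+            The mapping between output name and artifact id
+
+        Returns
+        -------
+        list of qiita_db.processing_job.ProcessingJob
+            The children that have all their parameters filled in and are
+            ready to be submitted
+        """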
+        ready = []
+        sql = """SELECT command_parameters, pending
+                 FROM qiita.processing_job
+                 WHERE processing_job_id = %s"""
+        sql_update = """UPDATE qiita.processing_job
+                        SET command_parameters = %s,
+                            pending = %s
+                        WHERE processing_job_id = %s"""
+        sql_link = """INSERT INTO qiita.artifact_processing_job
+                        (artifact_id, processing_job_id)
+                      VALUES (%s, %s)"""
+
+        for c in self.children:
+            qdb.sql_connection.TRN.add(sql, [c.id])
+            params, pending = qdb.sql_connection.TRN.execute_fetchflatten()
+            for pname, out_name in pending[self.id].items():
+                a_id = new_map[out_name]
+                params[pname] = str(a_id)
+                # Link the input artifact with the child job
+                qdb.sql_connection.TRN.add(sql_link, [a_id, c.id])
+            # this job's entry in pending has now been fully resolved
+            del pending[self.id]
+
+            # Force to insert a NULL in the DB if pending is empty
+            pending = pending if pending else None
+            qdb.sql_connection.TRN.add(sql_update,
+                                       [dumps(params), pending, c.id])
+            qdb.sql_connection.TRN.execute()
+
+            if pending is None:
+                # The child already has all the parameters
+                # Add it to the ready list
+                ready.append(c)
+
+        return ready
+
+    def _update_children(self, mapping):
+        """Updates the children of the current job to populate the input params
+
+        Parameters
+        ----------
+        mapping : dict of {int: int}
+            The mapping between output parameter and artifact
+
+        Returns
+        -------
+        list of qiita_db.processing_job.ProcessingJob
+            The list of children that are ready to be submitted
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT command_output_id, name
+                     FROM qiita.command_output
+                     WHERE command_output_id IN %s"""
+            sql_args = [tuple(mapping.keys())]
+            qdb.sql_connection.TRN.add(sql, sql_args)
+            res = qdb.sql_connection.TRN.execute_fetchindex()
+            new_map = {name: mapping[oid] for oid, name in res}
+
+        return self._helper_update_children(new_map)
+
+    def _update_and_launch_children(self, mapping):
+        """Updates the children of the current job to populate the input params
+
+        Parameters
+        ----------
+        mapping : dict of {int: int}
+            The mapping between output parameter and artifact
+        """
+        ready = self._update_children(mapping)
+        # Submit all the children that already have all the input parameters
+        for c in ready:
+            if c.status in {'in_construction', 'waiting'}:
+                c.submit()
+                # some jobs create several child jobs/validators and this
+                # can clog the submission process; give it a second to
+                # avoid this
+                sleep(1)
+
+    @property
+    def outputs(self):
+        """The outputs of the job
+
+        Returns
+        -------
+        dict of {str: qiita_db.artifact.Artifact}
+            The outputs of the job keyed by output name
+        """
+        with qdb.sql_connection.TRN:
+            if self.status != 'success':
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    "Can't return the outputs of a non-success job")
+
+            sql = """SELECT artifact_id, name
+                     FROM qiita.artifact_output_processing_job
+                        JOIN qiita.command_output USING (command_output_id)
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return {
+                name: qdb.artifact.Artifact(aid)
+                for aid, name in qdb.sql_connection.TRN.execute_fetchindex()}
+
+    @property
+    def processing_job_workflow(self):
+        """The processing job workflow
+
+        Returns
+        -------
+        ProcessingWorkflow
+            The processing job workflow the job belongs to, or None
+        """
+        with qdb.sql_connection.TRN:
+            # Retrieve the workflow root jobs
+            sql = """SELECT get_processing_workflow_roots
+                     FROM qiita.get_processing_workflow_roots(%s)"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            res = qdb.sql_connection.TRN.execute_fetchindex()
+            if res:
+                sql = """SELECT processing_job_workflow_id
+                         FROM qiita.processing_job_workflow_root
+                         WHERE processing_job_id = %s"""
+                qdb.sql_connection.TRN.add(sql, [res[0][0]])
+                r = qdb.sql_connection.TRN.execute_fetchindex()
+                return (qdb.processing_job.ProcessingWorkflow(r[0][0]) if r
+                        else None)
+            else:
+                return None
+
+    @property
+    def pending(self):
+        """A dictionary with the information about the predecessor jobs
+
+        Returns
+        -------
+        dict
+            A dict with {job_id: {parameter_name: output_name}}"""
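+        # e.g. {'<parent job id>': {'input_data': 'demultiplexed'}} means
+        # that this job's 'input_data' parameter will be filled in with the
+        # parent's 'demultiplexed' output (names here are illustrative)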
+        with qdb.sql_connection.TRN:
+            sql = """SELECT pending
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            res = qdb.sql_connection.TRN.execute_fetchlast()
+            return res if res is not None else {}
+
+    @property
+    def hidden(self):
+        """Whether the job is hidden or not
+
+        Returns
+        -------
+        bool
+            Whether the job is hidden or not
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT hidden
+                     FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+
+    def hide(self):
+        """Hides the job from the user
+
+        Raises
+        ------
+        QiitaDBOperationNotPermittedError
+            If the job is not in the error status
+        """
+        with qdb.sql_connection.TRN:
+            status = self.status
+            if status != 'error':
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    'Only jobs in error status can be hidden. Current status: '
+                    '%s' % status)
+            sql = """UPDATE qiita.processing_job
+                     SET hidden = %s
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [True, self.id])
+            qdb.sql_connection.TRN.execute()
+
+    @property
+    def shape(self):
+        """Number of samples, metadata columns and input size of this job
+
+        Returns
+        -------
+        int, int, int
+            Number of samples, metadata columns and input size. None means it
+            couldn't be calculated
+        """
+        samples = None
+        columns = None
+        prep_info = None
+        study_id = None
+        analysis_id = None
+        artifact = None
+        input_size = None
+
+        parameters = self.parameters.values
+        QUIDError = qdb.exceptions.QiitaDBUnknownIDError
+
+        if self.command.name == 'Validate':
+            # Validate only has two options to calculate its size: template
+            # (a job that has a preparation linked) or analysis (the job
+            # comes from an analysis). However, 'template' can be present
+            # and be None
+            if 'template' in parameters and parameters['template'] is not None:
+                try:
+                    PT = qdb.metadata_template.prep_template.PrepTemplate
+                    prep_info = PT(parameters['template'])
+                except QUIDError:
+                    pass
+                else:
+                    study_id = prep_info.study_id
+            elif 'analysis' in parameters:
+                analysis_id = parameters['analysis']
+        elif self.command.name == 'build_analysis_files':
+            # build analysis is a special case because the analysis doesn't
+            # exist yet
+            sanalysis = qdb.analysis.Analysis(parameters['analysis']).samples
+            samples = sum([len(sams) for sams in sanalysis.values()])
+            # only count the biom files
+            input_size = sum([fp['fp_size'] for aid in sanalysis
+                              for fp in qdb.artifact.Artifact(aid).filepaths
+                              if fp['fp_type'] == 'biom'])
+            columns = self.parameters.values['categories']
+            if columns is not None:
+                columns = len(columns)
+        elif self.command.software.name == 'Qiita':
+            if self.command.name == 'delete_sample_or_column':
+                MT = qdb.metadata_template
+                _id = parameters['obj_id']
+                try:
+                    if parameters['obj_class'] == 'SampleTemplate':
+                        obj = MT.sample_template.SampleTemplate(_id)
+                    else:
+                        obj = MT.prep_template.PrepTemplate(_id)
+                    samples = len(obj)
+                except QUIDError:
+                    pass
+            else:
+                if 'study' in parameters:
+                    study_id = parameters['study']
+                elif 'study_id' in parameters:
+                    study_id = parameters['study_id']
+                elif 'analysis' in parameters:
+                    analysis_id = parameters['analysis']
+                elif 'analysis_id' in parameters:
+                    analysis_id = parameters['analysis_id']
+                elif 'artifact' in parameters:
+                    try:
+                        artifact = qdb.artifact.Artifact(
+                            parameters['artifact'])
+                    except QUIDError:
+                        pass
+        elif self.command.name == 'Sequence Processing Pipeline':
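+            # approximate the number of samples by counting line terminators
+            # in the sample sheet body; both '\r' and '\n' are checked since
+            # the sheet may use either line-ending convention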
+            body = self.parameters.values['sample_sheet']['body']
+            samples = body.count('\r')
+            stemp = body.count('\n')
+            if stemp > samples:
+                samples = stemp
+        elif self.input_artifacts:
+            artifact = self.input_artifacts[0]
+            if artifact.artifact_type == 'BIOM':
+                input_size = sum([fp['fp_size'] for a in self.input_artifacts
+                                  for fp in a.filepaths
+                                  if fp['fp_type'] == 'biom'])
+            else:
+                input_size = sum([fp['fp_size'] for a in self.input_artifacts
+                                  for fp in a.filepaths])
+
+        # if there is an artifact, then we need to get the study_id/analysis_id
+        if artifact is not None:
+            if artifact.study is not None:
+                # only count samples in the prep template
+                prep_info = artifact.prep_templates[0]
+                study_id = prep_info.study_id
+            elif artifact.analysis is not None:
+                analysis_id = artifact.analysis.id
+
+        # now retrieve the sample/columns based on study_id/analysis_id
+        if study_id is not None:
+            try:
+                st = qdb.study.Study(study_id).sample_template
+            except QUIDError:
+                pass
+            else:
+                if prep_info is not None:
+                    samples = len(prep_info)
+                    columns = len(prep_info.categories) + len(st.categories)
+                elif st is not None:
+                    samples = len(st)
+                    columns = len(st.categories)
+        elif analysis_id is not None:
+            try:
+                analysis = qdb.analysis.Analysis(analysis_id)
+            except qdb.exceptions.QiitaDBUnknownIDError:
+                pass
+            else:
+                mfp = qdb.util.get_filepath_information(
+                    analysis.mapping_file)['fullpath']
+                samples, columns = pd.read_csv(
+                    mfp, sep='\t', dtype=str).shape
+                input_size = sum([fp['fp_size'] for aid in analysis.samples for
+                                  fp in qdb.artifact.Artifact(aid).filepaths])
+
+        return samples, columns, input_size
+
+    @property
+    def complete_processing_job(self):
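+        """The 'complete_job' job associated with this job, if any
+
+        Returns
+        -------
+        qiita_db.processing_job.ProcessingJob or None
+            The job running the 'complete_job' command whose 'job_id'
+            parameter points to this job, or None if no such job exists
+        """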
+        sql = """SELECT processing_job_id FROM qiita.software_command
+                    JOIN qiita.processing_job USING (command_id)
+                    WHERE name = 'complete_job' AND
+                        command_parameters->>'job_id' = %s LIMIT 1"""
+        with qdb.sql_connection.TRN:
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            result = qdb.sql_connection.TRN.execute_fetchflatten()
+
+        if result:
+            return qdb.processing_job.ProcessingJob(result[0])
+        return None
+
+    @property
+    def trace(self):
+        """ Returns as a text array the full trace of the job, from itself
+            to validators and complete jobs"""
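+        # each returned line roughly follows the pattern (values are
+        # illustrative): <job id> [<external id>] (<status>): <command> |
+        # <resource allocation>, with complete, release-validator and
+        # validator jobs indented below the job that spawned them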
+        lines = [f'{self.id} [{self.external_id}] ({self.status}): '
+                 f'{self.command.name} | {self.resource_allocation_info}']
+        cjob = self.complete_processing_job
+        if cjob is not None:
+            lines.append(f'  {cjob.id} [{cjob.external_id}] ({cjob.status})| '
+                         f'{cjob.resource_allocation_info}')
+            vjob = self.release_validator_job
+            if vjob is not None:
+                lines.append(f'    {vjob.id} [{vjob.external_id}] '
+                             f' ({vjob.status}) | '
+                             f'{vjob.resource_allocation_info}')
+        for v in self.validator_jobs:
+            lines.append(f'     {v.id} [{v.external_id}] ({v.status}): '
+                         f'{v.command.name} | {v.resource_allocation_info}')
+            cjob = v.complete_processing_job
+            if cjob is not None:
+                lines.append(f'         {cjob.id} [{cjob.external_id}] '
+                             f'({cjob.status}) | '
+                             f'{cjob.resource_allocation_info}')
+        return lines
+
+
+class ProcessingWorkflow(qdb.base.QiitaObject):
+    """Models a workflow defined by the user
+
+    Parameters
+    ----------
+    user : qiita_db.user.User
+        The user that modeled the workflow
+    root : list of qiita_db.processing_job.ProcessingJob
+        The root jobs of the workflow
+    """
+    _table = "processing_job_workflow"
+
+    @classmethod
+    def _common_creation_steps(cls, user, root_jobs, name=None):
+        """Executes the common creation steps
+
+        Parameters
+        ----------
+        user : qiita_db.user.User
+            The user creating the workflow
+        root_jobs : list of qiita_db.processing_job.ProcessingJob
+            The root jobs of the workflow
+        name : str, optional
+            The name of the workflow. Default: generated from user's name
+        """
+        with qdb.sql_connection.TRN:
+            # Insert the workflow in the processing_job_workflow table
+            name = name if name else "%s's workflow" % user.info['name']
+            sql = """INSERT INTO qiita.processing_job_workflow (email, name)
+                     VALUES (%s, %s)
+                     RETURNING processing_job_workflow_id"""
+            qdb.sql_connection.TRN.add(sql, [user.email, name])
+            w_id = qdb.sql_connection.TRN.execute_fetchlast()
+            # Connect the workflow with its initial set of jobs
+            sql = """INSERT INTO qiita.processing_job_workflow_root
+                        (processing_job_workflow_id, processing_job_id)
+                     VALUES (%s, %s)"""
+            sql_args = [[w_id, j.id] for j in root_jobs]
+            qdb.sql_connection.TRN.add(sql, sql_args, many=True)
+            qdb.sql_connection.TRN.execute()
+
+        return cls(w_id)
+
+    @classmethod
+    def from_default_workflow(cls, user, dflt_wf, req_params, name=None,
+                              force=False):
+        """Creates a new processing workflow from a default workflow
+
+        Parameters
+        ----------
+        user : qiita_db.user.User
+            The user creating the workflow
+        dflt_wf : qiita_db.software.DefaultWorkflow
+            The default workflow
+        req_params : dict of {qdb.software.Command: dict of {str: object}}
+            The required parameters values for the source commands in the
+            workflow, keyed by command. The inner dicts are keyed by
+            parameter name.
+        name : str, optional
+            Name of the workflow. Default: generated from user's name
+        force : bool
+            Force creation on duplicated parameters
+
+        Returns
+        -------
+        qiita_db.processing_job.ProcessingWorkflow
+            The newly created workflow
+        """
+        with qdb.sql_connection.TRN:
+            dflt_g = dflt_wf.graph
+
+            # Find the roots of the workflow. That is, the nodes that do not
+            # have a parent in the graph (in_degree = 0)
+            in_degrees = dflt_g.in_degree()
+
+            # We can potentially access this information from the nodes
+            # multiple times, so cache it here. Note that in_degree() yields
+            # (node, degree) tuples
+            all_nodes = {}
+            roots = {}
+
+            for node, degree in in_degrees:
+                dp = node.default_parameter
+                cmd = dp.command
+                if degree == 0:
+                    roots[node] = (cmd, dp)
+                all_nodes[node] = (cmd, dp)
+
+            # Check that we have all the required parameters
+            root_cmds = set(c for c, _ in roots.values())
+            if root_cmds != set(req_params):
+                error_msg = ['Provided required parameters do not match the '
+                             'initial set of commands for the workflow.']
+                missing = [c.name for c in root_cmds - set(req_params)]
+                if missing:
+                    error_msg.append(
+                        ' Command(s) "%s" are missing the required parameter '
+                        'set.' % ', '.join(missing))
+                extra = [c.name for c in set(req_params) - root_cmds]
+                if extra:
+                    error_msg.append(
+                        ' Parameters for command(s) "%s" have been provided, '
+                        'but they are not the initial commands for the '
+                        'workflow.' % ', '.join(extra))
+                raise qdb.exceptions.QiitaDBError(''.join(error_msg))
+
+            # Start creating the root jobs
+            node_to_job = {
+                n: ProcessingJob.create(
+                    user,
+                    qdb.software.Parameters.from_default_params(
+                        p, req_params[c]), force)
+                for n, (c, p) in roots.items()}
+            root_jobs = node_to_job.values()
+
+            # SQL used to create the edges between jobs
+            sql = """INSERT INTO qiita.parent_processing_job
+                        (parent_id, child_id)
+                     VALUES (%s, %s)"""
+
+            # Create the rest of the jobs. These are different from the root
+            # jobs because they depend on other jobs to complete in order to
+            # be submitted
+            for n in nx.topological_sort(dflt_g):
+                if n in node_to_job:
+                    # We have already visited this node
+                    # (because it is a root node)
+                    continue
+
+                cmd, dflt_params = all_nodes[n]
+                job_req_params = {}
+                parent_ids = []
+
+                # Each incoming edge represents an artifact that is generated
+                # by the source job of the edge
+                for source, dest, data in dflt_g.in_edges(n, data=True):
+                    # Retrieve the id of the parent job - it already exists
+                    # because we are visiting the nodes in topological order
+                    source_id = node_to_job[source].id
+                    parent_ids.append(source_id)
+                    # Get the connections between the job and the source
+                    connections = data['connections'].connections
+                    for out, in_param, _ in connections:
+                        # We take advantage of the fact the parameters are
+                        # stored in JSON to encode the name of the output
+                        # artifact from the previous job
+                        job_req_params[in_param] = [source_id, out]
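+                        # e.g. job_req_params['input_data'] might end
+                        # up as [source_id, 'demultiplexed'] (the output
+                        # name here is illustrative)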
+
+                # At this point we should have all the required parameters for
+                # the current job, so create it
+                new_job = ProcessingJob.create(
+                    user, qdb.software.Parameters.from_default_params(
+                        dflt_params, job_req_params), force)
+                node_to_job[n] = new_job
+
+                # Create the parent-child links in the DB
+                sql_args = [[pid, new_job.id] for pid in parent_ids]
+                qdb.sql_connection.TRN.add(sql, sql_args, many=True)
+
+            return cls._common_creation_steps(user, root_jobs, name)
+
+    @classmethod
+    def from_scratch(cls, user, parameters, name=None, force=False):
+        """Creates a new processing workflow from scratch
+
+        Parameters
+        ----------
+        user : qiita_db.user.User
+            The user creating the workflow
+        parameters : qiita_db.software.Parameters
+            The parameters of the first job in the workflow
+        name : str, optional
+            Name of the workflow. Default: generated from user's name
+        force : bool
+            Force creation on duplicated parameters
+
+        Returns
+        -------
+        qiita_db.processing_job.ProcessingWorkflow
+            The newly created workflow
+        """
+        job = ProcessingJob.create(user, parameters, force)
+        return cls._common_creation_steps(user, [job], name)
+
+    @property
+    def name(self):
+        """"The name of the workflow
+
+        Returns
+        -------
+        str
+            The name of the workflow
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT name
+                     FROM qiita.processing_job_workflow
+                     WHERE processing_job_workflow_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            return qdb.sql_connection.TRN.execute_fetchlast()
+
+    @property
+    def user(self):
+        """The user that created the workflow
+
+        Returns
+        -------
+        qdb.user.User
+            The user that created the workflow
+        """
+        with qdb.sql_connection.TRN:
+            sql = """SELECT email
+                     FROM qiita.processing_job_workflow
+                     WHERE processing_job_workflow_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            email = qdb.sql_connection.TRN.execute_fetchlast()
+            return qdb.user.User(email)
+
+    @property
+    def graph(self):
+        """Returns the graph of jobs that represent the workflow
+
+        Returns
+        -------
+        networkx.DiGraph
+            The graph representing the workflow
+        """
+        g = nx.DiGraph()
+        with qdb.sql_connection.TRN:
+            # Retrieve all graph workflow nodes
+            sql = """SELECT parent_id, child_id
+                     FROM qiita.get_processing_workflow_edges(%s)"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            edges = qdb.sql_connection.TRN.execute_fetchindex()
+            nodes = {}
+            if edges:
+                nodes = {jid: ProcessingJob(jid)
+                         for jid in set(chain.from_iterable(edges))}
+                edges = [(nodes[s], nodes[d]) for s, d in edges]
+                g.add_edges_from(edges)
+            # It is possible that there are root jobs that don't have any
+            # children, so they do not appear in the edge list
+            sql = """SELECT processing_job_id
+                     FROM qiita.processing_job_workflow_root
+                     WHERE processing_job_workflow_id = %s"""
+            sql_args = [self.id]
+            if nodes:
+                sql += " AND processing_job_id NOT IN %s"
+                sql_args.append(tuple(nodes))
+            qdb.sql_connection.TRN.add(sql, sql_args)
+            nodes = [
+                ProcessingJob(jid)
+                for jid in qdb.sql_connection.TRN.execute_fetchflatten()]
+            g.add_nodes_from(nodes)
+
+        return g
+
+    def _raise_if_not_in_construction(self):
+        """Raises an error if the workflow is not in construction
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the workflow is not in construction
+        """
+        with qdb.sql_connection.TRN:
+            # To know if the workflow is in construction or not it suffices
+            # to look at the status of the root jobs
+            sql = """SELECT DISTINCT processing_job_status
+                     FROM qiita.processing_job_workflow_root
+                        JOIN qiita.processing_job USING (processing_job_id)
+                        JOIN qiita.processing_job_status
+                            USING (processing_job_status_id)
+                     WHERE processing_job_workflow_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [self.id])
+            res = qdb.sql_connection.TRN.execute_fetchflatten()
+            # If the above SQL query returns a single element and the value
+            # is different from 'in_construction', it means that all the jobs
+            # in the workflow are in the same status and it is not
+            # 'in_construction', hence raise the error. If the above SQL query
+            # returns more than one value (len(res) > 1) it means that the
+            # workflow is no longer in construction because some jobs have
+            # been submitted for processing. Note that if the above query
+            # doesn't return any value, it means that no jobs are in the
+            # workflow and that means that the workflow is in construction.
+            if (len(res) == 1 and res[0] != 'in_construction') or len(res) > 1:
+                # The workflow is no longer in construction, raise an error
+                raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                    "Workflow not in construction")
+
+    def add(self, dflt_params, connections=None, req_params=None,
+            opt_params=None, force=False):
+        """Adds a new job to the workflow
+
+        Parameters
+        ----------
+        dflt_params : qiita_db.software.DefaultParameters
+            The DefaultParameters object used
+        connections : dict of {qiita_db.processing_job.ProcessingJob:
+                               {str: str}}, optional
+            Dictionary keyed by the jobs on which the new job depends; the
+            values are dicts mapping the source job's outputs to the new
+            job's input parameters
+        req_params : dict of {str: object}, optional
+            Any extra required parameter values, keyed by parameter name.
+            Default: None, all the required parameters are provided through
+            the `connections` dictionary
+        opt_params : dict of {str: object}, optional
+            The optional parameters to change from the default set, keyed by
+            parameter name. Default: None, use the values in `dflt_params`
+        force : bool
+            Force creation on duplicated parameters
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the workflow is not in construction
+        """
+        with qdb.sql_connection.TRN:
+            self._raise_if_not_in_construction()
+
+            # checking that the new number of artifacts is not above
+            # max_artifacts_in_workflow
+            current_artifacts = sum(
+                [len(j.command.outputs) for j in self.graph.nodes()])
+            to_add_artifacts = len(dflt_params.command.outputs)
+            total_artifacts = current_artifacts + to_add_artifacts
+            max_artifacts = qdb.util.max_artifacts_in_workflow()
+            if total_artifacts > max_artifacts:
+                raise ValueError(
+                    "Cannot add new job because it will create more "
+                    f"artifacts (current: {current_artifacts} + new: "
+                    f"{to_add_artifacts} = {total_artifacts}) that what is "
+                    f"allowed in a single workflow ({max_artifacts})")
+
+            if connections:
+                # The new Job depends on previous jobs in the workflow
+                req_params = req_params if req_params else {}
+                # Loop through all the connections to add the relevant
+                # parameters
+                for source, mapping in connections.items():
+                    source_id = source.id
+                    for out, in_param in mapping.items():
+                        req_params[in_param] = [source_id, out]
+
+                new_job = ProcessingJob.create(
+                    self.user, qdb.software.Parameters.from_default_params(
+                        dflt_params, req_params, opt_params=opt_params), force)
+
+                # SQL used to create the edges between jobs
+                sql = """INSERT INTO qiita.parent_processing_job
+                            (parent_id, child_id)
+                         VALUES (%s, %s)"""
+                sql_args = [[s.id, new_job.id] for s in connections]
+                qdb.sql_connection.TRN.add(sql, sql_args, many=True)
+                qdb.sql_connection.TRN.execute()
+            else:
+                # The new job doesn't depend on any previous job in the
+                # workflow, so it is a new root job
+                new_job = ProcessingJob.create(
+                    self.user, qdb.software.Parameters.from_default_params(
+                        dflt_params, req_params, opt_params=opt_params), force)
+                sql = """INSERT INTO qiita.processing_job_workflow_root
+                            (processing_job_workflow_id, processing_job_id)
+                         VALUES (%s, %s)"""
+                sql_args = [self.id, new_job.id]
+                qdb.sql_connection.TRN.add(sql, sql_args)
+                qdb.sql_connection.TRN.execute()
+
+            return new_job
+
+    def remove(self, job, cascade=False):
+        """Removes a given job from the workflow
+
+        Parameters
+        ----------
+        job : qiita_db.processing_job.ProcessingJob
+            The job to be removed
+        cascade : bool, optional
+            If true, also remove the input job's children. Default: False.
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the workflow is not in construction
+            If the job to be removed has children and `cascade` is `False`
+        """
+        with qdb.sql_connection.TRN:
+            self._raise_if_not_in_construction()
+
+            # Check if the given job has children
+            children = list(job.children)
+            if children:
+                if not cascade:
+                    raise qdb.exceptions.QiitaDBOperationNotPermittedError(
+                        "Can't remove job '%s': it has children" % job.id)
+                else:
+                    # We need to remove all the job's children; remove them
+                    # first and then remove the current job
+                    for c in children:
+                        self.remove(c, cascade=True)
+
+            # Remove any edges (it can only appear as a child)
+            sql = """DELETE FROM qiita.parent_processing_job
+                     WHERE child_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [job.id])
+
+            # Remove as root job
+            sql = """DELETE FROM qiita.processing_job_workflow_root
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [job.id])
+
+            # Remove the input reference
+            sql = """DELETE FROM qiita.artifact_processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [job.id])
+
+            # Remove the job
+            sql = """DELETE FROM qiita.processing_job
+                     WHERE processing_job_id = %s"""
+            qdb.sql_connection.TRN.add(sql, [job.id])
+
+            qdb.sql_connection.TRN.execute()
+
+    def submit(self):
+        """Submits the workflow to execution
+
+        Raises
+        ------
+        qiita_db.exceptions.QiitaDBOperationNotPermittedError
+            If the workflow is not in construction
+        """
+        with qdb.sql_connection.TRN:
+            self._raise_if_not_in_construction()
+
+            g = self.graph
+            # In order to avoid potential race conditions, we are going to set
+            # all the children in 'waiting' status before submitting
+            # the root nodes
+            in_degrees = dict(g.in_degree())
+            roots = []
+            for job, degree in in_degrees.items():
+                if degree == 0:
+                    roots.append(job)
+                else:
+                    job._set_status('waiting')
+
+            for job in roots:
+                job.submit()