[973924]: / qiita_pet / handlers / software.py

Download this file

206 lines (180 with data), 7.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
from tornado.gen import coroutine
from qiita_core.util import execute_as_transaction
from qiita_db.software import Software, DefaultWorkflow
from .base_handlers import BaseHandler
from copy import deepcopy
class SoftwareHandler(BaseHandler):
@coroutine
@execute_as_transaction
def get(self):
# active True will only show active software
active = True
user = self.current_user
if user is not None and user.level in {'admin', 'dev'}:
active = False
software = Software.iter(active=active)
self.render("software.html", software=software)
def _retrive_workflows(active):
# helper method to avoid duplication of code
def _default_parameters_parsing(node):
dp = node.default_parameter
cmd = dp.command
cmd_name = 'params_%d' % node.id
rp = deepcopy(cmd.required_parameters)
op = deepcopy(cmd.optional_parameters)
params = dict()
for param, value in dp.values.items():
if param in rp:
del rp[param]
if param in op:
del op[param]
params[param] = str(value)
inputs = []
outputs = []
for input in rp.values():
accepted_values = ' | '.join(input[1])
inputs.append([cmd.id, accepted_values])
for output in cmd.outputs:
outputs.append([cmd.id, ' | '.join(output)])
fcmd_name = cmd.name if not cmd.naming_order else \
f'{cmd.name} | {dp.name}'
return ([cmd_name, cmd.id, fcmd_name, dp.name, params],
inputs, outputs)
workflows = []
for w in DefaultWorkflow.iter(active=active):
# getting the main default parameters
nodes = []
edges = []
at = w.artifact_type
# first get edges as this will give us the main connected commands
# and their order
graph = w.graph
# inputs is {input_type: node_name, ...} for easy look up of
# raw_inputs and reuse of the node_name
inputs = dict()
# main_nodes is {main_node_name: {
# output_type: output_node_name}, ...}
# for easy look up and merge of output_names
main_nodes = dict()
not_used_nodes = {n.id: n for n in graph.nodes}
for i, (x, y) in enumerate(graph.edges):
if x.id in not_used_nodes:
del not_used_nodes[x.id]
if y.id in not_used_nodes:
del not_used_nodes[y.id]
vals_x, input_x, output_x = _default_parameters_parsing(x)
vals_y, input_y, output_y = _default_parameters_parsing(y)
connections = []
for a, _, c in graph[x][y]['connections'].connections:
connections.append("%s | %s" % (a, c))
if i == 0:
# we are in the first element so we can specifically select
# the type we are looking for
if at in input_x[0][1]:
input_x[0][1] = at
else:
input_x[0][1] = '** WARNING, NOT DEFINED **'
name_x = vals_x[0]
name_y = vals_y[0]
if vals_x not in (nodes):
nodes.append(vals_x)
if name_x not in main_nodes:
main_nodes[name_x] = dict()
for a, b in input_x:
name = 'input_%s_%s' % (name_x, b)
if b in inputs:
name = inputs[b]
else:
name = 'input_%s_%s' % (name_x, b)
vals = [name, a, b]
if vals not in nodes:
inputs[b] = name
nodes.append(vals)
edges.append([name, vals_x[0]])
for a, b in output_x:
name = 'output_%s_%s' % (name_x, b)
vals = [name, a, b]
if vals not in nodes:
nodes.append(vals)
edges.append([name_x, name])
main_nodes[name_x][b] = name
if vals_y not in (nodes):
nodes.append(vals_y)
if name_y not in main_nodes:
main_nodes[name_y] = dict()
for a, b in input_y:
# checking if there is an overlap between the parameter
# and the connections; if there is, use the connection
overlap = set(main_nodes[name_x]) & set(connections)
if overlap:
# use the first hit
b = list(overlap)[0]
if b in main_nodes[name_x]:
name = main_nodes[name_x][b]
else:
name = 'input_%s_%s' % (name_y, b)
vals = [name, a, b]
if vals not in nodes:
nodes.append(vals)
edges.append([name, name_y])
for a, b in output_y:
name = 'output_%s_%s' % (name_y, b)
vals = [name, a, b]
if vals not in nodes:
nodes.append(vals)
edges.append([name_y, name])
main_nodes[name_y][b] = name
wparams = w.parameters
# adding nodes without edges
# as a first step if not_used_nodes is not empty we'll confirm that
# nodes/edges are empty; in theory we should never hit this
if not_used_nodes and (nodes or edges):
raise ValueError(
'Error, please check your workflow configuration')
# note that this block is similar but not identical to adding connected
# nodes
for i, (_, x) in enumerate(not_used_nodes.items()):
vals_x, input_x, output_x = _default_parameters_parsing(x)
if at in input_x[0][1]:
input_x[0][1] = at
else:
input_x[0][1] = '** WARNING, NOT DEFINED **'
name_x = vals_x[0]
if vals_x not in (nodes):
nodes.append(vals_x)
for a, b in input_x:
if b in inputs:
name = inputs[b]
else:
name = 'input_%s_%s' % (name_x, b)
nodes.append([name, a, b])
edges.append([name, vals_x[0]])
for a, b in output_x:
name = 'output_%s_%s' % (name_x, b)
nodes.append([name, a, b])
edges.append([name_x, name])
workflows.append(
{'name': w.name, 'id': w.id, 'data_types': w.data_type,
'description': w.description, 'active': w.active,
'parameters_sample': wparams['sample'],
'parameters_prep': wparams['prep'],
'nodes': nodes, 'edges': edges})
return workflows
class WorkflowsHandler(BaseHandler):
@coroutine
@execute_as_transaction
def get(self):
# active True will only show active workflows
active = True
user = self.current_user
if user is not None and user.level in {'admin', 'dev'}:
active = False
workflows = _retrive_workflows(active)
self.render("workflows.html", workflows=workflows)