1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.gridsystems.nextgrid.api.pom;
19
20 import java.net.URI;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.PriorityQueue;
27
28 import javax.xml.namespace.QName;
29
30 import nextgrid.api.env.DiscoveryException;
31 import nextgrid.api.env.Prioritiser;
32 import nextgrid.api.env.ProcessDiscoverer;
33 import nextgrid.api.env.ProcessEnvironment;
34 import nextgrid.api.env.ProcessSelector;
35 import nextgrid.api.env.SelectionException;
36 import nextgrid.api.pem.DiscoveryEvent;
37 import nextgrid.api.pem.ProcessEvent;
38 import nextgrid.api.pom.AbstractProcess;
39 import nextgrid.api.pom.Process;
40 import nextgrid.api.pom.ProcessController;
41 import nextgrid.api.pom.ProcessException;
42 import nextgrid.api.pom.QueryProfile;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public final class AbstractProcessImpl extends ProcessImpl
66 implements AbstractProcess {
67
68
69
70
71 private static final long serialVersionUID = -345106213089801350L;
72
73
74
75
76 private List<Process> candidates = null;
77
78
79
80
81 private Process selected;
82
83
84
85
86 private int priority = UNDEFINED_PRIORITY;
87
88
89
90
91 private QueryProfile profile;
92
93
94
95
96 public AbstractProcessImpl() {
97 }
98
99
100
101
102
103
104 public AbstractProcessImpl(QueryProfile profile) {
105 this.profile = profile;
106 }
107
108
109
110
111
112
113
114
115 public List<Process> getCandidates() {
116 return this.candidates;
117 }
118
119
120
121
122 public void setCandidates(Collection<Process> candidates) {
123 if (candidates == null) {
124 this.candidates = null;
125 } else {
126 if (this.candidates == null) {
127 this.candidates = new ArrayList<Process>(candidates);
128 } else {
129 this.candidates.clear();
130 this.candidates.addAll(candidates);
131 }
132 }
133 }
134
135
136
137
138 public Process getSelected() {
139 return this.selected;
140 }
141
142
143
144
145
146
147 public void setSelected(Process selected) {
148 if (this.selected != null) {
149 this.selected.setParent(null);
150 }
151
152 this.selected = selected;
153
154 if (this.selected != null) {
155 this.selected.setParent(this);
156 }
157 }
158
159
160
161
162
163
164
165
166
167
168 public int getPriority() {
169 return this.priority;
170 }
171
172
173
174
175
176
177 public void setPriority(int priority) {
178 this.priority = priority;
179 }
180
181
182
183
184 public QueryProfile getProfile() {
185 return this.profile;
186 }
187
188
189
190
191 public void setProfile(QueryProfile profile) {
192 this.profile = profile;
193 }
194
195
196
197
198 public Process findProcessById(URI id) {
199 if (id == null) {
200 return null;
201 } else if (this.getId().equals(id)) {
202 return this;
203 } else if (this.getSelected() != null) {
204 return this.getSelected().findProcessById(id);
205 } else {
206 return null;
207 }
208 }
209
210
211
212
213 @Override protected void doValidate(ValidationType when)
214 throws ProcessException {
215
216 }
217
218
219
220
221 @Override
222 public void doEvaluate(ProcessEnvironment env) throws ProcessException {
223 this.selected = select(env);
224 if (this.selected != null) {
225 this.selected.evaluate(env);
226 }
227 }
228
229
230
231
232
233
234 private boolean hasPriority() {
235 return this.priority != UNDEFINED_PRIORITY;
236 }
237
238
239
240
241 public void prioritise(ProcessEnvironment env, PriorityQueue<Process> queue)
242 throws ProcessException {
243
244 if (!queue.contains(this) && !isEvaluated() && !isLazy()) {
245 if (!this.hasPriority()) {
246 URI id = this.getId();
247 Prioritiser prioritiser = env.getPrioritiserFor(id);
248 if (prioritiser == null) {
249 throw new ProcessException("No prioritiser found for process " + id);
250 } else {
251 prioritiser.prioritise(this);
252 }
253 }
254 queue.offer(this);
255 }
256 }
257
258
259
260
261 public void discover(ProcessEnvironment env) throws ProcessException {
262
263 if (this.getCandidates() == null) {
264 ProcessDiscoverer disc = env.getDiscovererFor(this.getId());
265
266
267 Process p = this;
268 while (disc == null && p.getParent() != null) {
269 p = p.getParent();
270 disc = env.getDiscovererFor(p.getId());
271 }
272
273 if (disc == null) {
274 throw new DiscoveryException("No discoverer found for process " + this.getId());
275 } else {
276 fireDiscovererSelected(new DiscoveryEvent(disc, this));
277 fireDiscoveryStarting(new DiscoveryEvent(disc, this, this.candidates));
278 try {
279 Collection<Process> found = disc.discover(this);
280 if (found == null || found.isEmpty()) {
281 throw new DiscoveryException("No candidates found");
282 } else {
283 this.setCandidates(found);
284 fireDiscoveryFinished(new DiscoveryEvent(disc, this, this.candidates));
285 }
286 } catch (DiscoveryException e) {
287 fireDiscoveryFailed(new DiscoveryEvent(disc, this, this.candidates, e));
288 throw e;
289 }
290 }
291 }
292 }
293
294
295
296
297
298
299
300
301 private Process select(ProcessEnvironment env) throws ProcessException {
302 if (this.selected == null) {
303 discover(env);
304
305 URI id = this.getId();
306
307 ProcessSelector selector = env.getSelectorFor(id);
308 if (selector == null) {
309 throw new ProcessException("No selector found for process " + id);
310 } else {
311 selector.select(this);
312 if (this.selected != null) {
313 this.selected.setParent(this);
314 matchParameters(this.selected);
315
316 fireProcessSelected(new ProcessEvent(this.selected));
317 }
318 }
319 }
320 return this.selected;
321 }
322
323
324
325
326 @Override protected void doReset() {
327 setCandidates(null);
328 setSelected(null);
329 }
330
331
332
333
334 @Override protected void resetChildren() {
335 if (this.selected != null) {
336 this.selected.reset();
337 }
338 }
339
340
341
342
343 public void run(ProcessContext ctx) throws ProcessException, InterruptedException {
344 fireProcessStarted();
345 waitForInputs();
346
347
348 if (this.selected == null) {
349 throw new SelectionException("No candidate selected for process '"
350 + this.getId() + "'");
351 } else {
352 ProcessController controller = this.selected.enact(ctx);
353 controller.run();
354 }
355
356 if (ctx.isRunning()) {
357 fireProcessFinished();
358 }
359 }
360
361
362
363
364 @Override public String toString() {
365 return "AbstractProcess#" + this.getId();
366 }
367
368
369
370
371
372
373
374
375
376
377 private void matchParameters(Process p) throws ProcessException {
378
379 for (String name : p.getUsedInputNames()) {
380 String match = findInputMatch(name);
381 if (match == null) {
382 throw new ProcessException("Could not match input " + p.getId() + "#" + name);
383 } else {
384 p.putInput(name, this.getInput(match));
385 }
386 }
387
388 for (String name : p.getUsedOutputNames()) {
389 String match = findOutputMatch(name);
390 if (match == null) {
391 throw new ProcessException("Could not match output " + p.getId() + "#" + name);
392 } else {
393 p.putOutput(name, this.getOutput(match));
394 }
395 }
396 }
397
398
399
400
401
402
403
404 private String findInputMatch(String name) {
405 String[] usedNames = this.getUsedInputNames();
406
407
408 Map<String, String> inputMap = new HashMap<String, String>();
409 for (Map.Entry<QName, String> entry : this.getAttributes().entrySet()) {
410 QName qname = entry.getKey();
411 if (qname.getNamespaceURI().equals("input-map")) {
412 inputMap.put(entry.getValue(), qname.getLocalPart());
413 }
414 }
415
416
417 if (inputMap.containsKey(name)) {
418 return inputMap.get(name);
419 }
420
421
422 for (String s : usedNames) {
423 if (s.equals(name)) {
424 return s;
425 }
426 }
427
428
429 String localName = name.substring(name.lastIndexOf('/'));
430 for (Map.Entry<String, String> entry : inputMap.entrySet()) {
431 String key = entry.getKey();
432 if (key != null && key.endsWith(localName)) {
433 return entry.getValue();
434 }
435 }
436
437
438 for (String s : usedNames) {
439 if (s.endsWith(localName)) {
440 return s;
441 }
442 }
443
444 return null;
445 }
446
447
448
449
450
451
452
453 private String findOutputMatch(String name) {
454 String[] usedNames = this.getUsedOutputNames();
455
456
457 Map<String, String> outputMap = new HashMap<String, String>();
458 for (Map.Entry<QName, String> entry : this.getAttributes().entrySet()) {
459 QName qname = entry.getKey();
460 if (qname.getNamespaceURI().equals("output-map")) {
461 outputMap.put(entry.getValue(), qname.getLocalPart());
462 }
463 }
464
465
466 if (outputMap.containsKey(name)) {
467 return outputMap.get(name);
468 }
469
470
471 for (String s : usedNames) {
472 if (s.equals(name)) {
473 return s;
474 }
475 }
476
477
478 String localName = name.substring(name.lastIndexOf('/'));
479 for (Map.Entry<String, String> entry : outputMap.entrySet()) {
480 String key = entry.getKey();
481 if (key != null && key.endsWith(localName)) {
482 return entry.getValue();
483 }
484 }
485
486
487 for (String s : usedNames) {
488 if (s.endsWith(localName)) {
489 return s;
490 }
491 }
492
493 return null;
494 }
495
496
497
498
499 @Override public Process copy() {
500 AbstractProcessImpl copy = (AbstractProcessImpl)super.copy();
501
502 if (this.candidates != null) {
503 copy.candidates = new ArrayList<Process>();
504 for (Process p : candidates) {
505 copy.candidates.add(p.copy());
506 }
507 }
508 copy.setSelected((selected == null) ? null : selected.copy());
509
510 return copy;
511 }
512 }