1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.gridsystems.nextgrid.api.pom;
19
20 import java.net.URI;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.PriorityQueue;
26 import java.util.UUID;
27 import java.util.Map.Entry;
28
29 import javax.xml.namespace.QName;
30
31 import nextgrid.api.env.ProcessEnvironment;
32 import nextgrid.api.pem.POMListener;
33 import nextgrid.api.pom.AbstractProcess;
34 import nextgrid.api.pom.ControlProcess;
35 import nextgrid.api.pom.Process;
36 import nextgrid.api.pom.ProcessController;
37 import nextgrid.api.pom.ProcessException;
38 import nextgrid.api.pom.Reference;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42
43 import com.gridsystems.nextgrid.api.pom.ref.IncompatibleReferenceException;
44 import com.gridsystems.nextgrid.api.pom.ref.UnresolvedReferenceException;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public abstract class ProcessImpl extends PemHelper implements Enactable {
64
65
66
67
68 protected static final Log ENACTOR_LOG = LogFactory.getLog("ENACTOR");
69
70
71
72
73 public static final int DFA_INITIAL_STATE = 0;
74
75
76
77
78 protected static enum ValidationType {
79
80
81
82
83 BEFORE_CHILDREN,
84
85
86
87
88 AFTER_CHILDREN
89 };
90
91
92
93
94 private URI id;
95
96
97
98
99 private String name;
100
101
102
103
104 private String description;
105
106
107
108
109 private Process parent;
110
111
112
113
114 private Map<String, Reference<?>> inputs;
115
116
117
118
119 private Map<String, Reference<?>> outputs;
120
121
122
123
124
125 private Map<String, Class<?>> usedInputs;
126
127
128
129
130
131 private Map<String, Class<?>> usedOutputs;
132
133
134
135
136 private Map<QName, String> attribs;
137
138
139
140
141 private int state = DFA_INITIAL_STATE;
142
143
144
145
146 private boolean validated;
147
148
149
150
151 private boolean evaluated;
152
153
154
155
156 private boolean lazy;
157
158
159
160
161 protected ProcessImpl() {
162 this.id = genRandomURI();
163 this.inputs = new HashMap<String, Reference<?>>();
164 this.outputs = new HashMap<String, Reference<?>>();
165 this.usedInputs = new HashMap<String, Class<?>>();
166 this.usedOutputs = new HashMap<String, Class<?>>();
167 this.attribs = new HashMap<QName, String>();
168 }
169
170
171
172
173 public final Map<QName, String> getAttributes() {
174 return this.attribs;
175 }
176
177
178
179
180 public final String getAttribute(QName name) {
181 return this.attribs.get(name);
182 }
183
184
185
186
187 public final String getAttribute(String name) {
188 return this.attribs.get(new QName("", name));
189 }
190
191
192
193
194 public final String getAttribute(String ns, String local) {
195 return this.attribs.get(new QName(ns, local));
196 }
197
198
199
200
201 public final void setAttribute(QName name, String value) {
202 if (value == null) {
203 this.attribs.remove(name);
204 } else {
205 this.attribs.put(name, value);
206 }
207 }
208
209
210
211
212 public final void setAttribute(String name, String value) {
213 final QName key = new QName("", name);
214 if (value == null) {
215 this.attribs.remove(key);
216 } else {
217 this.attribs.put(key, value);
218 }
219 }
220
221
222
223
224
225
226
227 public final URI getId() {
228 return this.id;
229 }
230
231
232
233
234
235
236
237 public final String getName() {
238 return this.name;
239 }
240
241
242
243
244
245
246 public final String getDescription() {
247 return this.description;
248 }
249
250
251
252
253
254
255 public final Process getParent() {
256 return this.parent;
257 }
258
259
260
261
262
263
264 public final int getState() {
265 return this.state;
266 }
267
268
269
270
271
272
273 protected final void setState(int state) {
274 this.state = state;
275 }
276
277
278
279
280
281
282
283 public final Process findRoot() {
284 Process root = this;
285 Process localParent = root.getParent();
286 while (localParent != null) {
287 root = localParent;
288 localParent = root.getParent();
289 }
290
291 return root;
292 }
293
294
295
296
297
298
299
300 public final void setId(URI id) {
301 if (id == null) {
302 throw new NullPointerException("Null id");
303 } else {
304 this.id = id;
305 }
306 }
307
308
309
310
311
312
313 public final void setName(String name) {
314 this.name = name;
315 }
316
317
318
319
320
321
322 public final void setDescription(String description) {
323 this.description = description;
324 }
325
326
327
328
329
330
331 public final void setParent(Process parent) {
332 this.parent = parent;
333 }
334
335
336
337
338 public final void validate() throws ProcessException {
339 resolveReferences();
340
341
342 doValidate(ValidationType.BEFORE_CHILDREN);
343
344
345 if (this instanceof AbstractProcess) {
346 Process p = ((AbstractProcess)this).getSelected();
347 if (p != null) {
348 p.validate();
349 }
350 } else if (this instanceof ControlProcess) {
351 for (Process p : ((ControlProcess)this).getChildren()) {
352 p.validate();
353 }
354 }
355
356
357 doValidate(ValidationType.AFTER_CHILDREN);
358
359
360 validated = true;
361 }
362
363
364
365
366 public final void invalidate() {
367 validated = false;
368 }
369
370
371
372
373 public final boolean isLazy() {
374 return this.lazy;
375 }
376
377
378
379
380 public final void setLazy(boolean lazy) {
381 this.lazy = lazy;
382 }
383
384
385
386
387 public final boolean isValidated() {
388 return this.validated;
389 }
390
391
392
393
394 public final boolean isEvaluated() {
395 return this.evaluated;
396 }
397
398
399
400
401
402
403 protected final void setEvaluated(boolean evaluated) {
404 this.evaluated = evaluated;
405 }
406
407
408
409
410
411
412
413
414 protected final PriorityQueue<Process> prioritise(ProcessEnvironment env)
415 throws ProcessException {
416
417 PriorityQueue<Process> set = new PriorityQueue<Process>(1,
418 new ProcessPriorityComparator()
419 );
420 this.prioritise(env, set);
421
422 return set;
423 }
424
425
426
427
428
429
430
431 protected abstract void doValidate(ValidationType type)
432 throws ProcessException;
433
434
435
436
437 public final void evaluate(ProcessEnvironment env) throws ProcessException {
438 this.evaluate(env, false);
439 }
440
441
442
443
444
445
446
447
448
449 public final void evaluate(ProcessEnvironment env, boolean force)
450 throws ProcessException {
451 if (!isEvaluated()) {
452 ENACTOR_LOG.debug("Evaluating process...");
453 attachListeners(env);
454 ENACTOR_LOG.debug("Listeners attached");
455
456 if (force || !lazy) {
457 ENACTOR_LOG.debug("Evaluation required. Performing...");
458 try {
459 doEvaluate(env);
460 setEvaluated(true);
461 fireProcessEvaluated();
462 ENACTOR_LOG.debug("Evaluation finished (" + this.getId() + ")");
463 } catch (ProcessException e) {
464 fireProcessFailed(e);
465 throw e;
466 }
467 }
468 }
469 }
470
471
472
473
474
475
476
477 public abstract void doEvaluate(ProcessEnvironment env) throws ProcessException;
478
479
480
481
482 public final ProcessController enact(ProcessEnvironment env)
483 throws ProcessException {
484 evaluate(env, true);
485
486 if (env instanceof ProcessContext) {
487 ProcessContext ctx = (ProcessContext)env;
488 return new ThreadController(ctx, this);
489 } else {
490 throw new IllegalArgumentException("Only ProcessContext is supported");
491 }
492 }
493
494
495
496
497 protected abstract void doReset();
498
499
500
501
502 protected abstract void resetChildren();
503
504
505
506
507 public final void reset() {
508 this.evaluated = false;
509
510 for (Reference<?> ref : this.getOutputs().values()) {
511 ref.setValue(null);
512 }
513
514 resetChildren();
515
516 doReset();
517 }
518
519
520
521
522
523
524 protected final void attachListeners(ProcessEnvironment env) {
525 ENACTOR_LOG.debug("Looking for listeners for " + this.getId());
526 Collection<POMListener> collection = env.getListenersFor(this.getId());
527 for (POMListener l : collection) {
528 ENACTOR_LOG.debug("Found listener for " + this.getId() + ": " + l);
529 this.addListener(l);
530 }
531 }
532
533
534
535
536
537
538
539
540
541
542 public final Map<String, Reference<?>> getInputs() {
543 Map<String, Reference<?>> map = new HashMap<String, Reference<?>>();
544 getInputs(map);
545 return Collections.unmodifiableMap(map);
546 }
547
548
549
550
551
552
553
554 protected final void getInputs(Map<String, Reference<?>> map) {
555 if (map != null) {
556 if (this.getParent() instanceof ProcessImpl) {
557 ((ProcessImpl)this.getParent()).getInputs(map);
558 } else if (this.getParent() != null) {
559 map.putAll(this.getParent().getInputs());
560 }
561 map.putAll(this.inputs);
562 }
563 }
564
565
566
567
568
569
570 public final Map<String, Reference<?>> getOutputs() {
571 Map<String, Reference<?>> map = new HashMap<String, Reference<?>>();
572 getOutputs(map);
573 return Collections.unmodifiableMap(map);
574 }
575
576
577
578
579
580
581
582 protected final void getOutputs(Map<String, Reference<?>> map) {
583 if (map != null) {
584 if (this.getParent() instanceof ProcessImpl) {
585 ((ProcessImpl)this.getParent()).getOutputs(map);
586 } else if (this.getParent() != null) {
587 map.putAll(this.getParent().getOutputs());
588 }
589 map.putAll(this.outputs);
590 }
591 }
592
593
594
595
596
597
598
599
600
601
602 public final Reference<?> getInput(String key) {
603 Reference<?> ref = this.inputs.get(key);
604 if (ref == null && this.getParent() != null) {
605 ref = this.getParent().getInput(key);
606 }
607 return ref;
608 }
609
610
611
612
613
614
615
616
617
618
619 public final Reference<?> getOutput(String key) {
620 if (this.outputs.containsKey(key)) {
621 return this.outputs.get(key);
622 } else if (this.getParent() != null) {
623 return this.getParent().getOutput(key);
624 } else {
625 return null;
626 }
627 }
628
629
630
631
632
633
634
635 public final void putInput(String key, Reference<?> input) {
636 synchronized (inputs) {
637 Reference<?> old;
638 if (input == null) {
639 old = inputs.remove(key);
640 } else {
641 Class<?> type = usedInputs.get(key);
642 if (type != null && !input.canCastTo(type)) {
643 throw new ClassCastException("Type incompatible with resolved reference");
644 }
645 input.addReader(this);
646 old = inputs.put(key, input);
647 }
648 if (old != null) {
649 old.removeReader(this);
650 }
651 }
652 }
653
654
655
656
657
658
659
660 public final void putOutput(String key, Reference<?> output) {
661 synchronized (outputs) {
662 Reference<?> old;
663 if (output == null) {
664 old = outputs.remove(key);
665 } else {
666 Class<?> type = usedOutputs.get(key);
667 if (type != null && !output.canCastFrom(type)) {
668 ENACTOR_LOG.warn("Type incompatible with resolved reference");
669 ENACTOR_LOG.warn("- name: " + key);
670 ENACTOR_LOG.warn("- type: " + type);
671 ENACTOR_LOG.warn("- output: " + output.getValueType());
672 throw new ClassCastException("Type incompatible with resolved reference");
673 }
674 output.addWriter(this);
675 old = outputs.put(key, output);
676 }
677 if (old != null) {
678 old.removeWriter(this);
679 }
680 }
681 }
682
683
684
685
686
687
688 public final void removeInput(String key) {
689 putInput(key, null);
690 }
691
692
693
694
695
696
697 public final void removeOutput(String key) {
698 putOutput(key, null);
699 }
700
701
702
703
704 public final void useInput(String name, Class<?> type) {
705 Reference<?> ref = getInput(name);
706 if (ref != null && !ref.canCastTo(type)) {
707 throw new ClassCastException("Type incompatible with resolved reference");
708 }
709 usedInputs.put(name, type);
710 }
711
712
713
714
715 public final void useOutput(String name, Class<?> type) {
716 Reference<?> ref = getOutput(name);
717 if (ref != null && !ref.canCastTo(type)) {
718 throw new ClassCastException("Type incompatible with resolved reference");
719 }
720 usedOutputs.put(name, type);
721 }
722
723
724
725
726 public final void unuseInput(String... names) {
727 for (String n : names) {
728 usedInputs.remove(n);
729 }
730 }
731
732
733
734
735 public final void unuseOutput(String... names) {
736 for (String n : names) {
737 usedOutputs.remove(n);
738 }
739 }
740
741
742
743
744 public final String[] getUsedInputNames() {
745 return usedInputs.keySet().toArray(new String[usedInputs.size()]);
746 }
747
748
749
750
751 public final String[] getUsedOutputNames() {
752 return usedOutputs.keySet().toArray(new String[usedOutputs.size()]);
753 }
754
755
756
757
758 public final Class<?> getInputType(String name) {
759 return usedInputs.get(name);
760 }
761
762
763
764
765 public final Class<?> getOutputType(String name) {
766 return usedOutputs.get(name);
767 }
768
769
770
771
772
773
774
775
776
777
778
779
780 private void resolveReferences() throws ProcessException {
781
782 for (Entry<String, Class<?>> entry : usedInputs.entrySet()) {
783 final String key = entry.getKey();
784 final Class<?> type = entry.getValue();
785
786 Reference<?> ref = getInput(key);
787 if (ref == null) {
788 throw new UnresolvedReferenceException(key + "@" + this.getName());
789 } else if (ref.canCastTo(type)) {
790 ref.addReader(this);
791 } else {
792 throw new IncompatibleReferenceException(key);
793 }
794 }
795
796 for (Entry<String, Class<?>> entry : usedOutputs.entrySet()) {
797 final String key = entry.getKey();
798 final Class<?> type = entry.getValue();
799
800 Reference<?> ref = getOutput(key);
801 if (ref == null) {
802 throw new UnresolvedReferenceException(key);
803 } else if (ref.canCastTo(type)) {
804 ref.addReader(this);
805 } else {
806 throw new IncompatibleReferenceException(key);
807 }
808 }
809 }
810
811
812
813
814
815
816
817
818
819
820 protected void waitForInputs() throws InterruptedException {
821 ENACTOR_LOG.debug("Process '" + this.getName() + "': waiting for inputs");
822 for (Reference<?> ref : inputs.values()) {
823 ref.waitAvailable();
824 }
825 }
826
827
828
829
830
831
832 private URI genRandomURI() {
833 UUID uuid = UUID.randomUUID();
834 try {
835 return new URI("urn:uuid:" + uuid);
836 } catch (Exception e) {
837 throw new InternalError(e.toString());
838 }
839 }
840
841
842
843
844 @Override public Process copy() {
845 ProcessImpl copy = (ProcessImpl)super.copy();
846 copy.attribs = new HashMap<QName, String>(this.attribs);
847 copy.inputs = copyRefs(this.inputs);
848 copy.outputs = copyRefs(this.outputs);
849 copy.usedInputs = copyTypes(this.usedInputs);
850 copy.usedOutputs = copyTypes(this.usedOutputs);
851
852 copy.parent = null;
853
854 return copy;
855 }
856
857
858
859
860
861
862
863 private Map<String, Reference<?>> copyRefs(Map<String, Reference<?>> map) {
864 Map<String, Reference<?>> copy = new HashMap<String, Reference<?>>();
865
866 for (Map.Entry<String, Reference<?>> entry : map.entrySet()) {
867 copy.put(entry.getKey(), entry.getValue().copy());
868 }
869
870 return copy;
871 }
872
873
874
875
876
877
878
879 private Map<String, Class<?>> copyTypes(Map<String, Class<?>> map) {
880 Map<String, Class<?>> copy = new HashMap<String, Class<?>>();
881
882 for (Map.Entry<String, Class<?>> entry : map.entrySet()) {
883 copy.put(entry.getKey(), entry.getValue());
884 }
885
886 return copy;
887 }
888 }