/*
 Copyright (C) 2006 Grid Systems, S.A.

 This library is free software; you can redistribute it and/or
 modify it under the terms of the GNU Lesser General Public
 License as published by the Free Software Foundation; either
 version 2.1 of the License, or (at your option) any later version.

 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.

 You should have received a copy of the GNU Lesser General Public
 License along with this library; if not, write to the Free Software
 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
*/
package com.gridsystems.nextgrid.api.pom;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.UUID;

import javax.xml.namespace.QName;

import nextgrid.api.env.ProcessEnvironment;
import nextgrid.api.pem.POMListener;
import nextgrid.api.pom.AbstractProcess;
import nextgrid.api.pom.ControlProcess;
import nextgrid.api.pom.Process;
import nextgrid.api.pom.ProcessController;
import nextgrid.api.pom.ProcessException;
import nextgrid.api.pom.Reference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.gridsystems.nextgrid.api.pom.ref.IncompatibleReferenceException;
import com.gridsystems.nextgrid.api.pom.ref.UnresolvedReferenceException;
45  
46  /**
47   * Base implementation for Processes.
48   * <p>
49   * To enact a process, the programmer should invoke the following operation:
50   *
51   * <pre>
52   * EnactorContext context = ...;
53   * Process p = ...;
54   * try {
55   *   p.enact(context);
56   * } catch (Exception e) {
57   *   ...
58   * }
59   * </pre>
60   *
61   * @author Rodrigo Ruiz
62   */
63  public abstract class ProcessImpl extends PemHelper implements Enactable {
64  
65    /**
66     * Enaction logger.
67     */
68    protected static final Log ENACTOR_LOG = LogFactory.getLog("ENACTOR");
69  
70    /**
71     * Initial state of the internal DFA.
72     */
73    public static final int DFA_INITIAL_STATE = 0;
74  
75    /**
76     * Validation types.
77     */
78    protected static enum ValidationType {
79      /**
80       * This value is passed to {@link #doValidate(int)} before child processes
81       * are validated.
82       */
83      BEFORE_CHILDREN,
84      /**
85       * This value is passed to {@link #doValidate(int)} after child processes
86       * are validated.
87       */
88      AFTER_CHILDREN
89    };
90  
91    /**
92     * The id of this process.
93     */
94    private URI id;
95  
96    /**
97     * The name of this process.
98     */
99    private String name;
100 
101   /**
102    * Human-readable description of this process.
103    */
104   private String description;
105 
106   /**
107    * Parent process.
108    */
109   private Process parent;
110 
111   /**
112    * Map of input references. The keys are the reference IDs.
113    */
114   private Map<String, Reference<?>> inputs;
115 
116   /**
117    * Map of output references. The keys are the reference IDs.
118    */
119   private Map<String, Reference<?>> outputs;
120 
121   /**
122    * Set containing the names of the inputs that will be "locally" used
123    * by this process.
124    */
125   private Map<String, Class<?>> usedInputs;
126 
127   /**
128    * Set containing the names of the outputs that will be "locally" used
129    * by this process.
130    */
131   private Map<String, Class<?>> usedOutputs;
132 
133   /**
134    * Attribute map.
135    */
136   private Map<QName, String> attribs;
137 
138   /**
139    * Process enaction state. Used to implement an internal DFA.
140    */
141   private int state = DFA_INITIAL_STATE;
142 
143   /**
144    * Flag that marks if this process has already been validated.
145    */
146   private boolean validated;
147 
148   /**
149    * Evaluation flag.
150    */
151   private boolean evaluated;
152 
153   /**
154    * Lazy flag.
155    */
156   private boolean lazy;
157 
158   /**
159    * Creates a new instance with a default empty model.
160    */
161   protected ProcessImpl() {
162     this.id = genRandomURI();
163     this.inputs = new HashMap<String, Reference<?>>();
164     this.outputs = new HashMap<String, Reference<?>>();
165     this.usedInputs = new HashMap<String, Class<?>>();
166     this.usedOutputs = new HashMap<String, Class<?>>();
167     this.attribs = new HashMap<QName, String>();
168   }
169 
170   /**
171    * {@inheritDoc}
172    */
173   public final Map<QName, String> getAttributes() {
174     return this.attribs;
175   }
176 
177   /**
178    * {@inheritDoc}
179    */
180   public final String getAttribute(QName name) {
181     return this.attribs.get(name);
182   }
183 
184   /**
185    * {@inheritDoc}
186    */
187   public final String getAttribute(String name) {
188     return this.attribs.get(new QName("", name));
189   }
190 
191   /**
192    * {@inheritDoc}
193    */
194   public final String getAttribute(String ns, String local) {
195     return this.attribs.get(new QName(ns, local));
196   }
197 
198   /**
199    * {@inheritDoc}
200    */
201   public final void setAttribute(QName name, String value) {
202     if (value == null) {
203       this.attribs.remove(name);
204     } else {
205       this.attribs.put(name, value);
206     }
207   }
208 
209   /**
210    * {@inheritDoc}
211    */
212   public final void setAttribute(String name, String value) {
213     final QName key = new QName("", name);
214     if (value == null) {
215       this.attribs.remove(key);
216     } else {
217       this.attribs.put(key, value);
218     }
219   }
220 
221   /**
222    * Gets the id of this process instance. The id must be unique within the
223    * scope this instance is defined in.
224    *
225    * @return The process id
226    */
227   public final URI getId() {
228     return this.id;
229   }
230 
231   /**
232    * Gets the name of this process. This is just a human-readable label for
233    * presentation purposes.
234    *
235    * @return The process name
236    */
237   public final String getName() {
238     return this.name;
239   }
240 
241   /**
242    * Gets a human-readable description of this process.
243    *
244    * @return A human-readable description of this process
245    */
246   public final String getDescription() {
247     return this.description;
248   }
249 
250   /**
251    * Gets the parent process, or null if there is no parent.
252    *
253    * @return The parent process
254    */
255   public final Process getParent() {
256     return this.parent;
257   }
258 
259   /**
260    * Gets the internal DFA state of this process.
261    *
262    * @return The internal DFA state
263    */
264   public final int getState() {
265     return this.state;
266   }
267 
268   /**
269    * Sets the internal DFA state of this process.
270    *
271    * @param state The internal DFA state
272    */
273   protected final void setState(int state) {
274     this.state = state;
275   }
276 
277   /**
278    * Searches up through the process hierarchy to find the root process of the
279    * workflow this process is defined in.
280    *
281    * @return The root process of the workflow
282    */
283   public final Process findRoot() {
284     Process root = this;
285     Process localParent = root.getParent();
286     while (localParent != null) {
287       root = localParent;
288       localParent = root.getParent();
289     }
290 
291     return root;
292   }
293 
294   /**
295    * Sets the id of this process instance.
296    *
297    * @param id A unique identifier
298    * @throws NullPointerException     If id is null
299    */
300   public final void setId(URI id) {
301     if (id == null) {
302       throw new NullPointerException("Null id");
303     } else {
304       this.id = id;
305     }
306   }
307 
308   /**
309    * Sets this process name.
310    *
311    * @param name The process name
312    */
313   public final void setName(String name) {
314     this.name = name;
315   }
316 
317   /**
318    * Sets the description of this process.
319    *
320    * @param description A textual description
321    */
322   public final void setDescription(String description) {
323     this.description = description;
324   }
325 
326   /**
327    * Sets the parent process instance.
328    *
329    * @param parent The parent process
330    */
331   public final void setParent(Process parent) {
332     this.parent = parent;
333   }
334 
335   /**
336    * {@inheritDoc}
337    */
338   public final void validate() throws ProcessException {
339     resolveReferences();
340 
341     // Local validation
342     doValidate(ValidationType.BEFORE_CHILDREN);
343 
344     // Validate children processes
345     if (this instanceof AbstractProcess) {
346       Process p = ((AbstractProcess)this).getSelected();
347       if (p != null) {
348         p.validate();
349       }
350     } else if (this instanceof ControlProcess) {
351       for (Process p : ((ControlProcess)this).getChildren()) {
352         p.validate();
353       }
354     }
355 
356     // Local validation
357     doValidate(ValidationType.AFTER_CHILDREN);
358 
359     // Mark this process as already validated
360     validated = true;
361   }
362 
363   /**
364    * {@inheritDoc}
365    */
366   public final void invalidate() {
367     validated = false;
368   }
369 
370   /**
371    * {@inheritDoc}
372    */
373   public final boolean isLazy() {
374     return this.lazy;
375   }
376 
377   /**
378    * {@inheritDoc}
379    */
380   public final void setLazy(boolean lazy) {
381     this.lazy = lazy;
382   }
383 
384   /**
385    * {@inheritDoc}
386    */
387   public final boolean isValidated() {
388     return this.validated;
389   }
390 
391   /**
392    * {@inheritDoc}
393    */
394   public final boolean isEvaluated() {
395     return this.evaluated;
396   }
397 
398   /**
399    * Sets the evaluation flag value.
400    *
401    * @param evaluated The new value
402    */
403   protected final void setEvaluated(boolean evaluated) {
404     this.evaluated = evaluated;
405   }
406 
407   /**
408    * Shortcut for the interface method <tt>prioritise</tt>.
409    *
410    * @param env The process environment to use
411    * @return A set of abstract processes
412    * @throws ProcessException If an error occurs
413    */
414   protected final PriorityQueue<Process> prioritise(ProcessEnvironment env)
415     throws ProcessException {
416 
417     PriorityQueue<Process> set = new PriorityQueue<Process>(1,
418       new ProcessPriorityComparator()
419     );
420     this.prioritise(env, set);
421 
422     return set;
423   }
424 
425   /**
426    * Performs any validation action needed for the process implementation.
427    *
428    * @param type Specifies when this validation is being invoked
429    * @throws ProcessException If the validation fails
430    */
431   protected abstract void doValidate(ValidationType type)
432     throws ProcessException;
433 
434   /**
435    * {@inheritDoc}
436    */
437   public final void evaluate(ProcessEnvironment env) throws ProcessException {
438     this.evaluate(env, false);
439   }
440 
441   /**
442    * Evaluates this process. It uses the "force" parameter to override the
443    * value of the lazy attribute.
444    *
445    * @param env   The environment to use
446    * @param force Whether to force the evaluation or not
447    * @throws ProcessException If an error occurs
448    */
449   public final void evaluate(ProcessEnvironment env, boolean force)
450     throws ProcessException {
451     if (!isEvaluated()) {
452       ENACTOR_LOG.debug("Evaluating process...");
453       attachListeners(env);
454       ENACTOR_LOG.debug("Listeners attached");
455 
456       if (force || !lazy) {
457         ENACTOR_LOG.debug("Evaluation required. Performing...");
458         try {
459           doEvaluate(env);
460           setEvaluated(true);
461           fireProcessEvaluated();
462           ENACTOR_LOG.debug("Evaluation finished (" + this.getId() + ")");
463         } catch (ProcessException e) {
464           fireProcessFailed(e);
465           throw e;
466         }
467       }
468     }
469   }
470 
471   /**
472    * Performs the actual actions needed for evaluating this process instance.
473    *
474    * @param env The process environment
475    * @throws ProcessException If an error occurs
476    */
477   public abstract void doEvaluate(ProcessEnvironment env) throws ProcessException;
478 
479   /**
480    * {@inheritDoc}
481    */
482   public final ProcessController enact(ProcessEnvironment env)
483     throws ProcessException {
484     evaluate(env, true);
485 
486     if (env instanceof ProcessContext) {
487       ProcessContext ctx = (ProcessContext)env;
488       return new ThreadController(ctx, this);
489     } else {
490       throw new IllegalArgumentException("Only ProcessContext is supported");
491     }
492   }
493 
494   /**
495    * Performs specific reset actions in each subclass.
496    */
497   protected abstract void doReset();
498 
499   /**
500    * Resets any child process.
501    */
502   protected abstract void resetChildren();
503 
504   /**
505    * {@inheritDoc}
506    */
507   public final void reset() {
508     this.evaluated = false;
509 
510     for (Reference<?> ref : this.getOutputs().values()) {
511       ref.setValue(null);
512     }
513 
514     resetChildren();
515 
516     doReset();
517   }
518 
519   /**
520    * Attaches listeners configured in the environment to this process.
521    *
522    * @param env The environment
523    */
524   protected final void attachListeners(ProcessEnvironment env) {
525     ENACTOR_LOG.debug("Looking for listeners for " + this.getId());
526     Collection<POMListener> collection = env.getListenersFor(this.getId());
527     for (POMListener l : collection) {
528       ENACTOR_LOG.debug("Found listener for " + this.getId() + ": " + l);
529       this.addListener(l);
530     }
531   }
532 
533   // -------------------------------------------------------------
534   // Input/Output management
535   // -------------------------------------------------------------
536 
537   /**
538    * Gets the map of references to input parameters.
539    *
540    * @return the input parameter reference map
541    */
542   public final Map<String, Reference<?>> getInputs() {
543     Map<String, Reference<?>> map = new HashMap<String, Reference<?>>();
544     getInputs(map);
545     return Collections.unmodifiableMap(map);
546   }
547 
548   /**
549    * Gets all inputs of this process, including inherited ones into the
550    * specified map.
551    *
552    * @param map The map to fill
553    */
554   protected final void getInputs(Map<String, Reference<?>> map) {
555     if (map != null) {
556       if (this.getParent() instanceof ProcessImpl) {
557         ((ProcessImpl)this.getParent()).getInputs(map);
558       } else if (this.getParent() != null) {
559         map.putAll(this.getParent().getInputs());
560       }
561       map.putAll(this.inputs);
562     }
563   }
564 
565   /**
566    * Gets the map of references to output parameters.
567    *
568    * @return the output parameter reference map
569    */
570   public final Map<String, Reference<?>> getOutputs() {
571     Map<String, Reference<?>> map = new HashMap<String, Reference<?>>();
572     getOutputs(map);
573     return Collections.unmodifiableMap(map);
574   }
575 
576   /**
577    * Gets all outputs of this process, including inherited ones into the
578    * specified map.
579    *
580    * @param map The map to fill
581    */
582   protected final void getOutputs(Map<String, Reference<?>> map) {
583     if (map != null) {
584       if (this.getParent() instanceof ProcessImpl) {
585         ((ProcessImpl)this.getParent()).getOutputs(map);
586       } else if (this.getParent() != null) {
587         map.putAll(this.getParent().getOutputs());
588       }
589       map.putAll(this.outputs);
590     }
591   }
592 
593   /**
594    * Gets an input parameter by its name.
595    * <p>
596    * Parameters are inheritable, so if a local parameter is not found, the
597    * search will be continued on the parent process.
598    *
599    * @param key The parameter name
600    * @return The parameter, or null if none exists with the specified name
601    */
602   public final Reference<?> getInput(String key) {
603     Reference<?> ref = this.inputs.get(key);
604     if (ref == null  && this.getParent() != null) {
605       ref = this.getParent().getInput(key);
606     }
607     return ref;
608   }
609 
610   /**
611    * Gets an output parameter by its name.
612    * <p>
613    * Parameters are inheritable, so if a local parameter is not found, the
614    * search will be continued on the parent process.
615    *
616    * @param key The parameter name
617    * @return The parameter, or null if none exists with the specified name
618    */
619   public final Reference<?> getOutput(String key) {
620     if (this.outputs.containsKey(key)) {
621       return this.outputs.get(key);
622     } else if (this.getParent() != null) {
623       return this.getParent().getOutput(key);
624     } else {
625       return null;
626     }
627   }
628 
629   /**
630    * Puts a reference into the map of input parameters for this process.
631    *
632    * @param key   The parameter name
633    * @param input The parameter
634    */
635   public final void putInput(String key, Reference<?> input) {
636     synchronized (inputs) {
637       Reference<?> old;
638       if (input == null) {
639         old = inputs.remove(key);
640       } else {
641         Class<?> type = usedInputs.get(key);
642         if (type != null && !input.canCastTo(type)) {
643           throw new ClassCastException("Type incompatible with resolved reference");
644         }
645         input.addReader(this);
646         old = inputs.put(key, input);
647       }
648       if (old != null) {
649         old.removeReader(this);
650       }
651     }
652   }
653 
654   /**
655    * Puts a reference into the map of output parameters for this process.
656    *
657    * @param key    The parameter name
658    * @param output The parameter
659    */
660   public final void putOutput(String key, Reference<?> output) {
661     synchronized (outputs) {
662       Reference<?> old;
663       if (output == null) {
664         old = outputs.remove(key);
665       } else {
666         Class<?> type = usedOutputs.get(key);
667         if (type != null && !output.canCastFrom(type)) {
668           ENACTOR_LOG.warn("Type incompatible with resolved reference");
669           ENACTOR_LOG.warn("- name: " + key);
670           ENACTOR_LOG.warn("- type: " + type);
671           ENACTOR_LOG.warn("- output: " + output.getValueType());
672           throw new ClassCastException("Type incompatible with resolved reference");
673         }
674         output.addWriter(this);
675         old = outputs.put(key, output);
676       }
677       if (old != null) {
678         old.removeWriter(this);
679       }
680     }
681   }
682 
683   /**
684    * Removes a reference from the input map.
685    *
686    * @param key The name of the reference to remove
687    */
688   public final void removeInput(String key) {
689     putInput(key, null);
690   }
691 
692   /**
693    * Removes a reference from the output map.
694    *
695    * @param key The name of the reference to remove
696    */
697   public final void removeOutput(String key) {
698     putOutput(key, null);
699   }
700 
701   /**
702    * {@inheritDoc}
703    */
704   public final void useInput(String name, Class<?> type) {
705     Reference<?> ref = getInput(name);
706     if (ref != null && !ref.canCastTo(type)) {
707       throw new ClassCastException("Type incompatible with resolved reference");
708     }
709     usedInputs.put(name, type);
710   }
711 
712   /**
713    * {@inheritDoc}
714    */
715   public final void useOutput(String name, Class<?> type) {
716     Reference<?> ref = getOutput(name);
717     if (ref != null && !ref.canCastTo(type)) {
718       throw new ClassCastException("Type incompatible with resolved reference");
719     }
720     usedOutputs.put(name, type);
721   }
722 
723   /**
724    * {@inheritDoc}
725    */
726   public final void unuseInput(String... names) {
727     for (String n : names) {
728       usedInputs.remove(n);
729     }
730   }
731 
732   /**
733    * {@inheritDoc}
734    */
735   public final void unuseOutput(String... names) {
736     for (String n : names) {
737       usedOutputs.remove(n);
738     }
739   }
740 
741   /**
742    * {@inheritDoc}
743    */
744   public final String[] getUsedInputNames() {
745     return usedInputs.keySet().toArray(new String[usedInputs.size()]);
746   }
747 
748   /**
749    * {@inheritDoc}
750    */
751   public final String[] getUsedOutputNames() {
752     return usedOutputs.keySet().toArray(new String[usedOutputs.size()]);
753   }
754 
755   /**
756    * {@inheritDoc}
757    */
758   public final Class<?> getInputType(String name) {
759     return usedInputs.get(name);
760   }
761 
762   /**
763    * {@inheritDoc}
764    */
765   public final Class<?> getOutputType(String name) {
766     return usedOutputs.get(name);
767   }
768 
769   /**
770    * Searches through all processes in the workflow (considering this process
771    * as the root node), and tries to resolve any used input or output to
772    * a reference.
773    * <p>
774    * During this process, reference reader and writer lists are synchronised
775    * with the current workflow layout.
776    *
777    * @throws ProcessException If a reference cannot be resolved or an
778    *                          incompatibility is found
779    */
780   private void resolveReferences() throws ProcessException {
781 
782     for (Entry<String, Class<?>> entry : usedInputs.entrySet()) {
783       final String key = entry.getKey();
784       final Class<?> type = entry.getValue();
785 
786       Reference<?> ref = getInput(key);
787       if (ref == null) {
788         throw new UnresolvedReferenceException(key + "@" + this.getName());
789       } else if (ref.canCastTo(type)) {
790         ref.addReader(this);
791       } else {
792         throw new IncompatibleReferenceException(key);
793       }
794     }
795 
796     for (Entry<String, Class<?>> entry : usedOutputs.entrySet()) {
797       final String key = entry.getKey();
798       final Class<?> type = entry.getValue();
799 
800       Reference<?> ref = getOutput(key);
801       if (ref == null) {
802         throw new UnresolvedReferenceException(key);
803       } else if (ref.canCastTo(type)) {
804         ref.addReader(this);
805       } else {
806         throw new IncompatibleReferenceException(key);
807       }
808     }
809   }
810 
811   // -------------------------------------------------------------
812   // Miscelanea
813   // -------------------------------------------------------------
814 
815   /**
816    * Waits until all input parameters are available for reading.
817    *
818    * @throws InterruptedException If the thread is interrupted
819    */
820   protected void waitForInputs() throws InterruptedException {
821     ENACTOR_LOG.debug("Process '" + this.getName() + "': waiting for inputs");
822     for (Reference<?> ref : inputs.values()) {
823       ref.waitAvailable();
824     }
825   }
826 
827   /**
828    * Generates a random URI in the UUID name space.
829    *
830    * @return An UUID encapsulated within a URI
831    */
832   private URI genRandomURI() {
833     UUID uuid = UUID.randomUUID();
834     try {
835       return new URI("urn:uuid:" + uuid);
836     } catch (Exception e) {
837       throw new InternalError(e.toString());
838     }
839   }
840 
841   /**
842    * {@inheritDoc}
843    */
844   @Override public Process copy() {
845     ProcessImpl copy = (ProcessImpl)super.copy();
846     copy.attribs = new HashMap<QName, String>(this.attribs);
847     copy.inputs = copyRefs(this.inputs);
848     copy.outputs = copyRefs(this.outputs);
849     copy.usedInputs = copyTypes(this.usedInputs);
850     copy.usedOutputs = copyTypes(this.usedOutputs);
851 
852     copy.parent = null;
853 
854     return copy;
855   }
856 
857   /**
858    * Creates a copy of the specified reference map.
859    *
860    * @param map The map to copy
861    * @return The copy
862    */
863   private Map<String, Reference<?>> copyRefs(Map<String, Reference<?>> map) {
864     Map<String, Reference<?>> copy = new HashMap<String, Reference<?>>();
865 
866     for (Map.Entry<String, Reference<?>> entry : map.entrySet()) {
867       copy.put(entry.getKey(), entry.getValue().copy());
868     }
869 
870     return copy;
871   }
872 
873   /**
874    * Creates a copy of the specified parameter type map.
875    *
876    * @param map The map to copy
877    * @return The copy
878    */
879   private Map<String, Class<?>> copyTypes(Map<String, Class<?>> map) {
880     Map<String, Class<?>> copy = new HashMap<String, Class<?>>();
881 
882     for (Map.Entry<String, Class<?>> entry : map.entrySet()) {
883       copy.put(entry.getKey(), entry.getValue());
884     }
885 
886     return copy;
887   }
888 }