View Javadoc

1   /*
2    Copyright (C) 2007 Grid Systems, S.A.
3   
4    This library is free software; you can redistribute it and/or
5    modify it under the terms of the GNU Lesser General Public
6    License as published by the Free Software Foundation; either
7    version 2.1 of the License, or (at your option) any later version.
8   
9    This library is distributed in the hope that it will be useful,
10   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12   Lesser General Public License for more details.
13  
14   You should have received a copy of the GNU Lesser General Public
15   License along with this library; if not, write to the Free Software
16   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
17  */
18  package com.gridsystems.nextgrid.api.pom;
19  
20  import java.io.Serializable;
21  import java.net.URI;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.locks.Lock;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  
35  import nextgrid.api.env.DiscoveryException;
36  import nextgrid.api.env.Prioritiser;
37  import nextgrid.api.env.ProcessDiscoverer;
38  import nextgrid.api.env.ProcessEnvironment;
39  import nextgrid.api.env.ProcessSelector;
40  import nextgrid.api.env.STSModule;
41  import nextgrid.api.pem.POMListener;
42  import nextgrid.api.pom.ProcessController;
43  import nextgrid.api.pom.ProcessException;
44  
45  /**
46   * Implementation of the {@link ProcessEnvironment} and {@link ProcessController}
47   * interfaces.
48   * <p>
49   *
50   *
51   * @author Rodrigo Ruiz
52   */
53  public class ProcessContext implements ProcessEnvironment, ProcessController {
54  
55    /**
56     * <code>serialVersionUID</code> attribute.
57     */
58    private static final long serialVersionUID = 9145702833459112038L;
59  
60    /**
61     * Environment logger.
62     */
63    private static final Log ENACTOR_LOG = LogFactory.getLog("ENACTOR");
64  
65    /**
66     * URI used to replace null values.
67     */
68    public static final URI NULL_ID = URI.create("#null");
69  
70    /**
71     * Map from process ID to discoverer.
72     */
73    private final Map<URI, ProcessDiscoverer> discoverers
74      = new ConcurrentHashMap<URI, ProcessDiscoverer>();
75  
76    /**
77     * Map from process ID to prioritiser.
78     */
79    private final Map<URI, Prioritiser> prioritisers
80      = new ConcurrentHashMap<URI, Prioritiser>();
81  
82    /**
83     * Map from process ID to selector.
84     */
85    private final Map<URI, ProcessSelector> selectors
86      = new ConcurrentHashMap<URI, ProcessSelector>();
87  
88    /**
89     * The STS Module.
90     */
91    private STSModule stsmod = null;
92  
93    /**
94     * Map from process ID to process state.
95     */
96    private final Map<String, Serializable> attribs
97      = new ConcurrentHashMap<String, Serializable>();
98  
99    /**
100    * Map from process ID to process listeners.
101    */
102   private final Map<URI, Collection<POMListener>> listeners
103     = new ConcurrentHashMap<URI, Collection<POMListener>>();
104 
105   /**
106    * Lock used to implement control operations.
107    */
108   private final Lock lock = new ReentrantLock();
109 
110   /**
111    * Workflow execution state.
112    */
113   private State state = State.RUNNING;
114 
115   /**
116    * Creates a new instance.
117    */
118   public ProcessContext() {
119   }
120 
121   /**
122    * {@inheritDoc}
123    */
124   public void cancel() throws ProcessException, InterruptedException {
125     lock.lock();
126     try {
127       if (state != State.CANCELLED && state != State.FINISHED) {
128         state = State.CANCELLED;
129       }
130     } finally {
131       lock.unlock();
132     }
133   }
134 
135   /**
136    * {@inheritDoc}
137    */
138   public State getState() {
139     lock.lock();
140     try {
141       return this.state;
142     } finally {
143       lock.unlock();
144     }
145   }
146 
147   /**
148    * {@inheritDoc}
149    */
150   public void pause() throws ProcessException, InterruptedException {
151     lock.lock();
152     try {
153       if (this.state == State.RUNNING) {
154         this.state = State.PAUSED;
155       }
156     } finally {
157       lock.unlock();
158     }
159   }
160 
161   /**
162    * {@inheritDoc}
163    */
164   public void resume() {
165     lock.lock();
166     try {
167       if (this.state == State.PAUSED) {
168         this.state = State.RUNNING;
169       }
170     } finally {
171       lock.unlock();
172     }
173   }
174 
175   /**
176    * {@inheritDoc}
177    */
178   public void start() {
179   }
180 
181   /**
182    * {@inheritDoc}
183    */
184   public void join() {
185   }
186 
187   /**
188    * {@inheritDoc}
189    */
190   public void run() {
191   }
192 
193   /**
194    * {@inheritDoc}
195    */
196   public ProcessDiscoverer getDiscovererFor(URI id) throws DiscoveryException {
197     ProcessDiscoverer disc = null;
198     if (id != null) {
199       disc = discoverers.get(id);
200     }
201     if (disc == null) {
202       disc = discoverers.get(NULL_ID);
203     }
204     return disc;
205   }
206 
207   /**
208    * {@inheritDoc}
209    */
210   public List<ProcessDiscoverer> getDiscoverers() {
211     return new ArrayList<ProcessDiscoverer>(discoverers.values());
212   }
213 
214   /**
215    * {@inheritDoc}
216    */
217   public Prioritiser getPrioritiserFor(URI id) {
218     Prioritiser p = null;
219     if (id != null) {
220       p = prioritisers.get(id);
221     }
222     if (p == null) {
223       p = prioritisers.get(NULL_ID);
224     }
225     return p;
226   }
227 
228   /**
229    * {@inheritDoc}
230    */
231   public List<Prioritiser> getPrioritisers() {
232     return new ArrayList<Prioritiser>(prioritisers.values());
233   }
234 
235   /**
236    * {@inheritDoc}
237    */
238   public ProcessSelector getSelectorFor(URI id) {
239     ProcessSelector ps = null;
240     if (id != null) {
241       ps = selectors.get(id);
242     }
243     if (ps == null) {
244       ps = selectors.get(NULL_ID);
245     }
246     return ps;
247   }
248 
249   /**
250    * {@inheritDoc}
251    */
252   public List<ProcessSelector> getSelectors() {
253     return new ArrayList<ProcessSelector>(selectors.values());
254   }
255 
256   /**
257    * Sets the discoverer associated to a given URI.
258    *
259    * @param id          A process id
260    * @param discoverer  A discoverer instance
261    */
262   public void setDiscovererFor(URI id, ProcessDiscoverer discoverer) {
263     final URI key = (id == null) ? NULL_ID : id;
264     if (discoverer == null) {
265       discoverers.remove(key);
266     } else {
267       discoverers.put(key, discoverer);
268     }
269   }
270 
271   /**
272    * Sets the discoverer instance associated to all processes without
273    * an explicit one.
274    *
275    * @param discoverer A discoverer instance
276    */
277   public void setDefaultDiscoverer(ProcessDiscoverer discoverer) {
278     setDiscovererFor(NULL_ID, discoverer);
279   }
280 
281   /**
282    * Sets the prioritiser associated to a given URI.
283    *
284    * @param id           A process id
285    * @param prioritiser  A prioritiser instance
286    */
287   public void setPrioritiser(URI id, Prioritiser prioritiser) {
288     final URI key = (id == null) ? NULL_ID : id;
289     if (prioritiser == null) {
290       prioritisers.remove(key);
291     } else {
292       prioritisers.put(key, prioritiser);
293     }
294   }
295 
296   /**
297    * Sets the prioritiser instance associated to all processes without
298    * an explicit one.
299    *
300    * @param prioritiser A prioritiser instance
301    */
302   public void setDefaultPrioritiser(Prioritiser prioritiser) {
303     setPrioritiser(NULL_ID, prioritiser);
304   }
305 
306   /**
307    * Sets the selector associated to a given URI.
308    *
309    * @param id        A process id
310    * @param selector  A selector instance
311    */
312   public void setSelectorFor(URI id, ProcessSelector selector) {
313     final URI key = (id == null) ? NULL_ID : id;
314     if (selector == null) {
315       selectors.remove(key);
316     } else {
317       selectors.put(key, selector);
318     }
319   }
320 
321   /**
322    * Sets the selector instance associated to all processes without
323    * an explicit one.
324    *
325    * @param selector A selector instance
326    */
327   public void setDefaultSelector(ProcessSelector selector) {
328     setSelectorFor(NULL_ID, selector);
329   }
330 
331   /**
332    * {@inheritDoc}
333    */
334   public STSModule getSTSModule() {
335     return this.stsmod;
336   }
337 
338   /**
339    * {@inheritDoc}
340    */
341   public void setSTSModule(STSModule stsmod) {
342     this.stsmod = stsmod;
343   }
344 
345   /**
346    * Sets a component that could potentially implement several interfaces.
347    * <p>
348    * This method will register the object to all relevant interfaces it
349    * implements.
350    *
351    * @param id  The process URI to associate the component to
352    * @param obj The component instance
353    */
354   public void setComponentFor(URI id, Object obj) {
355     if (obj != null) {
356       boolean isComponent = false;
357       if (obj instanceof STSModule) {
358         setSTSModule((STSModule)obj);
359         isComponent = true;
360       }
361 
362       if (obj instanceof ProcessSelector) {
363         setSelectorFor(id, (ProcessSelector)obj);
364         isComponent = true;
365       }
366 
367       if (obj instanceof ProcessDiscoverer) {
368         setDiscovererFor(id, (ProcessDiscoverer)obj);
369         isComponent = true;
370       }
371 
372       if (obj instanceof Prioritiser) {
373         setPrioritiser(id, (Prioritiser)obj);
374         isComponent = true;
375       }
376 
377       if (obj instanceof POMListener) {
378         addListenersFor(id, (POMListener)obj);
379         isComponent = true;
380       }
381 
382       if (!isComponent) {
383         ENACTOR_LOG.info("Object '" + obj + "' is not a component");
384       }
385     }
386   }
387 
388   /**
389    * {@inheritDoc}
390    */
391   public Serializable getAttribute(String id) {
392     return attribs.get(id);
393   }
394 
395   /**
396    * {@inheritDoc}
397    */
398   public void setAttribute(String id, Serializable value) {
399     attribs.put(id, value);
400   }
401 
402   /**
403    * {@inheritDoc}
404    */
405   public Map<String, Serializable> getAttributes() {
406     return Collections.unmodifiableMap(attribs);
407   }
408 
409   /**
410    * Adds one or more listeners to a given process.
411    *
412    * @param id        The id of the Process to attach these listeners to
413    * @param listeners The listeners to attach
414    */
415   public void addListenersFor(URI id, POMListener... listeners) {
416     Collection<POMListener> list = this.listeners.get(id);
417     if (list == null) {
418       list = new HashSet<POMListener>();
419       this.listeners.put(id, list);
420     }
421 
422     for (POMListener l : listeners) {
423       list.add(l);
424     }
425   }
426 
427   /**
428    * Removes one or more listeners from the attachment map.
429    *
430    * @param id        The id of the Process to detach these listeners from
431    * @param listeners The listeners to detach
432    */
433   public void removeListenersFrom(URI id, POMListener... listeners) {
434     Collection<POMListener> list = this.listeners.get(id);
435     if (list != null) {
436       for (POMListener l : listeners) {
437         list.remove(l);
438       }
439     }
440   }
441 
442   /**
443    * Clears all listeners registered in this environment instance.
444    */
445   public void clearListeners() {
446     this.listeners.clear();
447   }
448 
449   /**
450    * Clears all listeners registered for a given Process in this environment
451    * instance.
452    *
453    * @param id The id of the Process
454    */
455   public void clearListenersFor(URI id) {
456     this.listeners.remove(id);
457   }
458 
459   /**
460    * {@inheritDoc}
461    */
462   public Collection<POMListener> getListenersFor(URI id) {
463     Collection<POMListener> col = this.listeners.get(id);
464     if (col == null) {
465       return java.util.Collections.emptySet();
466     } else {
467       return new HashSet<POMListener>(col);
468     }
469   }
470 
471   // ========================================================================
472   // Helper methods to be used by Process implementations
473   // ========================================================================
474 
475   /**
476    * Waits until the workflow is in RUNNING state.
477    * <p>
478    * As a side effect, it adds the caller thread to an internal set, which
479    * is used for detecting the end of workflow executions.
480    *
481    * @return <tt>true</tt>  if the workflow is still active;
482    *         <tt>false</tt> if it has been cancelled
483    */
484   boolean isRunning() {
485     lock.lock();
486     try {
487       return state == State.RUNNING;
488     } finally {
489       lock.unlock();
490     }
491   }
492 
493   /**
494    * Creates a Thread that executes an Enactable instance taking care
495    * of exception handling.
496    *
497    * @param enactable The instance to execute
498    * @return A Thread wrapping the instance
499    */
500   public EnactionWorker createWorkerThread(Enactable enactable) {
501     return new EnactionWorker(this, enactable);
502   }
503 
504 }