View Javadoc

/*
 Copyright (C) 2006 Grid Systems, S.A.

 This library is free software; you can redistribute it and/or
 modify it under the terms of the GNU Lesser General Public
 License as published by the Free Software Foundation; either
 version 2.1 of the License, or (at your option) any later version.

 This library is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 Lesser General Public License for more details.

 You should have received a copy of the GNU Lesser General Public
 License along with this library; if not, write to the Free Software
 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
*/
18  package com.gridsystems.nextgrid.api.pom;
19  
20  import java.util.BitSet;
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.locks.Condition;
23  import java.util.concurrent.locks.Lock;
24  import java.util.concurrent.locks.ReentrantLock;
25  
26  import nextgrid.api.pem.ProcessEvent;
27  import nextgrid.api.pem.ProcessListener;
28  import nextgrid.api.pom.Process;
29  import nextgrid.api.pom.ProcessController;
30  import nextgrid.api.pom.ProcessException;
31  import nextgrid.api.pom.Reference;
32  import nextgrid.api.pom.UnorderedListProcess;
33  
34  /**
35   * Control process that executes its children in a non-deterministic order,
36   * but never concurrently.
37   *
38   * @author Rodrigo Ruiz
39   */
40  public final class UnorderedListProcessImpl extends ControlProcessImpl
41    implements UnorderedListProcess {
42  
43    /**
44     * <code>serialVersionUID</code> attribute.
45     */
46    private static final long serialVersionUID = -8766837087826444925L;
47  
48    /**
49     * Timeout used for process notification wait.
50     */
51    private static final long WAIT_TIMEOUT = 200L;
52  
53    /**
54     * Flag set for knowing which child processes have already been enacted.
55     */
56    private BitSet enacted = new BitSet();
57  
58    /**
59     * Used for "process finished" notification.
60     */
61    private ProcessFinishedNotifier notifier = new ProcessFinishedNotifier();
62  
63    /**
64     * Creates a new instance.
65     */
66    public UnorderedListProcessImpl() {
67      super(-1);
68    }
69  
70    /**
71     * {@inheritDoc}
72     */
73    @Override
74    protected void doValidate(ValidationType when) throws ProcessException {
75      // No extra validation required here
76    }
77  
78    /**
79     * {@inheritDoc}
80     */
81    public void run(ProcessContext ctx) throws ProcessException, InterruptedException {
82      fireProcessStarted();
83      waitForInputs();
84  
85      Process[] children = getChildren();
86      int count = (children == null) ? 0 : children.length;
87  
88      while (this.enacted.cardinality() < count) {
89        if (!ctx.isRunning()) {
90          return;
91        }
92  
93        int index = selectChildToEnact();
94        while (index == -1 && ctx.isRunning()) {
95          if (waitForProcessFinalization(ctx)) {
96            index = selectChildToEnact();
97            if (ENACTOR_LOG.isDebugEnabled()) {
98              ENACTOR_LOG.debug("Child at position " + index + " unlocked");
99            }
100         }
101       }
102 
103       if (!ctx.isRunning()) {
104         return;
105       }
106 
107       ProcessController controller = children[index].enact(ctx);
108       controller.run();
109       this.enacted.set(index);
110     }
111 
112     if (ctx.isRunning()) {
113       fireProcessFinished();
114     }
115 
116   }
117 
118   /**
119    * Waits until a process in the same workflow as this instance has finished,
120    * or the enaction is cancelled / paused.
121    *
122    * @param ctx The process context
123    * @return <tt>true</tt> if a process has finished its execution
124    * @throws InterruptedException If the thread has been interrupted
125    */
126   private boolean waitForProcessFinalization(ProcessContext ctx)
127     throws InterruptedException {
128     this.findRoot().addListener(this.notifier);
129 
130     while (ctx.isRunning()) {
131       if (this.notifier.getCondition().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
132         return true;
133       }
134     }
135     return false;
136   }
137 
138   /**
139    * {@inheritDoc}
140    */
141   @Override protected void doReset() {
142     this.enacted.clear();
143   }
144 
145   /**
146    * {@inheritDoc}
147    */
148   @Override public String toString() {
149     return "UnorderedListProcess#" + this.getId();
150   }
151 
152   /**
153    * Searches a process still not enacted for which all its inputs are
154    * available.
155    * <p>
156    * If none is found, it blocks until the process is cancelled, or
157    * a child process is "unlocked".
158    *
159    * @return The index of the child process found, or -1 if none
160    */
161   private int selectChildToEnact() {
162     Process[] children = getChildren();
163     int count = (children == null) ? 0 : children.length;
164 
165     for (int i = 0; i < count; i++) {
166       if (!this.enacted.get(i)) {
167         if (!isLocked(children[i])) {
168           return i;
169         }
170       }
171     }
172 
173     return -1;
174   }
175 
176   /**
177    * Gets if the passed process is "locked". A process is locked if at least
178    * one of its used inputs is not available.
179    *
180    * @param p The process to test
181    * @return <tt>true</tt> if one of its used inputs is not available
182    */
183   private boolean isLocked(Process p) {
184     String[] names = p.getUsedInputNames();
185     for (String name : names) {
186       Reference<?> ref = p.getInput(name);
187       if (!ref.isAvailable()) {
188         return true;
189       }
190     }
191     return false;
192   }
193 
194   /**
195    * Notifies process finalisation events through a Condition.
196    *
197    * @author Rodrigo Ruiz
198    */
199   static class ProcessFinishedNotifier implements ProcessListener {
200 
201     /**
202      * Serial Version UID.
203      */
204     private static final long serialVersionUID = -4488491121616206883L;
205 
206     /**
207      * Lock used for creating the condition.
208      */
209     private Lock lock = new ReentrantLock();
210 
211     /**
212      * Managed condition.
213      */
214     private Condition cond = lock.newCondition();
215 
216     /**
217      * Gets the condition managed by this instance.
218      *
219      * @return The managed condition
220      */
221     public Condition getCondition() {
222       return this.cond;
223     }
224 
225     /**
226      * {@inheritDoc}
227      */
228     public void processEvaluated(ProcessEvent event) {
229     }
230 
231     /**
232      * {@inheritDoc}
233      */
234     public void processFinished(ProcessEvent event) {
235       this.cond.signalAll();
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     public void processFailed(ProcessEvent event) {
242     }
243 
244     /**
245      * {@inheritDoc}
246      */
247     public void processSelected(ProcessEvent event) {
248     }
249 
250     /**
251      * {@inheritDoc}
252      */
253     public void processStarted(ProcessEvent event) {
254     }
255 
256     /**
257      * {@inheritDoc}
258      */
259     public boolean isListeningToDescendants() {
260       return true;
261     }
262 
263   }
264 }