1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.gridsystems.nextgrid.api.pom;
19
20 import java.util.BitSet;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25
26 import nextgrid.api.pem.ProcessEvent;
27 import nextgrid.api.pem.ProcessListener;
28 import nextgrid.api.pom.Process;
29 import nextgrid.api.pom.ProcessController;
30 import nextgrid.api.pom.ProcessException;
31 import nextgrid.api.pom.Reference;
32 import nextgrid.api.pom.UnorderedListProcess;
33
34
35
36
37
38
39
40 public final class UnorderedListProcessImpl extends ControlProcessImpl
41 implements UnorderedListProcess {
42
43
44
45
46 private static final long serialVersionUID = -8766837087826444925L;
47
48
49
50
51 private static final long WAIT_TIMEOUT = 200L;
52
53
54
55
56 private BitSet enacted = new BitSet();
57
58
59
60
61 private ProcessFinishedNotifier notifier = new ProcessFinishedNotifier();
62
63
64
65
66 public UnorderedListProcessImpl() {
67 super(-1);
68 }
69
70
71
72
73 @Override
74 protected void doValidate(ValidationType when) throws ProcessException {
75
76 }
77
78
79
80
81 public void run(ProcessContext ctx) throws ProcessException, InterruptedException {
82 fireProcessStarted();
83 waitForInputs();
84
85 Process[] children = getChildren();
86 int count = (children == null) ? 0 : children.length;
87
88 while (this.enacted.cardinality() < count) {
89 if (!ctx.isRunning()) {
90 return;
91 }
92
93 int index = selectChildToEnact();
94 while (index == -1 && ctx.isRunning()) {
95 if (waitForProcessFinalization(ctx)) {
96 index = selectChildToEnact();
97 if (ENACTOR_LOG.isDebugEnabled()) {
98 ENACTOR_LOG.debug("Child at position " + index + " unlocked");
99 }
100 }
101 }
102
103 if (!ctx.isRunning()) {
104 return;
105 }
106
107 ProcessController controller = children[index].enact(ctx);
108 controller.run();
109 this.enacted.set(index);
110 }
111
112 if (ctx.isRunning()) {
113 fireProcessFinished();
114 }
115
116 }
117
118
119
120
121
122
123
124
125
126 private boolean waitForProcessFinalization(ProcessContext ctx)
127 throws InterruptedException {
128 this.findRoot().addListener(this.notifier);
129
130 while (ctx.isRunning()) {
131 if (this.notifier.getCondition().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
132 return true;
133 }
134 }
135 return false;
136 }
137
138
139
140
141 @Override protected void doReset() {
142 this.enacted.clear();
143 }
144
145
146
147
148 @Override public String toString() {
149 return "UnorderedListProcess#" + this.getId();
150 }
151
152
153
154
155
156
157
158
159
160
161 private int selectChildToEnact() {
162 Process[] children = getChildren();
163 int count = (children == null) ? 0 : children.length;
164
165 for (int i = 0; i < count; i++) {
166 if (!this.enacted.get(i)) {
167 if (!isLocked(children[i])) {
168 return i;
169 }
170 }
171 }
172
173 return -1;
174 }
175
176
177
178
179
180
181
182
183 private boolean isLocked(Process p) {
184 String[] names = p.getUsedInputNames();
185 for (String name : names) {
186 Reference<?> ref = p.getInput(name);
187 if (!ref.isAvailable()) {
188 return true;
189 }
190 }
191 return false;
192 }
193
194
195
196
197
198
199 static class ProcessFinishedNotifier implements ProcessListener {
200
201
202
203
204 private static final long serialVersionUID = -4488491121616206883L;
205
206
207
208
209 private Lock lock = new ReentrantLock();
210
211
212
213
214 private Condition cond = lock.newCondition();
215
216
217
218
219
220
221 public Condition getCondition() {
222 return this.cond;
223 }
224
225
226
227
228 public void processEvaluated(ProcessEvent event) {
229 }
230
231
232
233
234 public void processFinished(ProcessEvent event) {
235 this.cond.signalAll();
236 }
237
238
239
240
241 public void processFailed(ProcessEvent event) {
242 }
243
244
245
246
247 public void processSelected(ProcessEvent event) {
248 }
249
250
251
252
253 public void processStarted(ProcessEvent event) {
254 }
255
256
257
258
259 public boolean isListeningToDescendants() {
260 return true;
261 }
262
263 }
264 }