1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package com.gridsystems.nextgrid.api.pom.ref;
19
20 import java.util.Collections;
21 import java.util.HashSet;
22 import java.util.Set;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReadWriteLock;
26 import java.util.concurrent.locks.ReentrantLock;
27 import java.util.concurrent.locks.ReentrantReadWriteLock;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31
32 import nextgrid.api.pom.Process;
33 import nextgrid.api.pom.Reference;
34
35
36
37
38
39
40
41 public abstract class ReferenceSupport<T> implements Reference<T> {
42
43
44
45
46 private static final long serialVersionUID = 4543296708806798851L;
47
48
49
50
51 protected static final Log LOG = LogFactory.getLog("ENACTOR");
52
53
54
55
56 private Set<Process> readers;
57
58
59
60
61 private Set<Process> writers;
62
63
64
65
66 private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
67
68
69
70
71 private Lock availLock = new ReentrantLock(true);
72
73
74
75
76 private Condition cAvailable = availLock.newCondition();
77
78
79
80
81 boolean available;
82
83
84
85
86
87 public ReferenceSupport() {
88 this(new HashSet<Process>(), new HashSet<Process>());
89 }
90
91
92
93
94
95
96
97
98 public ReferenceSupport(Set<Process> readers,
99 Set<Process> writers) {
100
101 this.readers = readers;
102 this.writers = writers;
103 }
104
105
106
107
108 public final void clearLinks() {
109 this.readers.clear();
110 this.writers.clear();
111 }
112
113
114
115
116 public final void dispose() {
117 }
118
119
120
121
122 public final void reset() {
123 this.setValue(null);
124 this.setAvailable(false);
125 }
126
127
128
129
130 public final void addReader(Process reader) {
131 this.readers.add(reader);
132 }
133
134
135
136
137 public final void addWriter(Process writer) {
138 this.writers.add(writer);
139 }
140
141
142
143
144 public final Set<Process> getReaders() {
145 return Collections.unmodifiableSet(this.readers);
146 }
147
148
149
150
151 public final Set<Process> getWriters() {
152 return Collections.unmodifiableSet(this.writers);
153 }
154
155
156
157
158 public final void removeReader(Process reader) {
159 this.readers.remove(reader);
160 }
161
162
163
164
165 public final void removeWriter(Process writer) {
166 this.writers.remove(writers);
167 }
168
169
170
171
172 public Lock readLock() {
173 return rwLock.readLock();
174 }
175
176
177
178
179 public Lock writeLock() {
180 return rwLock.writeLock();
181 }
182
183
184
185
186 public boolean isAvailable() {
187 availLock.lock();
188 try {
189 return this.available;
190 } finally {
191 availLock.unlock();
192 }
193 }
194
195
196
197
198 public void setAvailable(boolean available) {
199 availLock.lock();
200 try {
201 this.available = available;
202 if (available) {
203 LOG.debug("Signaling " + this);
204 cAvailable.signalAll();
205 }
206 } finally {
207 availLock.unlock();
208 }
209 }
210
211
212
213
214 public void waitAvailable() throws InterruptedException {
215 availLock.lock();
216 try {
217 if (!this.isAvailable()) {
218 LOG.debug("Waiting for " + this);
219 cAvailable.await();
220 }
221 } finally {
222 availLock.unlock();
223 }
224 }
225
226
227
228
229 @SuppressWarnings("unchecked")
230 public ReferenceSupport<T> copy() {
231 try {
232 ReferenceSupport<T> copy = (ReferenceSupport<T>)clone();
233
234 copy.rwLock = new ReentrantReadWriteLock(true);
235 copy.availLock = new ReentrantLock(true);
236 copy.cAvailable = copy.availLock.newCondition();
237
238 copy.readers = new HashSet<Process>(this.readers);
239 copy.writers = new HashSet<Process>(this.writers);
240
241 return copy;
242 } catch (CloneNotSupportedException e) {
243 throw new InternalError("Unexpected CloneNotSupported exception");
244 }
245 }
246 }