1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package uk.ac.rdg.resc.jstyx.server;
30
31 import org.apache.mina.common.ByteBuffer;
32
33 import java.util.Hashtable;
34 import java.util.Vector;
35 import java.util.Enumeration;
36 import java.util.Iterator;
37
38 import uk.ac.rdg.resc.jstyx.StyxException;
39 import uk.ac.rdg.resc.jstyx.types.ULong;
40 import uk.ac.rdg.resc.jstyx.server.StyxSessionState;
41 import uk.ac.rdg.resc.jstyx.messages.RerrorMessage;
42
43 /***
44 * A wrapper for a StyxFile that implements asynchronous behaviour: the first
45 * read from a given client returns the file's data as normal. If the file is not
46 * closed, subsequent reads from the same client block until the file content is
47 * changed.
48 *
49 * @author Jon Blower
50 * $Revision: 609 $
51 * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
52 * $Log$
53 * Revision 1.13 2005/11/14 21:32:30 jonblower
54 * Got SGSRun working for SC2005 demo
55 *
56 * Revision 1.12 2005/11/04 19:33:41 jonblower
57 * Changed contentsChanged() to fileContentsChanged() in StyxFileChangeListener
58 *
59 * Revision 1.11 2005/11/04 09:12:05 jonblower
60 * Made baseFile a protected field (instead of private)
61 *
62 * Revision 1.10 2005/09/08 07:08:59 jonblower
63 * Removed "String user" from list of parameters to StyxFile.write()
64 *
65 * Revision 1.9 2005/08/10 18:35:28 jonblower
66 * Added simple test main() function
67 *
68 * Revision 1.8 2005/06/20 07:17:35 jonblower
69 * Wrapped SGSParamFile as AsyncStyxFile
70 *
71 * Revision 1.7 2005/05/11 10:33:50 jonblower
72 * Implemented MonitoredFileOnDisk.java
73 *
74 * Revision 1.6 2005/05/10 08:02:18 jonblower
75 * Changes related to implementing MonitoredFileOnDisk
76 *
77 * Revision 1.5 2005/05/09 07:10:37 jonblower
78 * Minor changes
79 *
80 * Revision 1.4 2005/03/24 09:48:31 jonblower
81 * Changed 'count' from long to int throughout for reading and writing
82 *
83 * Revision 1.3 2005/03/16 17:56:23 jonblower
84 * Replaced use of java.nio.ByteBuffer with MINA's ByteBuffer to minimise copying of buffers
85 *
86 * Revision 1.2 2005/03/11 14:02:16 jonblower
87 * Merged MINA-Test_20059309 into main line of development
88 *
89 * Revision 1.1.1.1.2.1 2005/03/09 19:44:18 jonblower
90 * Changes concerned with migration to MINA
91 *
92 * Revision 1.1.1.1 2005/02/16 18:58:31 jonblower
93 * Initial import
94 *
95 */
96 public class AsyncStyxFile extends StyxFile implements StyxFileChangeListener
97 {
98
99 protected StyxFile baseFile;
100 private Hashtable knownClients;
101 private Vector clientQueue;
102 private long minReplyInterval;
103
104
105
106 /***
107 * Creates a new AsyncStyxFile with the same name as the underlying file.
108 * (Note: this must be placed in a different directory from that of the
109 * underlying file to prevent name conflicts.
110 */
111 public AsyncStyxFile(StyxFile file) throws StyxException
112 {
113 this(file, file.getName());
114 }
115
116 public AsyncStyxFile(StyxFile file, String name) throws StyxException
117 {
118 this(file, name, 0666);
119 }
120
121 public AsyncStyxFile(StyxFile file, String name, int permissions) throws StyxException
122 {
123 this(file, name, permissions, false, false);
124 }
125
126 public AsyncStyxFile(StyxFile file, String name, int permissions,
127 boolean isAppendOnly, boolean isExclusive)
128 throws StyxException
129 {
130 super(name, permissions, isAppendOnly, isExclusive);
131 this.baseFile = file;
132 this.knownClients = new Hashtable();
133 this.clientQueue = new Vector();
134 this.minReplyInterval = 0;
135
136 this.baseFile.addChangeListener(this);
137 }
138
139 /***
140 * Sets a minimum reply interval, i.e. the minimum amount of time that must
141 * elapse between successive replies to the same client. This is most useful
142 * when using this AsyncStyxFile to provide notification of a very
143 * rapidly-changing quantity as it can save a lot of unnecessary messages
144 * being sent. (This is set to zero when the AsyncStyxFile is first created)
145 */
146 public void setMinReplyInterval(float seconds)
147 {
148 this.minReplyInterval = new Float(seconds * 1000).longValue();
149 }
150
151 public synchronized void read(StyxFileClient client, long offset,
152 int count, int tag) throws StyxException
153 {
154
155 ClientInfo cinfo = (ClientInfo)this.knownClients.get(client);
156 if (cinfo == null)
157 {
158
159 this.replyClient(cinfo, client, offset, count, tag);
160 }
161 else
162 {
163 long now = System.currentTimeMillis();
164
165
166
167 if (offset > cinfo.offset)
168 {
169 this.replyClient(cinfo, client, offset, count, tag);
170 }
171 else
172 {
173
174
175
176 long timeSinceLastReply = now - cinfo.timeLastReply;
177 if (cinfo.versionLastRead != this.getVersion()
178 && timeSinceLastReply >= this.minReplyInterval)
179 {
180 this.replyClient(cinfo, client, offset, count, tag);
181 }
182 else
183 {
184
185
186
187
188 cinfo.offset = offset;
189 cinfo.count = count;
190 cinfo.tag = tag;
191 this.clientQueue.add(cinfo);
192 }
193 }
194 }
195 }
196
197 /***
198 * This simply calls write() in the contained StyxFile object. This will
199 * cause the contentsChanged() method to be called, notifying any waiting
200 * clients of the change to the file
201 */
202 public synchronized void write(StyxFileClient client, long offset,
203 int count, ByteBuffer data, boolean truncate, int tag)
204 throws StyxException
205 {
206 this.baseFile.write(client, offset, count, data, truncate, tag);
207 }
208
209 /***
210 * Called to notify that the underlying data have changed.
211 * @param force If this is true, clients will be notified of changes to
212 * the underlying data, irrespective of how long they have waited.
213 */
214 public void fileContentsChanged(boolean force)
215 {
216 this.incrementVersion();
217
218 synchronized (this.clientQueue)
219 {
220 Iterator it = this.clientQueue.iterator();
221 while(it.hasNext())
222 {
223 ClientInfo cinfo = (ClientInfo)it.next();
224
225
226 long now = System.currentTimeMillis();
227 long timeSinceLastReply = now - cinfo.timeLastReply;
228 if (force || timeSinceLastReply > this.minReplyInterval)
229 {
230 try
231 {
232 this.replyClient(cinfo, cinfo.client, cinfo.offset, cinfo.count, cinfo.tag);
233 }
234 catch(StyxException se)
235 {
236
237
238 StyxServerProtocolHandler.reply(cinfo.client.getSession(),
239 new RerrorMessage(se.getMessage()), cinfo.tag);
240 }
241 it.remove();
242 }
243 }
244 }
245
246 }
247
248 /***
249 * Gets the StyxFile that is wrapped by this AsyncStyxFile
250 */
251 public StyxFile getBaseFile()
252 {
253 return this.baseFile;
254 }
255
256 /***
257 * @return the length of the StyxFile that is wrapped by this AsyncStyxFile
258 */
259 public ULong getLength()
260 {
261 return this.baseFile.getLength();
262 }
263
264 /***
265 * This is called when the contents of the underlying StyxFile are changed
266 * (required by StyxFileChangeListener interface).
267 */
268 public void fileContentsChanged()
269 {
270 this.fileContentsChanged(false);
271 }
272
273 /***
274 * Free all resources associated with this file
275 */
276 public synchronized void delete()
277 {
278 super.delete();
279 this.knownClients.clear();
280 this.clientQueue.clear();
281 }
282
283 /***
284 * Called when a client disconnects from the file. We remove all references
285 * to this client
286 */
287 protected synchronized void clientDisconnected(StyxFileClient client)
288 {
289 this.knownClients.remove(client);
290 synchronized (this.clientQueue)
291 {
292 Enumeration en = this.clientQueue.elements();
293 while(en.hasMoreElements())
294 {
295 ClientInfo cinfo = (ClientInfo)en.nextElement();
296
297
298 if (client == cinfo.client)
299 {
300 this.knownClients.remove(cinfo);
301 }
302 }
303 }
304 }
305
306 /***
307 * Replies to a client and sets the fields of the ClientInfo
308 * @throws StyxException if there was an error reading the underlying StyxFile
309 */
310 private void replyClient(ClientInfo cinfo, StyxFileClient client, long offset,
311 int count, int tag) throws StyxException
312 {
313 long now = System.currentTimeMillis();
314 if (cinfo == null)
315 {
316 cinfo = new ClientInfo(client, tag, offset, count, this.getVersion(), now);
317 this.knownClients.put(client, cinfo);
318 }
319 cinfo.offset = offset;
320 cinfo.count = count;
321 cinfo.versionLastRead = this.getVersion();
322 cinfo.timeLastReply = now;
323 cinfo.tag = tag;
324 this.baseFile.read(client, offset, count, tag);
325 }
326
327 /***
328 * Class representing a client that is waiting for a reply from an
329 * asynchronous read/write
330 */
331 private static class ClientInfo
332 {
333 private StyxFileClient client;
334 private int tag;
335 private long offset;
336 private int count;
337 private long versionLastRead;
338 private long timeLastReply;
339
340 private ClientInfo(StyxFileClient client, int tag, long offset, int count,
341 long versionLastRead, long timeLastReply)
342 {
343 this.client = client;
344 this.tag = tag;
345 this.offset = offset;
346 this.count = count;
347 this.versionLastRead = versionLastRead;
348 this.timeLastReply = timeLastReply;
349 }
350 }
351
352 /***
353 * Simple test function
354 */
355 public static void main(String[] args) throws Exception
356 {
357 AsyncStyxFile asf = new AsyncStyxFile(new InMemoryFile("test"));
358 StyxDirectory root = new StyxDirectory("/");
359 root.addChild(asf);
360 new StyxServer(2911, root).start();
361 }
362
363 }