1 /*
2 * Copyright (c) 2005 The University of Reading
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. Neither the name of the University of Reading, nor the names of the
14 * authors or contributors may be used to endorse or promote products
15 * derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29 package uk.ac.rdg.resc.jstyx.gridservice.server;
30
31 import java.io.InputStream;
32 import java.io.RandomAccessFile;
33 import java.io.FileNotFoundException;
34 import java.io.IOException;
35 import java.io.File;
36
37 import java.util.Vector;
38 import java.util.Iterator;
39
40 import org.apache.log4j.Logger;
41
42 /***
43 * Abstract class that reads from an InputStream in a separate thread, caching
44 * the results to the local hard disk. Permits multiple clients to make
45 * DataRequests for data, replying with chunks of data when they are ready.
46 *
47 * @author Jon Blower
48 * $Revision: 507 $
49 * $Date: 2005-12-01 08:21:56 +0000 (Thu, 01 Dec 2005) $
50 * $Log$
51 * Revision 1.5 2005/12/01 08:21:56 jonblower
52 * Fixed javadoc comments
53 *
54 * Revision 1.4 2005/09/01 07:49:39 jonblower
55 * Set to print stack trace of IOException whether or not logger is debug-enabled
56 *
57 * Revision 1.3 2005/06/10 07:54:49 jonblower
58 * Added code to convert event-based StreamViewer to InputStream-based one
59 *
60 * Revision 1.2 2005/05/27 21:22:39 jonblower
61 * Further development of caching stream readers
62 *
63 * Revision 1.1 2005/05/27 17:02:59 jonblower
64 * Initial import
65 *
66 */
67 public abstract class GeneralCachingStreamReader
68 {
69 private static final Logger log = Logger.getLogger(GeneralCachingStreamReader.class);
70
71 private InputStream is; // The input stream from which we will read
72 private RandomAccessFile cache; // The cache itself (a file on the local filesystem)
73 private Integer cacheLock = new Integer(0); // We use this for synchronization; it has no other purpose
74 private File cacheFile; // The cache file
75 private boolean started; // True if we have started reading from the stream
76 private Exception globEx; // If this is non-null an error has occurred and
77 // any attempt to read the stream will result
78 // in this message being returned until it is reset.
79 private Vector requestQueue; // Queue of DataRequests that have not yet been fulfilled
80 private long cacheLength; // The length of the cache (i.e. the size of the cache file)
81 // Note that we could also get this from cache.length() but
82 // this should be faster (saves a call to the OS)
83 private boolean eof; // This will be true when the stream has been completely read.
84 private boolean running; // This is true if there is an active thread reading from the input stream
85
86 public GeneralCachingStreamReader()
87 {
88 this.is = null;
89 this.requestQueue = new Vector();
90 this.running = false;
91 }
92
93 /***
94 * Gets the File that is being used to cache the contents of the stream
95 */
96 public File getCacheFile()
97 {
98 return this.cacheFile;
99 }
100
101 /***
102 * Gets the size of the cache in bytes
103 */
104 public long getCacheLength()
105 {
106 return this.cacheLength;
107 }
108
109 public void setCacheFile(File cacheFile) throws FileNotFoundException
110 {
111 this.cacheFile = cacheFile;
112 }
113
114 /***
115 * @return true if we have reached end of file
116 */
117 public boolean isEOF()
118 {
119 return this.eof;
120 }
121
122 /***
123 * Sets the input stream from which the CachingStreamReader gets data
124 * and immediately starts reading from this stream
125 * @throws IllegalStateException if this object is already reading from a stream
126 * @throws IOException if the cache file could not be created
127 */
128 public void startReading(InputStream is) throws IOException
129 {
130 if (this.running)
131 {
132 throw new IllegalStateException("This CachingStreamReader is already running");
133 }
134
135 if (this.cache != null)
136 {
137 // Delete any previous cache files
138 this.cache.close();
139 this.cacheFile.delete();
140 }
141
142 this.cache = new RandomAccessFile(this.cacheFile, "rw");
143 log.info("Created cache file " + this.cacheFile.getPath());
144
145 this.cacheLength = 0;
146 this.globEx = null;
147 this.is = is;
148 // Start reading from the stream immediately
149 new StreamGobbler().start();
150 }
151
152 /***
153 * Get data from the stream's cache.
154 */
155 public void read(DataRequest dr)
156 {
157 log.debug("Received request: offset = " + dr.offset + ", count = " + dr.count);
158 synchronized(this.cacheLock)
159 {
160 // Try to process the request immediately
161 boolean processed = this.processRequest(dr);
162 if (!processed)
163 {
164 // if we couldn't fulfil this request, add it to the queue
165 synchronized(this.requestQueue)
166 {
167 this.requestQueue.add(dr);
168 }
169 }
170 }
171 }
172
173 /***
174 * Processes all outstanding requests. This is called when more data arrive
175 * or when EOF is reached.
176 */
177 private void processOutstandingRequests()
178 {
179 // We should already have the lock on the cache, but let's make sure
180 synchronized(this.cacheLock)
181 {
182 synchronized(this.requestQueue)
183 {
184 for (int i = 0; i < this.requestQueue.size(); )
185 {
186 DataRequest dr = (DataRequest)this.requestQueue.get(i);
187 boolean processed = this.processRequest(dr);
188 if (processed)
189 {
190 // If we have processed the request successfully, remove
191 // it from the queue
192 this.requestQueue.remove(i);
193 }
194 else
195 {
196 i++;
197 }
198 }
199 }
200 }
201 }
202
203 /***
204 * @return true if the request was processed successfully, false otherwise
205 */
206 private boolean processRequest(DataRequest dr)
207 {
208 // We get the cache's lock so no data can be written to the cache
209 // before we're done.
210 synchronized(this.cacheLock)
211 {
212 try
213 {
214 if (log.isDebugEnabled())
215 {
216 log.debug("Processing request: offset = " + dr.offset +
217 ", count = " + dr.count + ": cacheLength = " +
218 this.cacheLength);
219 }
220 if (dr.offset >= this.cacheLength)
221 {
222 // We're asking for data not (yet) in the cache
223 if (this.eof)
224 {
225 // We've reached the end of the stream.
226 log.debug("Offset >= cache length and EOF reached."
227 + " Returning 0 bytes");
228 this.newData(dr, new byte[0], 0, 0);
229 return true;
230 }
231 else
232 {
233 // The requested data haven't yet arrived in the cache
234 if (this.globEx != null)
235 {
236 // An error has occurred so the data will never
237 // arrive.
238 log.debug("Got error message. Returning Rerror");
239 this.error(dr, this.globEx);
240 return true;
241 }
242 else
243 {
244 log.debug("Offset >= cache length but EOF not reached."
245 + " Holding request.");
246 return false;
247 }
248 }
249 }
250 else
251 {
252 // At least some of the requested data are in the cache
253 // Seek to the right position in the cache file
254 this.cache.seek(dr.offset);
255 // Try to read the requested amount of data
256 // TODO: check the number of bytes remaining?
257 byte[] arr = new byte[dr.count];
258 log.debug("Reading " + arr.length + " bytes from cache at offset "
259 + dr.offset);
260 int n = this.cache.read(arr);
261 log.debug("Actually read " + n + " bytes");
262 if (n < 0)
263 {
264 // We reached EOF - this shouldn't happen because we've already
265 // checked the file length
266 log.error("Internal error: no bytes read from stream");
267 return false;
268 }
269 else
270 {
271 // Return the bytes read to the client
272 log.debug("Returning " + n + " bytes to client");
273 this.newData(dr, arr, 0, n);
274 return true;
275 }
276 }
277 }
278 catch(IOException ioe)
279 {
280 // I think this exception is thrown sometimes. Look out for it.
281 ioe.printStackTrace();
282 // We have to reply with an error message here (as opposed to
283 // throwing a StyxException) because we're no longer within
284 // the readFile() method
285 this.error(dr, ioe);
286 return true; // We have processed the message
287 }
288 }
289 }
290
291 /***
292 * This thread continuously reads from the stream and puts data in the cache,
293 * notifying waiting clients of the new data.
294 */
295 private class StreamGobbler extends Thread
296 {
297 public void run()
298 {
299 running = true;
300 eof = false;
301 // Create a temporary 8K byte buffer
302 // TODO: is this size appropriate? Might be best to set it to the
303 // maximum payload size on the connection. Or, for services that output
304 // very little data, a much smaller size may be appropriate so that
305 // clients aren't kept waiting for long between updates
306 byte[] arr = new byte[8192];
307 try
308 {
309 while(!eof)
310 {
311 int n = is.read(arr);
312 log.debug("Read " + n + " bytes from input stream");
313 synchronized(cacheLock)
314 {
315 if (n < 0)
316 {
317 // We've reached the end of the stream
318 eof = true;
319 is.close();
320 }
321 else
322 {
323 // put the newly-read bytes to the end of the cache
324 cache.seek(cacheLength);
325 cache.write(arr, 0, n);
326 cacheLength += n;
327 // For some reason, the length() of the RandomAccessFile
328 // isn't always correct (particularly when first created)
329 // so we have to set it explicitly.
330 // TODO: could we just set this to zero when the
331 // RAF is first created? Will we notice?
332 cache.setLength(cacheLength);
333 }
334 // Now process any outstanding requests
335 processOutstandingRequests();
336 }
337 }
338 }
339 catch(Exception e)
340 {
341 if (log.isDebugEnabled())
342 {
343 e.printStackTrace();
344 }
345 // Set the global exception object so that any further
346 // attemps to read this stream return an error.
347 globEx = e;
348 }
349 // now we close the cache and open it read-only; this allows us to
350 // examine the contents of the cache without going through the
351 // Styx interface (useful for debugging)
352 synchronized(cache)
353 {
354 try
355 {
356 cache.close();
357 cache = new RandomAccessFile(cacheFile, "r");
358 }
359 catch(Exception e)
360 {
361 if (log.isDebugEnabled())
362 {
363 e.printStackTrace();
364 }
365 // Set the global error message
366 globEx = e;
367 }
368 }
369 running = false;
370 }
371 }
372
373 /***
374 * Class representing a client that is waiting for data
375 * @todo this is very similar to the ClientInfo class in AsyncStyxFile.
376 * Can we avoid this code repetition?
377 */
378 public static class DataRequest
379 {
380 // TODO create get() methods
381 public Object client;
382 public long offset; // The offset requested by the client
383 public int count; // number of bytes requested by the client
384
385 public DataRequest(Object client, long offset, int count)
386 {
387 this.client = client;
388 this.offset = offset;
389 this.count = count;
390 }
391 }
392
393 public abstract void newData(DataRequest originalRequest, byte[] data,
394 int offset, int count);
395
396 public abstract void error(DataRequest originalRequest, Exception e);
397
398 /***
399 * Called when the file is removed from the server. This deletes the
400 * underlying cache file.
401 */
402 public synchronized void delete()
403 {
404 if (this.cache != null)
405 {
406 try
407 {
408 this.cache.close();
409 }
410 catch(IOException ioe)
411 {
412 ioe.printStackTrace();
413 }
414 this.cacheFile.delete();
415 }
416 return;
417 }
418
419 }