View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   * 
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.gridservice.server;
30  
31  import java.io.InputStream;
32  import java.io.RandomAccessFile;
33  import java.io.FileNotFoundException;
34  import java.io.IOException;
35  import java.io.File;
36  
37  import java.util.Vector;
38  import java.util.Iterator;
39  
40  import org.apache.log4j.Logger;
41  
42  /***
43   * Abstract class that reads from an InputStream in a separate thread, caching
44   * the results to the local hard disk.  Permits multiple clients to make
45   * DataRequests for data, replying with chunks of data when they are ready.
46   *
47   * @author Jon Blower
48   * $Revision: 507 $
49   * $Date: 2005-12-01 08:21:56 +0000 (Thu, 01 Dec 2005) $
50   * $Log$
51   * Revision 1.5  2005/12/01 08:21:56  jonblower
52   * Fixed javadoc comments
53   *
54   * Revision 1.4  2005/09/01 07:49:39  jonblower
55   * Set to print stack trace of IOException whether or not logger is debug-enabled
56   *
57   * Revision 1.3  2005/06/10 07:54:49  jonblower
58   * Added code to convert event-based StreamViewer to InputStream-based one
59   *
60   * Revision 1.2  2005/05/27 21:22:39  jonblower
61   * Further development of caching stream readers
62   *
63   * Revision 1.1  2005/05/27 17:02:59  jonblower
64   * Initial import
65   *
66   */
67  public abstract class GeneralCachingStreamReader
68  {
69      private static final Logger log = Logger.getLogger(GeneralCachingStreamReader.class);
70      
71      private InputStream is; // The input stream from which we will read
72      private RandomAccessFile cache; // The cache itself (a file on the local filesystem)
73      private Integer cacheLock = new Integer(0); // We use this for synchronization; it has no other purpose
74      private File cacheFile;   // The cache file
75      private boolean started;  // True if we have started reading from the stream
76      private Exception globEx; // If this is non-null an error has occurred and 
77                                // any attempt to read the stream will result
78                                // in this message being returned until it is reset.
79      private Vector requestQueue; // Queue of DataRequests that have not yet been fulfilled
80      private long cacheLength; // The length of the cache (i.e. the size of the cache file)
81                                // Note that we could also get this from cache.length() but
82                                // this should be faster (saves a call to the OS)
83      private boolean eof;      // This will be true when the stream has been completely read.
84      private boolean running;  // This is true if there is an active thread reading from the input stream
85      
86      public GeneralCachingStreamReader()
87      {
88          this.is = null;
89          this.requestQueue = new Vector();
90          this.running = false;
91      }
92      
93      /***
94       * Gets the File that is being used to cache the contents of the stream
95       */
96      public File getCacheFile()
97      {
98          return this.cacheFile;
99      }
100     
101     /***
102      * Gets the size of the cache in bytes
103      */
104     public long getCacheLength()
105     {
106         return this.cacheLength;
107     }
108     
109     public void setCacheFile(File cacheFile) throws FileNotFoundException
110     {
111         this.cacheFile = cacheFile;
112     }
113     
114     /***
115      * @return true if we have reached end of file
116      */
117     public boolean isEOF()
118     {
119         return this.eof;
120     }
121     
122     /***
123      * Sets the input stream from which the CachingStreamReader gets data
124      * and immediately starts reading from this stream
125      * @throws IllegalStateException if this object is already reading from a stream
126      * @throws IOException if the cache file could not be created
127      */
128     public void startReading(InputStream is) throws IOException
129     {
130         if (this.running)
131         {
132             throw new IllegalStateException("This CachingStreamReader is already running");
133         }
134         
135         if (this.cache != null)
136         {
137             // Delete any previous cache files
138             this.cache.close();
139             this.cacheFile.delete();
140         }
141         
142         this.cache = new RandomAccessFile(this.cacheFile, "rw");
143         log.info("Created cache file " + this.cacheFile.getPath());
144         
145         this.cacheLength = 0;
146         this.globEx = null;
147         this.is = is;
148         // Start reading from the stream immediately
149         new StreamGobbler().start();
150     }
151     
152     /***
153      * Get data from the stream's cache.
154      */
155     public void read(DataRequest dr)
156     {
157         log.debug("Received request: offset = " + dr.offset + ", count = " + dr.count);
158         synchronized(this.cacheLock)
159         {
160             // Try to process the request immediately
161             boolean processed = this.processRequest(dr);
162             if (!processed)
163             {
164                 // if we couldn't fulfil this request, add it to the queue
165                 synchronized(this.requestQueue)
166                 {
167                     this.requestQueue.add(dr);
168                 }
169             }
170         }
171     }
172     
173     /***
174      * Processes all outstanding requests. This is called when more data arrive
175      * or when EOF is reached.
176      */
177     private void processOutstandingRequests()
178     {
179         // We should already have the lock on the cache, but let's make sure
180         synchronized(this.cacheLock)
181         {
182             synchronized(this.requestQueue)
183             {
184                 for (int i = 0; i < this.requestQueue.size(); )
185                 {
186                     DataRequest dr = (DataRequest)this.requestQueue.get(i);
187                     boolean processed = this.processRequest(dr);
188                     if (processed)
189                     {
190                         // If we have processed the request successfully, remove
191                         // it from the queue
192                         this.requestQueue.remove(i);
193                     }
194                     else
195                     {
196                         i++;
197                     }
198                 }
199             }
200         }
201     }
202     
203     /***
204      * @return true if the request was processed successfully, false otherwise
205      */
206     private boolean processRequest(DataRequest dr)
207     {
208         // We get the cache's lock so no data can be written to the cache
209         // before we're done.
210         synchronized(this.cacheLock)
211         {
212             try
213             {
214                 if (log.isDebugEnabled())
215                 {
216                     log.debug("Processing request: offset = " + dr.offset +
217                         ", count = " + dr.count + ": cacheLength = " +
218                         this.cacheLength);
219                 }
220                 if (dr.offset >= this.cacheLength)
221                 {
222                     // We're asking for data not (yet) in the cache
223                     if (this.eof)
224                     {
225                         // We've reached the end of the stream.
226                         log.debug("Offset >= cache length and EOF reached."
227                             + " Returning 0 bytes");
228                         this.newData(dr, new byte[0], 0, 0);
229                         return true;
230                     }
231                     else
232                     {
233                         // The requested data haven't yet arrived in the cache
234                         if (this.globEx != null)
235                         {
236                             // An error has occurred so the data will never
237                             // arrive.
238                             log.debug("Got error message. Returning Rerror");
239                             this.error(dr, this.globEx);
240                             return true;
241                         }
242                         else
243                         {
244                             log.debug("Offset >= cache length but EOF not reached."
245                                 + " Holding request.");
246                             return false;
247                         }
248                     }
249                 }
250                 else
251                 {
252                     // At least some of the requested data are in the cache
253                     // Seek to the right position in the cache file
254                     this.cache.seek(dr.offset);
255                     // Try to read the requested amount of data
256                     // TODO: check the number of bytes remaining?
257                     byte[] arr = new byte[dr.count];
258                     log.debug("Reading " + arr.length + " bytes from cache at offset "
259                         + dr.offset);
260                     int n = this.cache.read(arr);
261                     log.debug("Actually read " + n + " bytes");
262                     if (n < 0)
263                     {
264                         // We reached EOF - this shouldn't happen because we've already
265                         // checked the file length
266                         log.error("Internal error: no bytes read from stream");
267                         return false;
268                     }
269                     else
270                     {
271                         // Return the bytes read to the client
272                         log.debug("Returning " + n + " bytes to client");
273                         this.newData(dr, arr, 0, n);
274                         return true;
275                     }
276                 }
277             }
278             catch(IOException ioe)
279             {
280                 // I think this exception is thrown sometimes.  Look out for it.
281                 ioe.printStackTrace();
282                 // We have to reply with an error message here (as opposed to
283                 // throwing a StyxException) because we're no longer within
284                 // the readFile() method
285                 this.error(dr, ioe);
286                 return true; // We have processed the message
287             }
288         }
289     }
290     
291     /***
292      * This thread continuously reads from the stream and puts data in the cache,
293      * notifying waiting clients of the new data.
294      */
295     private class StreamGobbler extends Thread
296     {
297         public void run()
298         {
299             running = true;
300             eof = false;
301             // Create a temporary 8K byte buffer
302             // TODO: is this size appropriate? Might be best to set it to the
303             // maximum payload size on the connection. Or, for services that output
304             // very little data, a much smaller size may be appropriate so that
305             // clients aren't kept waiting for long between updates
306             byte[] arr = new byte[8192];
307             try
308             {
309                 while(!eof)
310                 {
311                     int n = is.read(arr);
312                     log.debug("Read " + n + " bytes from input stream");
313                     synchronized(cacheLock)
314                     {
315                         if (n < 0)
316                         {
317                             // We've reached the end of the stream
318                             eof = true;
319                             is.close();
320                         }
321                         else
322                         {
323                             // put the newly-read bytes to the end of the cache
324                             cache.seek(cacheLength);
325                             cache.write(arr, 0, n);
326                             cacheLength += n;
327                             // For some reason, the length() of the RandomAccessFile
328                             // isn't always correct (particularly when first created)
329                             // so we have to set it explicitly.
330                             // TODO: could we just set this to zero when the
331                             // RAF is first created?  Will we notice?
332                             cache.setLength(cacheLength);
333                         }
334                         // Now process any outstanding requests
335                         processOutstandingRequests();
336                     }
337                 }
338             }
339             catch(Exception e)
340             {
341                 if (log.isDebugEnabled())
342                 {
343                     e.printStackTrace();
344                 }
345                 // Set the global exception object so that any further
346                 // attemps to read this stream return an error.
347                 globEx = e;
348             }
349             // now we close the cache and open it read-only; this allows us to
350             // examine the contents of the cache without going through the 
351             // Styx interface (useful for debugging)
352             synchronized(cache)
353             {
354                 try
355                 {
356                     cache.close();
357                     cache = new RandomAccessFile(cacheFile, "r");
358                 }
359                 catch(Exception e)
360                 {
361                     if (log.isDebugEnabled())
362                     {
363                         e.printStackTrace();
364                     }
365                     // Set the global error message
366                     globEx = e;
367                 }
368             }
369             running = false;
370         }
371     }
372     
373     /***
374      * Class representing a client that is waiting for data
375      * @todo this is very similar to the ClientInfo class in AsyncStyxFile.
376      * Can we avoid this code repetition?
377      */
378     public static class DataRequest
379     {
380         // TODO create get() methods
381         public Object client;
382         public long offset; // The offset requested by the client
383         public int count; // number of bytes requested by the client
384         
385         public DataRequest(Object client, long offset, int count)
386         {
387             this.client = client;
388             this.offset = offset;
389             this.count = count;
390         }
391     }
392     
393     public abstract void newData(DataRequest originalRequest, byte[] data,
394         int offset, int count);
395     
396     public abstract void error(DataRequest originalRequest, Exception e);
397     
398     /***
399      * Called when the file is removed from the server. This deletes the
400      * underlying cache file.
401      */
402     public synchronized void delete()
403     {
404         if (this.cache != null)
405         {
406             try
407             {
408                 this.cache.close();
409             }
410             catch(IOException ioe)
411             {
412                 ioe.printStackTrace();
413             }
414             this.cacheFile.delete();
415         }
416         return;
417     }
418     
419 }