View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   *
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.gridservice.server;
30  
31  import java.util.Vector;
32  import java.io.File;
33  import java.io.RandomAccessFile;
34  import java.io.FileInputStream;
35  import java.io.FileNotFoundException;
36  import java.io.IOException;
37  import java.nio.channels.FileChannel;
38  
39  import org.apache.log4j.Logger;
40  import org.apache.mina.common.ByteBuffer;
41  
42  import uk.ac.rdg.resc.jstyx.server.StyxFile;
43  import uk.ac.rdg.resc.jstyx.server.StyxFileChangeListener;
44  import uk.ac.rdg.resc.jstyx.server.StyxFileClient;
45  import uk.ac.rdg.resc.jstyx.server.StyxServerProtocolHandler;
46  import uk.ac.rdg.resc.jstyx.types.ULong;
47  import uk.ac.rdg.resc.jstyx.messages.RerrorMessage;
48  import uk.ac.rdg.resc.jstyx.StyxException;
49  
50  /***
51   * A file that can be read to obtain data from a file that is put out by an
52   * executable in a Styx Grid Service
53   * @todo this repeats code in FileOnDisk and GeneralCachingStreamReader.  Can
54   * we refactor?
55   *
56   * @author Jon Blower
57   * $Revision: 579 $
58   * $Date: 2006-02-17 09:26:08 +0000 (Fri, 17 Feb 2006) $
59   * $Log$
60   * Revision 1.3  2006/02/17 09:26:08  jonblower
61   * Removed reference to SGSParamFile
62   *
63   * Revision 1.2  2005/11/11 21:57:21  jonblower
64   * Implemented passing of URLs to input files
65   *
66   * Revision 1.1  2005/11/10 19:50:43  jonblower
67   * Added code to handle output files
68   *
69   */
70  
71  public class SGSOutputFile extends StyxFile implements SGSInstanceChangeListener
72  {
73      private static final Logger log = Logger.getLogger(SGSOutputFile.class);
74      
75      private File file; // The underlying file that is produced by the executable
76      private Vector requestQueue; // Queue of DataRequests that have not yet been fulfilled
77      private boolean serviceFinished;
78      
79      /*** Creates a new instance of SGSOutputFile */
80      public SGSOutputFile(File file, StyxGridServiceInstance instance)
81          throws StyxException
82      {
83          super(file.getName(), 0444); // File is read-only
84          this.file = file;
85          instance.addChangeListener(this);
86          this.serviceFinished = false;
87          this.requestQueue = new Vector();
88          new FileMonitor().start();
89      }
90      
91      public synchronized void read(StyxFileClient client, long offset, int count,
92          int tag) throws StyxException
93      {
94          DataRequest dr = new DataRequest(client, offset, count, tag);
95          if (!processRequest(dr))
96          {
97              // Add the request to the queue
98              synchronized(this.requestQueue)
99              {
100                 this.requestQueue.add(dr);
101             }
102         }
103     }
104     
105     /***
106      * @return true if the request was processed successfully, false otherwise
107      */
108     private synchronized boolean processRequest(DataRequest dr)
109     {
110         if (dr.offset < this.getLength().asLong())
111         {
112             // There are data available and we can return them to the client
113             this.readAndReply(dr);
114             return true;
115         }
116         else if (this.serviceFinished)
117         {
118             // We're not going to get any more data from this file.  Reply with
119             // zero bytes to signify EOF
120             this.replyRead(dr.client, new byte[0], dr.tag);
121             return true;
122         }
123         else
124         {
125             // Keep this request in the queue
126             return false;
127         }        
128     }
129     
130     /***
131      * Processes all outstanding requests. This is called when more data arrive
132      * or when EOF is reached.
133      */
134     private void processOutstandingRequests()
135     {
136         // We should already have the lock on the cache, but let's make sure
137         synchronized(this.requestQueue)
138         {
139             for (int i = 0; i < this.requestQueue.size(); )
140             {
141                 DataRequest dr = (DataRequest)this.requestQueue.get(i);
142                 boolean processed = this.processRequest(dr);
143                 if (processed)
144                 {
145                     // If we have processed the request successfully, remove
146                     // it from the queue
147                     this.requestQueue.remove(i);
148                 }
149                 else
150                 {
151                     i++;
152                 }
153             }
154         }
155     }
156     
157     /*** 
158      * Reads from the underlying file and replies to the client. Before this
159      * method is called, we must check to see if dr.offset is less than the 
160      * length of the file
161      */
162     private synchronized void readAndReply(DataRequest dr)
163     {
164         try
165         {
166             // Open a new FileChannel for reading
167             FileChannel chan = new FileInputStream(this.file).getChannel();
168 
169             // Get a ByteBuffer from MINA's pool.  This becomes part of the Rread
170             // message and is automatically released when the message is sent
171             ByteBuffer buf = ByteBuffer.allocate(dr.count);
172             // Make sure the position and limit are set correctly (remember that
173             // the actual buffer size might be larger than requested)
174             buf.position(0).limit(dr.count);
175 
176             // Read from the channel. If no bytes were read (due to EOF), the
177             // position of the buffer will not have changed
178             int numRead = chan.read(buf.buf(), dr.offset);
179             log.debug("Read " + numRead + " bytes from " + this.file.getPath());
180             // Close the channel
181             chan.close();
182 
183             buf.flip();
184             if (numRead > 0)
185             {
186                 this.replyRead(dr.client, buf, dr.tag);
187             }
188             else
189             {
190                 RerrorMessage rErrMsg = new RerrorMessage("Internal error: " +
191                     "zero bytes read from " + this.file.getName());
192                 StyxServerProtocolHandler.reply(dr.client.getSession(),
193                     rErrMsg, dr.tag);
194             }
195         }
196         catch(FileNotFoundException fnfe)
197         {
198             RerrorMessage rErrMsg = new RerrorMessage("Internal error: file " +
199                this.file.getName() + " does not exist");
200             StyxServerProtocolHandler.reply(dr.client.getSession(),
201                 rErrMsg, dr.tag);
202         }
203         catch(IOException ioe)
204         {
205             RerrorMessage rErrMsg = new RerrorMessage("An error of class " +
206                 ioe.getClass() + " occurred when trying to read from " +
207                 this.getFullPath() + ": " + ioe.getMessage());
208             StyxServerProtocolHandler.reply(dr.client.getSession(),
209                 rErrMsg, dr.tag);
210         }
211     }
212     
213     public ULong getLength()
214     {
215         return new ULong(this.file.length());
216     }
217     
218     /***
219      * Called automatically when the status of the Styx Grid Service instance
220      * changes.
221      */
222     public void statusChanged(StatusCode newStatus)
223     {
224         log.debug("Got newStatus = " + newStatus.getText());
225         if (newStatus == StatusCode.FINISHED ||
226             newStatus == StatusCode.ABORTED ||
227             newStatus == StatusCode.ERROR)
228         {
229             this.serviceFinished = true;
230         }
231         this.processOutstandingRequests();
232     }
233     
234     /*** 
235      * Simple thread that monitors the state of the underlying file.  When the
236      * state changes, sends a signal to process outstanding requests
237      */
238     private class FileMonitor extends Thread
239     {
240         private long length = 0L;
241         
242         public void run()
243         {
244             while(!serviceFinished)
245             {
246                 long newLength = file.length();
247                 if (newLength != this.length)
248                 {
249                     this.length = newLength;
250                     processOutstandingRequests();
251                 }
252                 // Wait for 2 seconds
253                 try
254                 {
255                     Thread.sleep(2000);
256                 }
257                 catch(InterruptedException ie)
258                 {
259                     // do nothing
260                 }
261             }
262         }
263     }
264     
265     /***
266      * Class representing a request for data
267      */
268     private static class DataRequest
269     {
270         private StyxFileClient client;
271         private long offset; // The offset requested by the client
272         private int count; // The number of bytes requested by the client
273         private int tag;
274         
275         private DataRequest(StyxFileClient client, long offset, int count, int tag)
276         {
277             this.client = client;
278             this.offset = offset;
279             this.count = count;
280             this.tag = tag;
281         }
282     }
283     
284 }