View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   *
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.gridservice.server;
30  
31  import java.io.OutputStream;
32  import java.io.IOException;
33  import java.net.URL;
34  import java.net.MalformedURLException;
35  
36  import java.nio.channels.FileChannel;
37  import java.io.RandomAccessFile;
38  
39  import org.apache.mina.common.ByteBuffer;
40  
41  import uk.ac.rdg.resc.jstyx.StyxUtils;
42  import uk.ac.rdg.resc.jstyx.StyxException;
43  import uk.ac.rdg.resc.jstyx.types.ULong;
44  import uk.ac.rdg.resc.jstyx.server.StyxFile;
45  import uk.ac.rdg.resc.jstyx.server.StyxFileClient;
46  
47  import org.apache.log4j.Logger;
48  
49  /***
50   * A file that is used to provide input (as a file or as stdin stream) to a 
51   * Styx Grid Service.
52   *
53   * @author Jon Blower
54   * $Revision: 609 $
55   * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
56   * $Log$
57   * Revision 1.5  2005/12/09 18:42:41  jonblower
58   * Changed access privileges to write2() to protected
59   *
60   * Revision 1.4  2005/11/11 21:57:21  jonblower
61   * Implemented passing of URLs to input files
62   *
63   * Revision 1.3  2005/11/09 17:49:58  jonblower
64   * Implemented File subclass (representing fixed-name input files)
65   *
66   * Revision 1.2  2005/11/04 19:29:53  jonblower
67   * Moved code that writes to std input to SGSInputFile
68   *
69   * Revision 1.1  2005/11/04 09:12:23  jonblower
70   * Initial import
71   *
72   */
73  public abstract class SGSInputFile extends StyxFile
74  {
75      
76      private static final Logger log = Logger.getLogger(SGSInputFile.class);
77      
78      protected StyxGridServiceInstance instance;
79      protected boolean dataWritten;  // True when any data have been written to this file
80                                      // (don't necessarily have to have reached EOF)
81      
82      protected ByteBuffer candidateURLBuffer; // This is set when we have received a message that
83                                  // seems to contain a valid URL
84      protected String candidateURL; // This is set when we have received a message that
85                                  // seems to contain a valid URL
86      protected int candidateURLLength;
87      protected URL url;  // This is non-null if we have specified that this
88                          // file will be read from a URL
89      
90      private SGSInputFile(String name, StyxGridServiceInstance instance)
91          throws StyxException
92      {
93          super(name, 0222); // Input files are always write-only
94          this.instance = instance;
95          this.dataWritten = false;
96          this.candidateURL = null;
97          this.candidateURLLength = 0;
98          this.url = null;
99      }
100     
101     /***
102      * This method does the job of checking for URLs being written to the file.
103      */
104     protected void write2(StyxFileClient client, long offset, int count,
105         ByteBuffer data, boolean truncate, int tag)
106         throws StyxException
107     {
108         if (this.url != null && count > 0)
109         {
110             throw new StyxException("Cannot write to this stream: it is" +
111                 " reading from " + this.url);
112         }
113         try
114         {
115             // First check to see if this is an EOF message
116             if (count == 0)
117             {
118                 if (this.candidateURLBuffer != null)
119                 {
120                     if (offset == this.candidateURLLength)
121                     {
122                         // We're writing EOF at the end of an existing URL
123                         // See if this URL is recognised
124                         try
125                         {
126                             URL url = new URL(this.candidateURL);
127                             this.setURL(url);
128                             this.replyWrite(client, count, tag);
129                             return;
130                         }
131                         catch (MalformedURLException mue)
132                         {
133                             throw new StyxException(this.candidateURL +
134                                 " is not recognised as a valid URL");
135                         }
136                     }
137                     else
138                     {
139                         // We're writing EOF elsewhere in the file.  Write
140                         // the "candidate URL" to the file as it is obviously
141                         // not really a URL
142                         this.writeData(client, offset, count, this.candidateURLBuffer,
143                             truncate, tag);
144                         this.candidateURLBuffer.release();
145                         this.candidateURL = null;
146                         this.closeOutput();
147                         this.replyWrite(client, count, tag);
148                         return;
149                     }
150                 }
151                 else
152                 {
153                     // We have reached EOF and we have no stored URL to write.
154                     // Just write the empty buffer to signify EOF
155                     this.writeData(client, offset, count, data, truncate, tag);
156                     this.closeOutput();
157                     this.replyWrite(client, count, tag);
158                     return;
159                 }
160             }
161             else
162             {
163                 if (offset == 0 && parseURL(data))
164                 {
165                     // Don't write any data to the stream.  We've stored
166                     // the candidate URL for future use
167                     this.replyWrite(client, count, tag);
168                     return;
169                 }
170                 int bytesToWrite = data.remaining();
171                 if (count < data.remaining())
172                 {
173                     // Would normally be an error if count != data.remaining(),
174                     // but we'll let the calling application pick this up
175                     bytesToWrite = count;
176                 }
177                 this.writeData(client, offset, count, data, truncate, tag);
178                 this.replyWrite(client, bytesToWrite, tag);
179                 return;
180             }
181         }
182         catch(IOException ioe)
183         {
184             throw new StyxException("IOException occurred when writing to "
185                     + "the stream: " + ioe.getMessage());
186         }
187     }
188             
189     /***
190      * Attempts to retrieve a URL from the given data. The data buffer must
191      * contain the string "readfrom:<url>" where <url> is a valid URL.
192      * @return True if we might have a url: this url will be stored in
193      * this.candidateURL as a string, and this.candidateURLLength will store
194      * the length of the incoming data buffer
195      */
196     private boolean parseURL(ByteBuffer data)
197     {
198         int dataLen = data.remaining();
199         // Check to see if this could be a URL.  The dataToString() method does
200         // not change the position or limit of the buffer
201         String urlStr = StyxUtils.dataToString(data);
202         final String prefix = "readfrom:";
203         if (urlStr.startsWith(prefix))
204         {
205             // Store this data buffer as we might want to use it later
206             data.acquire();
207             this.candidateURLBuffer = data;
208             this.candidateURL = urlStr.substring(prefix.length());
209             log.debug("Got candidate URL: " + this.candidateURL);
210             this.candidateURLLength = dataLen;
211             return true;
212         }
213         else
214         {
215             return false;
216         }
217     }
218     
219     public void setURL(URL url) throws StyxException
220     {
221         log.debug("Setting url = " + url);
222         this.url = url;
223     }
224     
225     /***
226      * Writes data to the underlying file or stream
227      */
228     protected abstract void writeData(StyxFileClient client, long offset, int count,
229         ByteBuffer data, boolean truncate, int tag)
230         throws StyxException, IOException;
231     
232     /***
233      * Closes the output file or stream. This default implementation does nothing
234      */
235     protected abstract void closeOutput() throws IOException;
236     
237     /***
238      * file through which clients can write to the process's input stream directly
239      */
240     public static class StdinFile extends SGSInputFile
241     {
242         private OutputStream stream = null;
243 
244         /***
245          * Creates new StdinFile - will be called "stdin"
246          */
247         public StdinFile(StyxGridServiceInstance instance)
248             throws StyxException
249         {
250             super("stdin", instance);
251         }
252 
253         public void setOutputStream(OutputStream os)
254         {
255             this.stream = os;
256         }
257         
258         /***
259          * This just writes the data to the current position in the stream
260          * and flushes the write.  The offset is ignored.  The "bytesConsumed"
261          * service data element is updated
262          */
263         protected void writeData(StyxFileClient client, long offset, int count,
264             ByteBuffer data, boolean truncate, int tag)
265             throws StyxException, IOException
266         {
267             if (instance.getStatus() != StatusCode.RUNNING)
268             {
269                 throw new StyxException("Can't write data to standard input" +
270                     " before the service is running");
271             }
272             byte[] arr = new byte[data.remaining()];
273             data.get(arr);
274             this.stream.write(arr);
275             this.stream.flush();
276             // Update the number of bytes consumed
277             this.instance.setBytesConsumed(offset + arr.length);
278         }
279         
280         public void write(StyxFileClient client, long offset, int count,
281             ByteBuffer data, boolean truncate, int tag)
282             throws StyxException
283         {
284             this.write2(client, offset, count, data, truncate, tag);
285         }
286         
287         public void setURL(URL url) throws StyxException
288         {
289             instance.redirectToStdin(url);
290             super.setURL(url);
291         }
292         
293         protected void closeOutput() throws IOException
294         {
295             this.stream.close();
296         }
297     }
298     
299     public static class File extends SGSInputFile
300     {
301         private FileChannel chan;
302         private java.io.File file;
303         private boolean eofWritten;
304         
305         public File(java.io.File file, StyxGridServiceInstance instance)
306             throws StyxException
307         {
308             super(file.getName(), instance);
309             this.file = file;
310             this.chan = null; // We create this when we get our first write message
311             this.eofWritten = false;
312         }
313         
314         protected void closeOutput() throws IOException
315         {
316             if (this.chan != null)
317             {
318                 this.chan.close();
319             }
320         }
321         
322         /***
323          * This just writes the data to the underlying fileOnDisk
324          */
325         protected void writeData(StyxFileClient client, long offset, int count,
326             ByteBuffer data, boolean truncate, int tag)
327             throws StyxException, IOException
328         {
329             if (count == 0)
330             {
331                 this.eofWritten = true;
332             }
333             else
334             {
335                 if (this.chan == null)
336                 {
337                     try
338                     {
339                         this.chan = new RandomAccessFile(this.file, "rw").getChannel();
340                     }
341                     catch (IOException ioe)
342                     {
343                         throw new StyxException("Error creating file " +
344                             this.file.getName() + ": " + ioe.getMessage());
345                     }
346                 }
347                 // Remember old limit and position
348                 int pos = data.position();
349                 int lim = data.limit();
350                 // Make sure only the requested number of bytes get written
351                 data.limit(data.position() + count);
352 
353                 // Write to the file
354                 int nWritten = chan.write(data.buf(), offset);
355 
356                 // Reset former buffer positions
357                 data.limit(lim).position(pos);
358 
359                 // Truncate the file at the end of the new data if requested
360                 if (truncate)
361                 {
362                     chan.truncate(offset + nWritten);
363                 }
364                 this.dataWritten = true;
365                 this.eofWritten = false;
366             }
367         }
368         
369         public void write(StyxFileClient client, long offset, int count,
370             ByteBuffer data, boolean truncate, int tag)
371             throws StyxException
372         {
373             // We're not allowed to write to this file if the service is running
374             // or the input URL is set
375             if (instance.getStatus() == StatusCode.RUNNING)
376             {
377                 throw new StyxException("Cannot write to an input file while " +
378                     "the service is running");
379             }
380             else
381             {
382                 this.write2(client, offset, count, data, truncate, tag);
383             }
384         }
385         
386         /***
387          * @return true if we have received an EOF message (an empty write message)
388          */
389         public boolean dataUploadComplete()
390         {
391             return this.eofWritten = true;
392         }
393         
394         /***
395          * @return the length of the underlying file on disk
396          */
397         public ULong getLength()
398         {
399             return new ULong(this.file.length());
400         }
401     }
402     
403     /***
404      * @return the URL from which this file will get its data, or null if the 
405      * URL has not been set
406      */
407     public URL getURL()
408     {
409         return this.url;
410     }
411     
412     
413     
414 }