1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package uk.ac.rdg.resc.jstyx.gridservice.server;
30
31 import java.io.OutputStream;
32 import java.io.IOException;
33 import java.net.URL;
34 import java.net.MalformedURLException;
35
36 import java.nio.channels.FileChannel;
37 import java.io.RandomAccessFile;
38
39 import org.apache.mina.common.ByteBuffer;
40
41 import uk.ac.rdg.resc.jstyx.StyxUtils;
42 import uk.ac.rdg.resc.jstyx.StyxException;
43 import uk.ac.rdg.resc.jstyx.types.ULong;
44 import uk.ac.rdg.resc.jstyx.server.StyxFile;
45 import uk.ac.rdg.resc.jstyx.server.StyxFileClient;
46
47 import org.apache.log4j.Logger;
48
49 /***
50 * A file that is used to provide input (as a file or as stdin stream) to a
51 * Styx Grid Service.
52 *
53 * @author Jon Blower
54 * $Revision: 609 $
55 * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
56 * $Log$
57 * Revision 1.5 2005/12/09 18:42:41 jonblower
58 * Changed access privileges to write2() to protected
59 *
60 * Revision 1.4 2005/11/11 21:57:21 jonblower
61 * Implemented passing of URLs to input files
62 *
63 * Revision 1.3 2005/11/09 17:49:58 jonblower
64 * Implemented File subclass (representing fixed-name input files)
65 *
66 * Revision 1.2 2005/11/04 19:29:53 jonblower
67 * Moved code that writes to std input to SGSInputFile
68 *
69 * Revision 1.1 2005/11/04 09:12:23 jonblower
70 * Initial import
71 *
72 */
73 public abstract class SGSInputFile extends StyxFile
74 {
75
76 private static final Logger log = Logger.getLogger(SGSInputFile.class);
77
78 protected StyxGridServiceInstance instance;
79 protected boolean dataWritten;
80
81
82 protected ByteBuffer candidateURLBuffer;
83
84 protected String candidateURL;
85
86 protected int candidateURLLength;
87 protected URL url;
88
89
90 private SGSInputFile(String name, StyxGridServiceInstance instance)
91 throws StyxException
92 {
93 super(name, 0222);
94 this.instance = instance;
95 this.dataWritten = false;
96 this.candidateURL = null;
97 this.candidateURLLength = 0;
98 this.url = null;
99 }
100
101 /***
102 * This method does the job of checking for URLs being written to the file.
103 */
104 protected void write2(StyxFileClient client, long offset, int count,
105 ByteBuffer data, boolean truncate, int tag)
106 throws StyxException
107 {
108 if (this.url != null && count > 0)
109 {
110 throw new StyxException("Cannot write to this stream: it is" +
111 " reading from " + this.url);
112 }
113 try
114 {
115
116 if (count == 0)
117 {
118 if (this.candidateURLBuffer != null)
119 {
120 if (offset == this.candidateURLLength)
121 {
122
123
124 try
125 {
126 URL url = new URL(this.candidateURL);
127 this.setURL(url);
128 this.replyWrite(client, count, tag);
129 return;
130 }
131 catch (MalformedURLException mue)
132 {
133 throw new StyxException(this.candidateURL +
134 " is not recognised as a valid URL");
135 }
136 }
137 else
138 {
139
140
141
142 this.writeData(client, offset, count, this.candidateURLBuffer,
143 truncate, tag);
144 this.candidateURLBuffer.release();
145 this.candidateURL = null;
146 this.closeOutput();
147 this.replyWrite(client, count, tag);
148 return;
149 }
150 }
151 else
152 {
153
154
155 this.writeData(client, offset, count, data, truncate, tag);
156 this.closeOutput();
157 this.replyWrite(client, count, tag);
158 return;
159 }
160 }
161 else
162 {
163 if (offset == 0 && parseURL(data))
164 {
165
166
167 this.replyWrite(client, count, tag);
168 return;
169 }
170 int bytesToWrite = data.remaining();
171 if (count < data.remaining())
172 {
173
174
175 bytesToWrite = count;
176 }
177 this.writeData(client, offset, count, data, truncate, tag);
178 this.replyWrite(client, bytesToWrite, tag);
179 return;
180 }
181 }
182 catch(IOException ioe)
183 {
184 throw new StyxException("IOException occurred when writing to "
185 + "the stream: " + ioe.getMessage());
186 }
187 }
188
189 /***
190 * Attempts to retrieve a URL from the given data. The data buffer must
191 * contain the string "readfrom:<url>" where <url> is a valid URL.
192 * @return True if we might have a url: this url will be stored in
193 * this.candidateURL as a string, and this.candidateURLLength will store
194 * the length of the incoming data buffer
195 */
196 private boolean parseURL(ByteBuffer data)
197 {
198 int dataLen = data.remaining();
199
200
201 String urlStr = StyxUtils.dataToString(data);
202 final String prefix = "readfrom:";
203 if (urlStr.startsWith(prefix))
204 {
205
206 data.acquire();
207 this.candidateURLBuffer = data;
208 this.candidateURL = urlStr.substring(prefix.length());
209 log.debug("Got candidate URL: " + this.candidateURL);
210 this.candidateURLLength = dataLen;
211 return true;
212 }
213 else
214 {
215 return false;
216 }
217 }
218
219 public void setURL(URL url) throws StyxException
220 {
221 log.debug("Setting url = " + url);
222 this.url = url;
223 }
224
225 /***
226 * Writes data to the underlying file or stream
227 */
228 protected abstract void writeData(StyxFileClient client, long offset, int count,
229 ByteBuffer data, boolean truncate, int tag)
230 throws StyxException, IOException;
231
232 /***
233 * Closes the output file or stream. This default implementation does nothing
234 */
235 protected abstract void closeOutput() throws IOException;
236
237 /***
238 * file through which clients can write to the process's input stream directly
239 */
240 public static class StdinFile extends SGSInputFile
241 {
242 private OutputStream stream = null;
243
244 /***
245 * Creates new StdinFile - will be called "stdin"
246 */
247 public StdinFile(StyxGridServiceInstance instance)
248 throws StyxException
249 {
250 super("stdin", instance);
251 }
252
253 public void setOutputStream(OutputStream os)
254 {
255 this.stream = os;
256 }
257
258 /***
259 * This just writes the data to the current position in the stream
260 * and flushes the write. The offset is ignored. The "bytesConsumed"
261 * service data element is updated
262 */
263 protected void writeData(StyxFileClient client, long offset, int count,
264 ByteBuffer data, boolean truncate, int tag)
265 throws StyxException, IOException
266 {
267 if (instance.getStatus() != StatusCode.RUNNING)
268 {
269 throw new StyxException("Can't write data to standard input" +
270 " before the service is running");
271 }
272 byte[] arr = new byte[data.remaining()];
273 data.get(arr);
274 this.stream.write(arr);
275 this.stream.flush();
276
277 this.instance.setBytesConsumed(offset + arr.length);
278 }
279
280 public void write(StyxFileClient client, long offset, int count,
281 ByteBuffer data, boolean truncate, int tag)
282 throws StyxException
283 {
284 this.write2(client, offset, count, data, truncate, tag);
285 }
286
287 public void setURL(URL url) throws StyxException
288 {
289 instance.redirectToStdin(url);
290 super.setURL(url);
291 }
292
293 protected void closeOutput() throws IOException
294 {
295 this.stream.close();
296 }
297 }
298
299 public static class File extends SGSInputFile
300 {
301 private FileChannel chan;
302 private java.io.File file;
303 private boolean eofWritten;
304
305 public File(java.io.File file, StyxGridServiceInstance instance)
306 throws StyxException
307 {
308 super(file.getName(), instance);
309 this.file = file;
310 this.chan = null;
311 this.eofWritten = false;
312 }
313
314 protected void closeOutput() throws IOException
315 {
316 if (this.chan != null)
317 {
318 this.chan.close();
319 }
320 }
321
322 /***
323 * This just writes the data to the underlying fileOnDisk
324 */
325 protected void writeData(StyxFileClient client, long offset, int count,
326 ByteBuffer data, boolean truncate, int tag)
327 throws StyxException, IOException
328 {
329 if (count == 0)
330 {
331 this.eofWritten = true;
332 }
333 else
334 {
335 if (this.chan == null)
336 {
337 try
338 {
339 this.chan = new RandomAccessFile(this.file, "rw").getChannel();
340 }
341 catch (IOException ioe)
342 {
343 throw new StyxException("Error creating file " +
344 this.file.getName() + ": " + ioe.getMessage());
345 }
346 }
347
348 int pos = data.position();
349 int lim = data.limit();
350
351 data.limit(data.position() + count);
352
353
354 int nWritten = chan.write(data.buf(), offset);
355
356
357 data.limit(lim).position(pos);
358
359
360 if (truncate)
361 {
362 chan.truncate(offset + nWritten);
363 }
364 this.dataWritten = true;
365 this.eofWritten = false;
366 }
367 }
368
369 public void write(StyxFileClient client, long offset, int count,
370 ByteBuffer data, boolean truncate, int tag)
371 throws StyxException
372 {
373
374
375 if (instance.getStatus() == StatusCode.RUNNING)
376 {
377 throw new StyxException("Cannot write to an input file while " +
378 "the service is running");
379 }
380 else
381 {
382 this.write2(client, offset, count, data, truncate, tag);
383 }
384 }
385
386 /***
387 * @return true if we have received an EOF message (an empty write message)
388 */
389 public boolean dataUploadComplete()
390 {
391 return this.eofWritten = true;
392 }
393
394 /***
395 * @return the length of the underlying file on disk
396 */
397 public ULong getLength()
398 {
399 return new ULong(this.file.length());
400 }
401 }
402
403 /***
404 * @return the URL from which this file will get its data, or null if the
405 * URL has not been set
406 */
407 public URL getURL()
408 {
409 return this.url;
410 }
411
412
413
414 }