1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package uk.ac.rdg.resc.jstyx.gridservice.server;
30
31 import java.util.Vector;
32 import java.io.File;
33 import java.io.RandomAccessFile;
34 import java.io.FileInputStream;
35 import java.io.FileNotFoundException;
36 import java.io.IOException;
37 import java.nio.channels.FileChannel;
38
39 import org.apache.log4j.Logger;
40 import org.apache.mina.common.ByteBuffer;
41
42 import uk.ac.rdg.resc.jstyx.server.StyxFile;
43 import uk.ac.rdg.resc.jstyx.server.StyxFileChangeListener;
44 import uk.ac.rdg.resc.jstyx.server.StyxFileClient;
45 import uk.ac.rdg.resc.jstyx.server.StyxServerProtocolHandler;
46 import uk.ac.rdg.resc.jstyx.types.ULong;
47 import uk.ac.rdg.resc.jstyx.messages.RerrorMessage;
48 import uk.ac.rdg.resc.jstyx.StyxException;
49
50 /***
51 * A file that can be read to obtain data from a file that is put out by an
52 * executable in a Styx Grid Service
53 * @todo this repeats code in FileOnDisk and GeneralCachingStreamReader. Can
54 * we refactor?
55 *
56 * @author Jon Blower
57 * $Revision: 579 $
58 * $Date: 2006-02-17 09:26:08 +0000 (Fri, 17 Feb 2006) $
59 * $Log$
60 * Revision 1.3 2006/02/17 09:26:08 jonblower
61 * Removed reference to SGSParamFile
62 *
63 * Revision 1.2 2005/11/11 21:57:21 jonblower
64 * Implemented passing of URLs to input files
65 *
66 * Revision 1.1 2005/11/10 19:50:43 jonblower
67 * Added code to handle output files
68 *
69 */
70
71 public class SGSOutputFile extends StyxFile implements SGSInstanceChangeListener
72 {
73 private static final Logger log = Logger.getLogger(SGSOutputFile.class);
74
75 private File file;
76 private Vector requestQueue;
77 private boolean serviceFinished;
78
79 /*** Creates a new instance of SGSOutputFile */
80 public SGSOutputFile(File file, StyxGridServiceInstance instance)
81 throws StyxException
82 {
83 super(file.getName(), 0444);
84 this.file = file;
85 instance.addChangeListener(this);
86 this.serviceFinished = false;
87 this.requestQueue = new Vector();
88 new FileMonitor().start();
89 }
90
91 public synchronized void read(StyxFileClient client, long offset, int count,
92 int tag) throws StyxException
93 {
94 DataRequest dr = new DataRequest(client, offset, count, tag);
95 if (!processRequest(dr))
96 {
97
98 synchronized(this.requestQueue)
99 {
100 this.requestQueue.add(dr);
101 }
102 }
103 }
104
105 /***
106 * @return true if the request was processed successfully, false otherwise
107 */
108 private synchronized boolean processRequest(DataRequest dr)
109 {
110 if (dr.offset < this.getLength().asLong())
111 {
112
113 this.readAndReply(dr);
114 return true;
115 }
116 else if (this.serviceFinished)
117 {
118
119
120 this.replyRead(dr.client, new byte[0], dr.tag);
121 return true;
122 }
123 else
124 {
125
126 return false;
127 }
128 }
129
130 /***
131 * Processes all outstanding requests. This is called when more data arrive
132 * or when EOF is reached.
133 */
134 private void processOutstandingRequests()
135 {
136
137 synchronized(this.requestQueue)
138 {
139 for (int i = 0; i < this.requestQueue.size(); )
140 {
141 DataRequest dr = (DataRequest)this.requestQueue.get(i);
142 boolean processed = this.processRequest(dr);
143 if (processed)
144 {
145
146
147 this.requestQueue.remove(i);
148 }
149 else
150 {
151 i++;
152 }
153 }
154 }
155 }
156
157 /***
158 * Reads from the underlying file and replies to the client. Before this
159 * method is called, we must check to see if dr.offset is less than the
160 * length of the file
161 */
162 private synchronized void readAndReply(DataRequest dr)
163 {
164 try
165 {
166
167 FileChannel chan = new FileInputStream(this.file).getChannel();
168
169
170
171 ByteBuffer buf = ByteBuffer.allocate(dr.count);
172
173
174 buf.position(0).limit(dr.count);
175
176
177
178 int numRead = chan.read(buf.buf(), dr.offset);
179 log.debug("Read " + numRead + " bytes from " + this.file.getPath());
180
181 chan.close();
182
183 buf.flip();
184 if (numRead > 0)
185 {
186 this.replyRead(dr.client, buf, dr.tag);
187 }
188 else
189 {
190 RerrorMessage rErrMsg = new RerrorMessage("Internal error: " +
191 "zero bytes read from " + this.file.getName());
192 StyxServerProtocolHandler.reply(dr.client.getSession(),
193 rErrMsg, dr.tag);
194 }
195 }
196 catch(FileNotFoundException fnfe)
197 {
198 RerrorMessage rErrMsg = new RerrorMessage("Internal error: file " +
199 this.file.getName() + " does not exist");
200 StyxServerProtocolHandler.reply(dr.client.getSession(),
201 rErrMsg, dr.tag);
202 }
203 catch(IOException ioe)
204 {
205 RerrorMessage rErrMsg = new RerrorMessage("An error of class " +
206 ioe.getClass() + " occurred when trying to read from " +
207 this.getFullPath() + ": " + ioe.getMessage());
208 StyxServerProtocolHandler.reply(dr.client.getSession(),
209 rErrMsg, dr.tag);
210 }
211 }
212
213 public ULong getLength()
214 {
215 return new ULong(this.file.length());
216 }
217
218 /***
219 * Called automatically when the status of the Styx Grid Service instance
220 * changes.
221 */
222 public void statusChanged(StatusCode newStatus)
223 {
224 log.debug("Got newStatus = " + newStatus.getText());
225 if (newStatus == StatusCode.FINISHED ||
226 newStatus == StatusCode.ABORTED ||
227 newStatus == StatusCode.ERROR)
228 {
229 this.serviceFinished = true;
230 }
231 this.processOutstandingRequests();
232 }
233
234 /***
235 * Simple thread that monitors the state of the underlying file. When the
236 * state changes, sends a signal to process outstanding requests
237 */
238 private class FileMonitor extends Thread
239 {
240 private long length = 0L;
241
242 public void run()
243 {
244 while(!serviceFinished)
245 {
246 long newLength = file.length();
247 if (newLength != this.length)
248 {
249 this.length = newLength;
250 processOutstandingRequests();
251 }
252
253 try
254 {
255 Thread.sleep(2000);
256 }
257 catch(InterruptedException ie)
258 {
259
260 }
261 }
262 }
263 }
264
265 /***
266 * Class representing a request for data
267 */
268 private static class DataRequest
269 {
270 private StyxFileClient client;
271 private long offset;
272 private int count;
273 private int tag;
274
275 private DataRequest(StyxFileClient client, long offset, int count, int tag)
276 {
277 this.client = client;
278 this.offset = offset;
279 this.count = count;
280 this.tag = tag;
281 }
282 }
283
284 }