View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   * 
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.gridservice.client;
30  
31  import java.util.Vector;
32  import java.util.Hashtable;
33  import java.util.Iterator;
34  import java.util.Enumeration;
35  import java.util.Date;
36  
37  import java.io.File;
38  import java.io.InputStream;
39  import java.io.FileInputStream;
40  import java.io.FileOutputStream;
41  import java.io.OutputStream;
42  import java.io.PrintStream;
43  import java.io.IOException;
44  import java.io.FileNotFoundException;
45  
46  import java.net.URL;
47  import java.net.MalformedURLException;
48  import java.net.UnknownHostException;
49  
50  import org.apache.mina.common.ByteBuffer;
51  import org.apache.log4j.Logger;
52  
53  import uk.ac.rdg.resc.jstyx.gridservice.config.SGSConfig;
54  import uk.ac.rdg.resc.jstyx.gridservice.config.SGSParam;
55  import uk.ac.rdg.resc.jstyx.gridservice.config.SGSInput;
56  import uk.ac.rdg.resc.jstyx.gridservice.config.SGSOutput;
57  import uk.ac.rdg.resc.jstyx.messages.TreadMessage;
58  import uk.ac.rdg.resc.jstyx.messages.TwriteMessage;
59  import uk.ac.rdg.resc.jstyx.messages.StyxMessage;
60  import uk.ac.rdg.resc.jstyx.client.StyxConnection;
61  import uk.ac.rdg.resc.jstyx.client.CStyxFileChangeAdapter;
62  import uk.ac.rdg.resc.jstyx.client.CStyxFile;
63  import uk.ac.rdg.resc.jstyx.client.CStyxFileOutputStream;
64  import uk.ac.rdg.resc.jstyx.client.MessageCallback;
65  import uk.ac.rdg.resc.jstyx.types.DirEntry;
66  import uk.ac.rdg.resc.jstyx.StyxUtils;
67  import uk.ac.rdg.resc.jstyx.StyxException;
68  
69  /***
70   * A client for a Styx Grid Service Instance. The client interacts with the SGS
71   * instance and fires events when data arrives or the service data change.
72   *
73   * @author Jon Blower
74   * $Revision: 609 $
75   * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
76   * $Log$
77   * Revision 1.54  2006/02/22 08:52:57  jonblower
78   * Added debug code and support for setting service lifetime
79   *
80   * Revision 1.53  2006/02/20 17:35:01  jonblower
81   * Implemented correct handling of output files/streams (not fully tested yet)
82   *
83   * Revision 1.52  2006/02/20 08:37:32  jonblower
84   * Still working towards handling output data properly in SGSInstanceClient
85   *
86   * Revision 1.51  2006/02/17 17:34:43  jonblower
87   * Implemented (but didn't test) proper handling of output files
88   *
89   * Revision 1.50  2006/02/17 09:26:59  jonblower
90   * Changes to comments
91   *
92   * Revision 1.49  2006/01/06 10:15:36  jonblower
93   * Implemented uploadInputFilesAsync()
94   *
95   * Revision 1.48  2006/01/05 16:06:34  jonblower
96   * SGS clients now deal with possibility that client could be created on a different server
97   *
98   * Revision 1.47  2005/12/20 09:50:54  jonblower
99   * Continuing to implement reading of output streams
100  *
101  * Revision 1.46  2005/12/19 17:21:01  jonblower
102  * Preparing for including automatic download of output files in this class
103  *
104  * Revision 1.45  2005/12/13 09:04:21  jonblower
105  * Implemented correct handling of stdin
106  *
107  * Revision 1.44  2005/12/09 18:41:56  jonblower
108  * Continuing to simplify client interface to SGS instances
109  *
110  * Revision 1.43  2005/12/07 17:51:32  jonblower
111  * Changed "commandline" file to "args"
112  *
113  * Revision 1.42  2005/12/07 08:56:32  jonblower
114  * Refactoring SGS client code
115  *
116  * Revision 1.41  2005/12/01 17:17:07  jonblower
117  * Simplifying client interface to SGS instances
118  *
119  * Revision 1.39  2005/11/11 21:57:21  jonblower
120  * Implemented passing of URLs to input files
121  *
122  * Revision 1.38  2005/11/10 19:49:28  jonblower
123  * Renamed SGSInstanceChangeListener to SGSInstanceClientChangeListener
124  *
125  * Revision 1.37  2005/11/09 17:43:19  jonblower
126  * Added getInputStreamsDir() and removed urls/ directory from getInputStreams()
127  *
128  * Revision 1.36  2005/11/07 21:05:35  jonblower
129  * Added setCommandLineArgs() method
130  *
131  * Revision 1.35  2005/11/04 19:28:20  jonblower
132  * Changed structure of input files in config file and Styx namespace
133  *
134  * Revision 1.34  2005/11/02 09:01:54  jonblower
135  * Continuing to implement JSAP-based parameter parsing
136  *
137  * Revision 1.33  2005/10/18 14:08:14  jonblower
138  * Removed inputfiles from namespace
139  *
140  * Revision 1.32  2005/10/14 18:09:40  jonblower
141  * Changed getInputMethods() to getInputStreams() and added synchronous and async versions
142  *
143  * Revision 1.31  2005/09/19 07:41:43  jonblower
144  * Added a close() method
145  *
146  * Revision 1.30  2005/09/11 19:28:58  jonblower
147  * Added getSteeringFiles() and getOutputStream()
148  *
149  * Revision 1.29  2005/08/12 08:08:39  jonblower
150  * Developments to support web interface
151  *
152  * Revision 1.28  2005/08/04 16:49:17  jonblower
153  * Added and edited upload() methods in CStyxFile
154  *
155  * Revision 1.27  2005/08/02 08:05:08  jonblower
156  * Continuing to implement steering
157  *
158  * Revision 1.26  2005/08/01 16:38:05  jonblower
159  * Implemented simple parameter handling
160  *
161  * Revision 1.25  2005/07/29 16:55:49  jonblower
162  * Implementing reading command line asynchronously
163  *
164  * Revision 1.24  2005/06/14 07:45:16  jonblower
165  * Implemented setting of params and async notification of parameter changes
166  *
167  * Revision 1.23  2005/06/13 16:46:35  jonblower
168  * Implemented setting of parameter values via the GUI
169  *
170  * Revision 1.22  2005/06/10 07:53:12  jonblower
171  * Changed SGS namespace: removed "inurl" and subsumed functionality into "stdin"
172  *
173  * Revision 1.21  2005/06/07 16:44:45  jonblower
174  * Fixed problem with caching stream reader on client side
175  *
176  * Revision 1.20  2005/05/27 17:05:06  jonblower
177  * Changes to incorporate GeneralCachingStreamReader
178  *
179  * Revision 1.19  2005/05/26 16:52:06  jonblower
180  * Implemented detection and viewing of output streams
181  *
182  * Revision 1.18  2005/05/25 16:59:31  jonblower
183  * Added uploadInputFile()
184  *
185  * Revision 1.17  2005/05/23 16:48:23  jonblower
186  * Overhauled CStyxFile (esp. asynchronous methods) and StyxConnection (added cache of CStyxFiles)
187  *
188  * Revision 1.15  2005/05/20 07:45:27  jonblower
189  * Implemented getInputFiles() to find the input files required by the service
190  *
191  * Revision 1.14  2005/05/19 18:42:06  jonblower
192  * Implementing specification of input files required by SGS
193  *
194  * Revision 1.13  2005/05/18 17:13:51  jonblower
195  * Created SGSInstanceGUI
196  *
197  * Revision 1.12  2005/05/16 11:00:53  jonblower
198  * Changed SGS config XML file structure: separated input and output streams and changed some tag names
199  *
200  * Revision 1.11  2005/05/13 16:49:34  jonblower
201  * Coded dynamic detection and display of service data, also included streams in config file
202  *
203  * Revision 1.10  2005/05/12 16:00:28  jonblower
204  * Implementing reading of service data elements
205  *
206  * Revision 1.9  2005/05/12 14:21:03  jonblower
207  * Changed dataSent() method to dataWritten() (more accurate name)
208  *
209  * Revision 1.8  2005/05/12 08:00:53  jonblower
210  * Added getChildrenAsync() to CStyxFile and childrenFound() to CStyxFileChangeListener
211  *
212  * Revision 1.7  2005/05/12 07:40:54  jonblower
213  * CStyxFile.close() no longer throws a StyxException
214  *
215  * Revision 1.3  2005/03/18 16:45:14  jonblower
216  * Released ByteBuffers after use
217  *
218  * Revision 1.2  2005/03/18 13:55:59  jonblower
219  * Improved freeing of ByteBuffers, and bug fixes
220  *
221  * Revision 1.1  2005/03/16 22:16:44  jonblower
222  * Added Styx Grid Service classes to core module
223  *
224  * Revision 1.4  2005/03/16 17:59:35  jonblower
225  * Changed following changes to core JStyx library (replacement of java.nio.ByteBuffers with MINA's ByteBuffers)
226  *
227  * Revision 1.2  2005/02/21 18:12:29  jonblower
228  * Following changes to core JStyx library
229  *
230  * Revision 1.1  2005/02/16 19:22:29  jonblower
231  * Commit adding of SGS files to CVS
232  *
233  */
234 public class SGSInstanceClient extends CStyxFileChangeAdapter
235 {
236     private static final Logger log = Logger.getLogger(SGSInstanceClient.class);
237     
238     private SGSClient client;       // Client of the Styx Grid Service
239     private SGSConfig config;       // Full configuration information
240     private CStyxFile instanceRoot; // The file at the root of the instance
241     private CStyxFile ctlFile;      // The file that we use to stop, start and
242                                     // destroy the instance
243     
244     // Hashtable of StringBuffers, one for each CStyxFile that is being read
245     // continuously in an asynchronous fashion (see this.readDataAsync())
246     private Hashtable/*<CStyxFile, StringBuffer>*/ bufs;
247     
248     // State data
249     private CStyxFile[] serviceDataFiles;
250     private Hashtable sdeValues; // Hashtable of service data values, keyed by the
251                                  // names of the service data files
252     private long sdeValuesVersion; // Version of this Hashtable
253     private long sdeValuesVersionLastRead; // Version of this Hashtable on the last read
254     private CStyxFile statusFile;
255     
256     // Input files and streams
257     private CStyxFile inputsDir;
258     Object stdinSrc; // Source of data to stream to stdin: null if we're using System.in
259     
260     // The files we're going to upload to the service
261     // This hashtable allows multiple files or URLs to be associated with
262     // an SGSInput object
263     private Hashtable/*<SGSInput, Vector<String or File>*/ filesToUpload;
264     
265     // The destinations for files that we shall download from the service
266     private Hashtable/*<CStyxFile, PrintStream>*/ filesToDownload;
267     
268     // Output streams
269     private CStyxFile[] outputFiles;
270     private Hashtable activeStreams;
271     
272     // Parameters
273     private CStyxFile[] paramFiles;
274     
275     // Directory containing files pertaining to the lifecycle of the service
276     private CStyxFile timeDir;
277     
278     // Steerable parameters
279     private CStyxFile[] steeringFiles;
280     
281     // Used to read command line arguments for debugging
282     private CStyxFile argsFile;
283     
284     // Vector of service data, parameter etc files that we are already reading
285     // (used by readDataAsync())
286     private Vector filesBeingRead;
287     
288     // SGSInstanceClientChangeListeners that are listening for changes to this SGS instance
289     private Vector changeListeners;
290     
291     /***
292      * Creates a new SGSInstanceClient for an instance that has its root in the
293      * given CStyxFile.  Note that this constructor will make <b>blocking</b>
294      * reads to the SGS server and therefore should not be called from within
295      * a callback method.
296      * @param client The SGSClient to which this instance belongs
297      * @param instanceRoot The file representing the root of this instance
298      * @throws StyxException if there was an error creating the client (for 
299      * example, there is no instance with the given ID or there was an error
300      * reading the directory contents from the server)
301      */
302     public SGSInstanceClient(SGSClient client, CStyxFile instanceRoot)
303         throws StyxException
304     {
305         this.init(client, instanceRoot);
306     }
307     
308     /***
309      * Gets an SGSInstanceClient, given the full URL to the root of the new
310      * instance, e.g.
311      * <code>styx://thehost.com:9092/mySGS/instances/1234567890abcde</code>
312      * @throws StyxException if there was an error creating the client object
313      */
314     public SGSInstanceClient(String instanceURL) throws StyxException
315     {
316         // Check to see if the new instance is on the same server and port
317         // as the current connection
318         URL url = null;
319         try
320         {
321             url = new URL(instanceURL);
322             // Get a client for the server.  If a client already exists (perhaps
323             // the current client) it will simply be returned
324             SGSServerClient serverClient =
325                 SGSServerClient.getServerClient(url.getHost(), url.getPort());
326             String[] pathEls = url.getPath().split("/");
327             // For some reason the first element of pathEls[] is an empty string.
328             // The other three elements are the name of the SGS, the "instances"
329             // directory and the id of the SGS
330             if (pathEls.length != 4)
331             {
332                 throw new StyxException("URL format error");
333             }
334             SGSClient sgsClient = serverClient.getSGSClient(pathEls[1]);
335             this.init(sgsClient, pathEls[3]);
336         }
337         catch(MalformedURLException mue)
338         {
339             throw new StyxException(instanceURL + " is not recognised as a valid URL");
340         }
341         catch(UnknownHostException uhe)
342         {
343             throw new StyxException("The host at address " + url.getHost() +
344                 " is unknown.");
345         }
346     }
347     
348     /***
349      * Sets up this instance
350      */
351     private void init(SGSClient client, String instanceID) throws StyxException
352     {
353         this.init(client, client.getInstanceFile(instanceID));
354     }
355     
356     /***
357      * Sets up this instance
358      */
359     private void init(SGSClient client, CStyxFile instanceRoot) throws StyxException
360     {
361         this.client = client;
362         this.instanceRoot = instanceRoot;
363         this.ctlFile = this.instanceRoot.getFile("ctl");
364         this.ctlFile.addChangeListener(this);
365         
366         // Get the directory that holds the input files
367         this.inputsDir = this.instanceRoot.getFile("inputs");
368         
369         // Get the list of output files
370         this.outputFiles = this.instanceRoot.getFile("outputs").getChildren();
371         this.activeStreams = new Hashtable();
372         
373         // Get the files we use to set parameter values
374         this.paramFiles = this.instanceRoot.getFile("params").getChildren();
375         
376         // Get the directory of files pertaining to the lifecycle of the service
377         this.timeDir = this.instanceRoot.getFile("time");
378         
379         // Get the directory that holds the steerable parameters
380         this.steeringFiles = this.instanceRoot.getFile("steering").getChildren();
381         
382         // We will read this file to get the command line arguments
383         this.argsFile = this.instanceRoot.getFile("args");
384         
385         // Get the files we use to read service data
386         this.serviceDataFiles = this.instanceRoot.getFile("serviceData").getChildren();
387         this.sdeValues = new Hashtable();
388         this.sdeValuesVersion = 0;
389         this.sdeValuesVersionLastRead = 0;
390         
391         // Get the full configuration information from the server
392         this.config = this.client.getConfig();
393         
394         this.bufs = new Hashtable();
395         this.changeListeners = new Vector();
396         this.filesToUpload = new Hashtable();
397         this.filesToDownload = new Hashtable();
398         this.filesBeingRead = new Vector();
399     }
400     
401     /***
402      * @return the CStyxFile at the root of this instance
403      */
404     public CStyxFile getInstanceRoot()
405     {
406         return this.instanceRoot;
407     }
408     
409     /***
410      * @return the name of this Styx Grid Service (note: not the name of this
411      * particular instance)
412      */
413     public String getName()
414     {
415         return this.client.getName();
416     }
417     
418     /***
419      * Sends a message to start the service. When the confirmation arrives that
420      * the service has been started, the serviceStarted() event will be fired
421      * on all registered change listeners
422      */
423     public void startServiceAsync()
424     {
425         // Start reading from the exit code service data file.  When we get
426         // data from this file the service has finished
427         this.readServiceDataValueAsync("exitCode");
428         // Start reading from the status file.  This will confirm when the service
429         // is running, aborted, finished etc
430         this.readServiceDataValueAsync("status");
431         this.ctlFile.writeAsync("start", 0);
432     }
433     
434     /***
435      * Starts the service.  Blocks until the service is started.
436      * @throws StyxException if the service could not be started
437      */
438     public void startService() throws StyxException
439     {
440         // Start reading from the exit code service data file.  When we get
441         // data from this file the service has finished
442         this.readServiceDataValueAsync("exitCode");
443         // Start reading from the status file.  This will confirm when the service
444         // is running, aborted, finished etc
445         this.readServiceDataValueAsync("status");
446         this.ctlFile.setContents("start");
447         this.uploadToStdin();
448     }
449     
450     /***
451      * Gets the underlying connection object
452      */
453     public StyxConnection getConnection()
454     {
455         return this.instanceRoot.getConnection();
456     }
457     
458     /***
459      * Checks to see if we need to upload any data to the standard input, then
460      * starts a thread to do so if necessary
461      */
462     private void uploadToStdin()
463     {
464         // Search to see if we need to upload data to stdin
465         boolean gotStdin = false;
466         for (Iterator it = this.getInputs().iterator(); !gotStdin && it.hasNext(); )
467         {
468             SGSInput input = (SGSInput)it.next();
469             if (input.getName().equals("stdin"))
470             {
471                 gotStdin = true;
472             }
473         }
474         if (gotStdin)
475         {
476             if (this.stdinSrc == null)
477             {
478                 // No input source has been set, so we read from System.in
479                 new StdinReader(System.in).start();
480             }
481             else if (this.stdinSrc instanceof File)
482             {
483                 // We're reading from a local file
484                 try
485                 {
486                     FileInputStream fin = new FileInputStream((File)this.stdinSrc);
487                     new StdinReader(fin).start();
488                 }
489                 catch (FileNotFoundException fnfe)
490                 {
491                     // Should not happen because we have already checked to see 
492                     // if the file exists
493                     log.error("Internal error: " + ((File)this.stdinSrc).getPath()
494                         + " not found");
495                 }
496             }
497             // If neither of those conditions are true, we have already set a 
498             // URL for stdin
499         }
500     }
501     
502     /***
503      * Sends a message to stop the service. When the confirmation arrives that
504      * the service has been stopped, the serviceAborted() event will be fired
505      */
506     public void stopServiceAsync()
507     {
508         this.ctlFile.writeAsync("stop", 0);
509     }
510     
511     /***
512      * Stops the service.  Blocks until the service is stopped.
513      * @throws StyxException if the service could not be stopped
514      */
515     public void stopService() throws StyxException
516     {
517         this.ctlFile.setContents("stop");
518     }
519     
520     /***
521      * @return Array of Strings containing the names of the files in the input
522      * array.
523      */
524     private static String[] getFileNamesAsArray(CStyxFile[] files)
525     {
526         String[] arr = new String[files.length];
527         for (int i = 0; i < arr.length; i++)
528         {
529             arr[i] = files[i].getName();
530         }
531         return arr;
532     }
533     
534     /***
535      * @return Array of Strings, one for each element of service data that
536      * can be read.
537      */
538     public String[] getServiceDataNames()
539     {
540         return getFileNamesAsArray(this.serviceDataFiles);
541     }
542     
543     /***
544      * @return Vector of SGSParam objects, one for each input parameter that
545      * can be set.
546      */
547     public Vector getParameters()
548     {
549         return this.config.getParams();
550     }
551     
552     /***
553      * @return Array of Strings, one for each steerable parameter that can be set
554      */
555     public String[] getSteerableParameterNames()
556     {
557         return getFileNamesAsArray(this.steeringFiles);
558     }
559     
560     /***
561      * @return Vector of SGSInput objects, one for each input file that can be
562      * uploaded
563      */
564     public Vector getInputs()
565     {
566         return this.config.getInputs();
567     }
568     
569     /***
570      * Gets the names of all the output files
571      * @return Array of Strings, one for each output stream or file from which data
572      * can be read.
573      */
574     public String[] getOutputFileNames() throws StyxException
575     {
576         return getFileNamesAsArray(this.outputFiles);
577     }
578     
579     /***
580      * @return the full URL to the output file with the given name
581      * @throws IllegalArgumentException if there is no file with the given name
582      */
583     public String getOutputFileURL(String filename)
584     {
585         for (int i = 0; i < this.outputFiles.length; i++)
586         {
587             if (this.outputFiles[i].getName().equals(filename))
588             {
589                 return this.outputFiles[i].getURL();
590             }
591         }
592         throw new IllegalArgumentException("There is no output file called " + filename);
593     }
594     
595     /***
596      * Sends a message to get the command line arguments that will be executed.  Note that
597      * clients only need to call this once: the gotArguments() event on 
598      * all registered change listeners will be called automatically whenever
599      * the command line changes.
600      */
601     public void getArgumentsAsync()
602     {
603         this.readDataAsync(this.argsFile, true);
604     }
605     
606     /***
607      * Gets the command line arguments that will be executed.  This method
608      * blocks until the data are returned.
609      * @return the command line arguments that will be executed.
610      * @throws StyxException if there was an error getting the contents
611      */
612     public String getArguments() throws StyxException
613     {
614         return this.argsFile.getContents();
615     }
616     
617     /***
618      * Set the lifetime of this SGS instance.  This method will block until
619      * the lifetime is set.  This method reads the creation time from the server,
620      * adds the requested number of minutes, then sets the termination time.
621      * @param lifetimeInMinutes the lifetime of this instance in minutes.  The instance will
622      * automatically be destroyed at this time <b>after the instance was created</b>.
623      * @throws StyxException if the lifetime could not be set
624      */
625     public void setLifetime(double lifetimeInMinutes) throws StyxException
626     {
627         try
628         {
629             // Get the creation time of the service according to the server
630             String creationTimeStr = this.timeDir.getFile("creationTime").getContents();
631             Date creationTime = StyxUtils.parseXsdDateTime(creationTimeStr);
632             //System.out.println("creation time = " + creationTime + " ("
633             //    + creationTimeStr + ")");
634             long millisecondsToAdd = Math.round(lifetimeInMinutes * 60000);
635             Date terminationTime = new Date();
636             terminationTime.setTime(creationTime.getTime() + millisecondsToAdd);
637             String termTimeStr = StyxUtils.formatAsXsdDateTime(terminationTime);
638             //System.out.println("termination time = " + terminationTime + " ("
639             //    + termTimeStr + ")");
640             this.timeDir.getFile("terminationTime").setContents(termTimeStr);
641         }
642         catch(java.text.ParseException pe)
643         {
644             // Shouldn't happen
645             throw new StyxException("Error parsing creation time string from server");
646         }
647     }
648     
649     /***
650      * Sends a message to get the current value of the given piece of service
651      * data.  When the service data value is returned, the gotServiceDataValue()
652      * event will be fired on all registered change listeners.
653      * Apart from the case of <code>exitCode</code>, as soon as the server replies
654      * with the new service data value, a message will be sent to read the value
655      * again immediately and automatically.  Therefore you only need to call this
656      * method once for each element of service data.  In the case of <code>exitCode</code>
657      * there is no need to read the value more than once because the server will
658      * only reply when the remote service has stopped running: the exit code will
659      * never change after this reply.
660      * @param sdeName the name of the service data element to read
661      * @throws IllegalArgumentException if there is no element of service data
662      * with the given name
663      */
664     public void readServiceDataValueAsync(String sdeName)
665     {
666         for (int i = 0; i < this.serviceDataFiles.length; i++)
667         {
668             if (this.serviceDataFiles[i].getName().equals(sdeName))
669             {
670                 this.readDataAsync(this.serviceDataFiles[i], false);
671                 return;
672             }
673         }
674         // If we've got this far there isn't a piece of service data with the
675         // given name
676         throw new IllegalArgumentException("There is no service data element called "
677             + sdeName);
678     }
679     
680     /***
681      * Sends messages to get the current value of all pieces of service
682      * data.  When the service data value is returned, the gotServiceDataValue()
683      * event will be fired on all registered change listeners.
684      * Apart from the case of <code>exitCode</code>, as soon as the server replies
685      * with the new service data value, a message will be sent to read the value
686      * again immediately and automatically.  Therefore you only need to call this
687      * method once for each element of service data.  In the case of <code>exitCode</code>
688      * there is no need to read the value more than once because the server will
689      * only reply when the remote service has stopped running: the exit code will
690      * never change after this reply.
691      */
692     public void readAllServiceDataValuesAsync()
693     {
694         this.readDataAsync(this.serviceDataFiles, false);
695     }
696     
697     /***
698      * Sends messages to get the current value of all parameters.
699      * When the parameter value is returned, the gotParameterValue()
700      * event will be fired on all registered change listeners.  You only need
701      * to call this method once: every time the parameter value changes, the
702      * gotParameterValue() method will be fired.
703      */
704     public void readAllParameterValuesAsync()
705     {
706         this.readDataAsync(this.paramFiles, true);
707     }
708     
709     /***
710      * Sends messages to get the current value of all steerable parameters.
711      * When the parameter value is returned, the gotSteerableParameterValue()
712      * event will be fired on all registered change listeners.
713      */
714     public void readAllSteerableParameterValuesAsync()
715     {
716         this.readDataAsync(this.steeringFiles, true);
717     }
718     
719     /***
720      * Sets the file or URL from which an input file will get its data.  Note
721      * that many filenames or URLs can be associated with a single SGSInput
722      * object: this method adds, not replaces, a new one.
723      * @param inputFile SGSInput object representing the input file, as
724      * read using getInputs()
725      * @param filenameOrUrl If this String starts with the string "readfrom:", 
726      * this will be interpreted as a URL from which the server will read 
727      * the input file.  If not, this will be interpreted as the name of a local
728      * file.
729      * @throws IllegalStateException if there is an attempt to set more than
730      * one file or URL for a fixed input file or the standard input
731      * @throws FileNotFoundException if <code>filenameOrUrl</code> represents
732      * the name of a file that does not exist.
733      */
734     public void setInputSource(SGSInput inputFile, String filenameOrUrl)
735         throws FileNotFoundException
736     {
737         this.checkAllowMoreDataSources(inputFile);
738         Vector v = (Vector)this.filesToUpload.get(inputFile);
739         if (filenameOrUrl.startsWith("readfrom:"))
740         {
741             // This is a URL.  Just add it as a String
742             v.add(filenameOrUrl);
743             return;
744         }
745         else
746         {
747             // We interpret this to be a filename
748             File f = new File(filenameOrUrl);
749             if (!f.exists())
750             {
751                 throw new FileNotFoundException(filenameOrUrl + " does not exist");
752             }
753             v.add(f);
754         }
755     }
756     
757     /***
758      * Sets the local file from which an input file will get its data.  Note
759      * that many filenames or URLs can be associated with a single SGSInput
760      * object: this method adds, not replaces, a new one.
761      * @param inputFile SGSInput object representing the input file, as
762      * read using getInputs()
763      * @param file The file from which this input file will get its data
764      * @throws FileNotFoundException if <code>file</code> does not exist
765      * @throws IllegalStateException if there is an attempt to set more than
766      * one file or URL for a fixed input file or the standard input
767      */
768     public void setInputSource(SGSInput inputFile, File file)
769         throws FileNotFoundException
770     {
771         this.checkAllowMoreDataSources(inputFile);
772         if (!file.exists())
773         {
774             throw new FileNotFoundException(file.getName() + " does not exist");
775         }
776         Vector v = (Vector)this.filesToUpload.get(inputFile);
777         v.add(file);
778     }
779     
780     /***
781      * Checks to see if we are allowed to add more data sources to the given
782      * SGSInput object, throwing an IllegalStateException if we are not.
783      * @param inputFile the SGSInput object
784      * @throws IllegalStateException if <code>inputFile</code> is a stream
785      * or a fixed input file and we have already set an input source for this
786      * file. 
787      */
788     private void checkAllowMoreDataSources(SGSInput inputFile)
789     {
790         // First check to see if we have already set a data source for this SGSInput
791         if (this.filesToUpload.containsKey(inputFile))
792         {
793             if (inputFile.getType() != SGSInput.FILE_FROM_PARAM)
794             {
795                 throw new IllegalStateException("Cannot set more than one data source for "
796                     + inputFile.getName());
797             }
798         }
799         else
800         {
801             this.filesToUpload.put(inputFile, new Vector());
802         }
803     }
804     
805     /***
806      * Starts reading data from the given output file and redirects the data
807      * to the given PrintStream.  The same output cannot be redirected to multiple
808      * destinations (currently) so calling this method multiple times on the same
809      * output file name will have no effect.
810      * @param outputFileName Name of the output file (as returned by
811      * getOutputFileNames())
812      * @param dest The PrintStream (e.g. System.out) to which the data will be
813      * written.
814      * @throws IllegalArgumentException if there is no output file with the
815      * given name
816      */
817     public void redirectOutput(String outputFileName, PrintStream dest)
818     {
819         for (int i = 0; i < this.outputFiles.length; i++)
820         {
821             if (outputFileName.equals(this.outputFiles[i].getName()))
822             {
823                 // Don't do anything if we're already downloading from this file
824                 if (!this.filesToDownload.containsKey(this.outputFiles[i]))
825                 {
826                     this.filesToDownload.put(this.outputFiles[i], dest);
827                     this.outputFiles[i].addChangeListener(this);
828                     this.outputFiles[i].readAsync(0);
829                 }
830                 return;
831             }
832         }
833         throw new IllegalArgumentException(outputFileName + " is not a valid output file");
834     }
835         
836     /***
837      * Starts reading data from the given output file and redirects the data
838      * to the given local File.  The same output cannot be redirected to multiple
839      * destinations (currently) so calling this method multiple times on the same
840      * output file name will have no effect.
841      * @param outputFileName Name of the output file (as returned by
842      * getOutputFileNames())
843      * @param file the local file to which the data will be written.
844      * @throws FileNotFoundException if the local target file could not be
845      * created.
846      * @throws IllegalArgumentException if there is no output file with the
847      * given name
848      */
849     public void redirectOutput(String outputFileName, File file)
850         throws FileNotFoundException
851     {
852         this.redirectOutput(outputFileName,
853             new PrintStream(new FileOutputStream(file)));
854     }
855     
856     /***
857      * Sets the value of the parameter with the given name to the given value.
858      * This method blocks until the server responds with confirmation that the
859      * write is successful - this is not expected to take long.
860      * @param param The SGSParam object representing this parameter (as returned
861      * by this.getParameters()
862      * @param value The value of this parameter.
863      * @throws FileNotFoundException if the parameter represents an input file
864      * and the value represents a file that does not exist
865      * @throws StyxException if there was an error writing to the parameter file,
866      * or if the new value was invalid
867      */
868     public void setParameterValue(SGSParam param, String value)
869         throws FileNotFoundException, StyxException
870     {
871         this.setParameterValue(param, new String[]{value});
872     }
873     
874     /***
875      * Sets the value of the parameter with the given name to the given value.
876      * The input array is turned into a space-delimited String before being
877      * written to the server.
878      * This method blocks until the server responds with confirmation that the
879      * write is successful - this is not expected to take long.
880      * @param param The SGSParam object representing this parameter (as returned
881      * by this.getParameters()
882      * @param vals String array representing all the values for this parameter
883      * @throws FileNotFoundException if the parameter represents an input file
884      * and the value represents a file that does not exist
885      * @throws StyxException if there was an error writing to the parameter file,
886      * or if the new value was invalid
887      */
888     public void setParameterValue(SGSParam param, String[] vals)
889         throws FileNotFoundException, StyxException
890     {
891         CStyxFile paramFile = this.getParamFile(param);
892         paramFile.setContents(getParameterValue(param, vals));
893     }
894     
895     /***
896      * @return a String containing the value of the parameter exactly as it 
897      * will be written to the server, taking into account whether the parameter
898      * is an input file.
899      * @throws FileNotFoundException if the parameter represents an input file
900      * and the parameter value represents a file that does not exist
901      */
902     private String getParameterValue(SGSParam param, String[] vals)
903         throws FileNotFoundException
904     {
905         StringBuffer str = new StringBuffer();
906         if (vals != null && vals.length > 0)
907         {
908             for (int i = 0; i < vals.length; i++)
909             {
910                 if (param.getType() == SGSParam.INPUT_FILE)
911                 {
912                     // This parameter represents an input file
913                     this.setInputSource(param.getInputFile(), vals[i]);
914                     if (!vals[i].startsWith("readfrom:"))
915                     {
916                         // This is a file.  Replace the full path of the file
917                         // with just its name
918                         vals[i] = new File(vals[i]).getName();
919                     }
920                 }
921                 str.append(vals[i]);
922                 if (i < vals.length - 1)
923                 {
924                     str.append(" ");
925                 }
926             }
927         }
928         return str.toString();
929     }
930     
931     /***
932      * Sets the value of the parameter with the given name to the given value.
933      * This method returns immediately.  In order to receive confirmation that 
934      * the write has been successful, you must previously have called 
935      * readAllParameterValuesAsync(): when the parameter value has been set, 
936      * the gotParameterValue() method will be fired on all registered change
937      * listeners.  If the write was unsuccessful, the error() event will be
938      * fired on all registered change listeners.
939      * @throws FileNotFoundException if the parameter represents an input file
940      * and the value represents a file that does not exist
941      */
942     public void setParameterValueAsync(SGSParam param, String value)
943         throws FileNotFoundException
944     {
945         CStyxFile paramFile = this.getParamFile(param);
946         paramFile.writeAsync(this.getParameterValue(param, new String[]{value}), 0);
947     }
948     
949     /***
950      * @return the CStyxFile representing the given parameter, or
951      * null if this file does not exist
952      */
953     private CStyxFile getParamFile(SGSParam param)
954     {
955         for (int i = 0; i < this.paramFiles.length; i++)
956         {
957             if (this.paramFiles[i].getName().equals(param.getName()))
958             {
959                 return this.paramFiles[i];
960             }
961         }
962         return null;
963     }
964     
965     /***
966      * Sets the value of the steerable parameter with the given name to the given value.
967      * This method returns immediately.  In order to receive confirmation that 
968      * the write has been successful, you must previously have called 
969      * readAllSteerableParameterValuesAsync(): when the parameter value has been set, 
970      * the gotSteerableParameterValue() method will be fired on all registered change
971      * listeners.  If the write was unsuccessful, the error() event will be
972      * fired on all registered change listeners.
973      * @throws IllegalArgumentException if a parameter with the given name
974      * does not exist.
975      */
976     public void setSteerableParameterValueAsync(String name, String value)
977     {
978         for (int i = 0; i < this.steeringFiles.length; i++)
979         {
980             if (this.steeringFiles[i].getName().equals(name))
981             {
982                 // TODO this assumes (reasonably) that the new parameter value
983                 // can be written in a single Styx message.
984                 this.steeringFiles[i].writeAsync(value, 0);
985                 return;
986             }
987         }
988         throw new IllegalArgumentException("Steerable parameter " + name + " does not exist");
989     }
990     
991     /***
992      * Uploads the input files to the server.  This method blocks until the
993      * upload is complete.
994      * @todo Add some progress information to this (e.g. a callback)
995      * @todo Shouldn't have to call this explicitly: should be done when we
996      * start the service.
997      */
998     public void uploadInputFiles() throws StyxException
999     {
1000         // Cycle through each input file
1001         for (Enumeration en = this.filesToUpload.keys(); en.hasMoreElements(); )
1002         {
1003             SGSInput inputFile = (SGSInput)en.nextElement();
1004             // Look for all the files and URLs associated with this input file
1005             Vector filesAndUrls = (Vector)this.filesToUpload.get(inputFile);
1006             for (Iterator it = filesAndUrls.iterator(); it.hasNext(); )
1007             {
1008                 Object fileOrUrl = it.next();
1009                 if (fileOrUrl instanceof File)
1010                 {
1011                     File f = (File)fileOrUrl;
1012                     if (inputFile.getType() == SGSInput.STREAM)
1013                     {
1014                         // Don't upload any data: wait for the service to start,
1015                         // then upload in a separate thread (TODO)
1016                         this.stdinSrc = f;
1017                     }
1018                     else
1019                     {
1020                         CStyxFile targetFile;
1021                         if (inputFile.getType() == SGSInput.FILE)
1022                         {
1023                             // The name of the target file is fixed
1024                             targetFile = this.inputsDir.getFile(inputFile.getName());
1025                         }
1026                         else
1027                         {
1028                             // This must be a file whose name is given by a parameter
1029                             targetFile = this.inputsDir.getFile(f.getName());
1030                         }
1031                         log.debug("Uploading " + fileOrUrl + " to " + targetFile.getPath() + "...");
1032                         targetFile.upload(f);
1033                         log.debug("Upload of " + fileOrUrl + " complete.");
1034                     }
1035                 }
1036                 else
1037                 {
1038                     // This is a URL to a file.  We do not set this for an input
1039                     // file that is set by a parameter: this is handled separately
1040                     // by the server
1041                     if (inputFile.getType() != SGSInput.FILE_FROM_PARAM)
1042                     {
1043                         CStyxFile targetFile = this.inputsDir.getFile(inputFile.getName());
1044                         log.debug("Setting URL (" + fileOrUrl + ") for " + targetFile.getPath());
1045                         targetFile.setContents((String)fileOrUrl);
1046                         log.debug("URL for " + targetFile.getPath() + " set.");
1047                         if (inputFile.getType() == SGSInput.STREAM)
1048                         {
1049                             // Setting stdinSrc to a String is the signal that
1050                             // we have already set the stdin URL and there is no
1051                             // need to upload data from the local client
1052                             this.stdinSrc = (String)fileOrUrl;
1053                         }
1054                     }
1055                 }
1056             }
1057         }
1058     }
1059     
1060     /***
1061      * Uploads the input files to the server.  This method returns immediately:
1062      * the XXX method will be fired on all registered change listeners when 
1063      * each input file is uploaded.
1064      */
1065     public void uploadInputFilesAsync()
1066     {
1067         new InputFilesUploader().nextStage();
1068     }
1069     
1070     private class InputFilesUploader extends MessageCallback
1071     {
1072         Enumeration en = filesToUpload.keys();
1073         SGSInput inputFile;
1074         Iterator filesAndUrls;
1075         
1076         public void nextStage()
1077         {
1078             if (filesAndUrls == null || !filesAndUrls.hasNext())
1079             {
1080                 // Need to move to the next input file
1081                 if (en.hasMoreElements())
1082                 {
1083                     this.inputFile = (SGSInput)en.nextElement();
1084                     filesAndUrls = ((Vector)filesToUpload.get(this.inputFile)).iterator();
1085                 }
1086                 else
1087                 {
1088                     // There are no more files to upload
1089                     // TODO move to the next stage
1090                     return;
1091                 }
1092             }
1093             CStyxFile targetFile;
1094             Object fileOrUrl = filesAndUrls.next();
1095             if (fileOrUrl instanceof File)
1096             {
1097                 File f = (File)fileOrUrl;
1098                 if (this.inputFile.getType() == SGSInput.STREAM)
1099                 {
1100                     // Don't upload any data: wait for the service to start,
1101                     // then upload in a separate thread (TODO)
1102                     stdinSrc = f;
1103                 }
1104                 else
1105                 {
1106                     if (this.inputFile.getType() == SGSInput.FILE)
1107                     {
1108                         // The name of the target file is fixed
1109                         targetFile = inputsDir.getFile(this.inputFile.getName());
1110                     }
1111                     else
1112                     {
1113                         // This must be a file whose name is given by a parameter
1114                         targetFile = inputsDir.getFile(f.getName());
1115                     }
1116                     targetFile.uploadAsync(f, this);
1117                 }
1118             }
1119             else
1120             {
1121                 // This is a URL to a file.  We do not set this for an input
1122                 // file that is set by a parameter: this is handled separately
1123                 // by the server
1124                 if (this.inputFile.getType() != SGSInput.FILE_FROM_PARAM)
1125                 {
1126                     targetFile = inputsDir.getFile(this.inputFile.getName());
1127                     // TODO: do we now have to write EOF and close the file?
1128                     targetFile.writeAsync((String)fileOrUrl, 0, this);
1129                     if (inputFile.getType() == SGSInput.STREAM)
1130                     {
1131                         // Setting stdinSrc to a String is the signal that
1132                         // we have already set the stdin URL and there is no
1133                         // need to upload data from the local client
1134                         stdinSrc = (String)fileOrUrl;
1135                     }
1136                 }
1137             }
1138         }
1139         
1140         public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
1141         {
1142             // TODO: notify clients that file has been uploaded
1143             // Just move to the next file
1144             this.nextStage();
1145         }
1146         
1147         public void error(String message, StyxMessage tMessage)
1148         {
1149         }
1150     }
1151     
1152     /***
1153      * <p>Gets the values of all service data elements as a Hashtable, in which the
1154      * keys are the service data element names (Strings) and the values are the
1155      * service data values (also Strings).  The first time this method is called
1156      * it will return immediately with the data.  Subsequent calls will block
1157      * until <b>any one</b> of the service data values change.</p>
1158      * <p>The procedure followed is to open each file for reading, then send
1159      * a message to read each file, returning the data when it arrives.  The
1160      * files are <b>not</b> closed between calls to this function (but they might
1161      * be closed through the use of other functions)</p>
1162      */
1163     public Hashtable getServiceDataValues() throws StyxException
1164     {
1165         // Start reading the service data.  If we have already called this,
1166         // this will do nothing.
1167         this.readAllServiceDataValuesAsync();
1168         log.debug("Getting service data");
1169         log.debug("Version: " + sdeValuesVersion + ", Version last read: "
1170             + sdeValuesVersionLastRead);
1171         synchronized(this.sdeValues)
1172         {
1173             // Check the version of the hashtable and if it hasn't changed, wait
1174             // until it does
1175             while (this.sdeValuesVersionLastRead >= this.sdeValuesVersion)
1176             {
1177                 try
1178                 {
1179                     log.debug("Waiting for update to hashtable");
1180                     this.sdeValues.wait();
1181                     log.debug("Got update to hashtable");
1182                     log.debug("Version: " + sdeValuesVersion + ", Version last read: "
1183                         + sdeValuesVersionLastRead);
1184                 }
1185                 catch(InterruptedException ie)
1186                 {
1187                     // Do nothing
1188                 }
1189             }
1190             // We've got out of the loop so there must have been a change to the
1191             // service data hashtable
1192             this.sdeValuesVersionLastRead = this.sdeValuesVersion;
1193         }
1194         log.debug("Returning hashtable");
1195         return this.sdeValues;
1196     }
1197     
1198     /***
1199      * <p>Read data in an asynchronous manner from all the files in the input array.
1200      * This can be used to read from parameter files, steerable parameter files,
1201      * service data files or the command line file.  Unpredictable results may
1202      * occur if other files are passed to this method.</p>
1203      * <p>The behaviour of this is as follows: A message is sent to all files 
1204      * in the input array to start reading.  When the file has been completely
1205      * read, the appropriate event will be fired on all registered change listeners:
1206      * </p>
1207      * <table><tbody><tr><th>File type</th><th>Event fired</th></tr>
1208      * <tr><td>Parameter</td><td>gotParameterValue()</td></tr>
1209      * <tr><td>Service data</td><td>gotServiceDataValue()</td></tr>
1210      * <tr><td>Steerable parameter</td><td>gotSteerableParameterValue()</td></tr>
1211      * <tr><td>Command line</td><td>gotCommandLine()</td></tr></tbody></table>
1212      * <p>The reading of the file will go on <b>indefinitely</b> (i.e. until the
1213      * connection is closed - TODO: should stop when service stops).  When each
1214      * file has been completely read, it will immediately be read again from the
1215      * beginning.  This method therefore only needs to be called <b>once</b>
1216      * for each input file.</p>
1217      * @param files The files from which we will read data
1218      * @param openForWriting if true, we will open the files for reading and
1219      * writing (necessary for parameter files).
1220      */
1221     private void readDataAsync(CStyxFile[] files, boolean openForWriting)
1222     {
1223         for (int i = 0; i < files.length; i++)
1224         {
1225             this.readDataAsync(files[i], openForWriting);
1226         }
1227     }
1228     
1229     /***
1230      * <p>Read data in an asynchronous manner from the given file.
1231      * This can be used to read from parameter files, steerable parameter files,
1232      * service data files or the command line file.  Unpredictable results may
1233      * occur if other files are passed to this method.</p>
1234      * <p>The behaviour of this is as follows: A message is sent to start
1235      * reading from the file.  When the file has been completely
1236      * read, the appropriate event will be fired on all registered change listeners:
1237      * </p>
1238      * <p>If we are already reading from the given file, this method will do
1239      * nothing.</p>
1240      * <table><tbody><tr><th>File type</th><th>Event fired</th></tr>
1241      * <tr><td>Parameter</td><td>gotParameterValue()</td></tr>
1242      * <tr><td>Service data</td><td>gotServiceDataValue()</td></tr>
1243      * <tr><td>Steerable parameter</td><td>gotSteerableParameterValue()</td></tr>
1244      * <tr><td>Command line</td><td>gotCommandLine()</td></tr></tbody></table>
1245      * <p>The reading of the file will go on <b>indefinitely</b> (i.e. until the
1246      * connection is closed - TODO: should stop when service stops).  When each
1247      * file has been completely read, it will immediately be read again from the
1248      * beginning.  This method therefore only needs to be called <b>once</b>
1249      * for each input file.</p>
1250      * @param files The file from which we will read data
1251      * @param openForWriting if true, we will open the file for reading and
1252      * writing (necessary for parameter files).
1253      */
1254     private void readDataAsync(CStyxFile file, boolean openForWriting)
1255     {
1256         // Check to see if we are already reading from this file
1257         if (!this.filesBeingRead.contains(file))
1258         {
1259             // Create a StringBuffer to hold the data from this file, then put it
1260             // in the Hashtable, keyed by this file
1261             this.filesBeingRead.add(file);
1262             this.bufs.put(file, new StringBuffer());
1263             file.addChangeListener(this);
1264             file.readAsync(0, openForWriting);
1265         }
1266     }
1267     
1268     /***
1269      * @return The ID of the SGS instance to which this client is connected
1270      */
1271     public String getInstanceID()
1272     {
1273         return this.instanceRoot.getName();
1274     }
1275     
1276     /***
1277      * Gets a CachedStreamReader that can be used to read from the given
1278      * stream
1279      * This is not used in the current implementation of SGS but might be in
1280      * the future, perhaps to support GUI applications
1281      */
1282     public CachedStreamReader getStreamReader(CStyxFile stream)
1283     {
1284         // TODO: check that "stream" really does represent an output stream
1285         if (this.activeStreams.containsKey(stream))
1286         {
1287             return (CachedStreamReader)this.activeStreams.get(stream);
1288         }
1289         else
1290         {
1291             try
1292             {
1293                 CachedStreamReader reader = new CachedStreamReader(stream);
1294                 this.activeStreams.put(stream, reader);
1295                 return reader;
1296             }
1297             catch (IOException ioe)
1298             {
1299                 // TODO: do something more useful here
1300                 ioe.printStackTrace();
1301                 return null;
1302             }
1303         }
1304     }
1305     
1306     /***
1307      * Thread to redirect input from a given input stream to the standard
1308      * input of the SGS instance
1309      * @todo recreates code in StyxGridServiceInstance.RedirectStream: refactor
1310      */
1311     private class StdinReader extends Thread
1312     {
1313         private InputStream in;
1314         public StdinReader(InputStream in)
1315         {
1316             this.in = in;
1317         }
1318         public void run()
1319         {
1320             OutputStream os = null;
1321             try
1322             {
1323                 os = new CStyxFileOutputStream(inputsDir.getFile("stdin"));
1324                 byte[] b = new byte[1024]; // Read 1KB at a time
1325                 int n = 0;
1326                 do
1327                 {
1328                     n = in.read(b);
1329                     if (n >= 0)
1330                     {
1331                         os.write(b, 0, n);
1332                         os.flush();
1333                     }
1334                 } while (n >= 0);
1335             }
1336             catch (StyxException se)
1337             {
1338                 log.error("Error opening stream to standard input");
1339             }
1340             catch (IOException ioe)
1341             {
1342                 log.error("IOException when writing to standard input: "
1343                     + ioe.getMessage());
1344                 if (log.isDebugEnabled())
1345                 {
1346                     ioe.printStackTrace();
1347                 }
1348             }
1349             finally
1350             {
1351                 try
1352                 {
1353                     if (os != null)
1354                     {
1355                         // This will write a zero-byte message to confirm EOF
1356                         os.close();
1357                     }
1358                 }
1359                 catch (IOException ex)
1360                 {
1361                     // Ignore errors here
1362                 }
1363             }
1364         }
1365     }
1366     
1367     /***
1368      * This is called when we have read data asynchronously using readDataAsync().
1369      */
1370     public synchronized void dataArrived(CStyxFile file, TreadMessage tReadMsg,
1371         ByteBuffer data)
1372     {
1373         // Get the PrintStream that belongs to this file.  This will only give
1374         // a PrintStream if the file is an output file
1375         PrintStream prtStr = (PrintStream)this.filesToDownload.get(file);
1376         StringBuffer strBuf = null;
1377         if (prtStr == null)
1378         {
1379             // This is not an output file.  Get the StringBuffer that belongs to
1380             // this file (which is service data, a parameter file, etc)
1381             strBuf = (StringBuffer)this.bufs.get(file);
1382             if (strBuf == null)
1383             {
1384                 // This should never happen
1385                 throw new IllegalStateException("Internal error: strBuf and prtStr are both null");
1386             }
1387         }
1388         if (data.remaining() > 0)
1389         {
1390             // Calculate the offset of the next read
1391             long offset = tReadMsg.getOffset().asLong() + data.remaining();
1392             if (prtStr == null)
1393             {
1394                 // This is not an output file. Add the new data to the buffer.
1395                 strBuf.append(StyxUtils.dataToString(data));
1396             }
1397             else
1398             {
1399                 // This is an output file.  Write the data to the stream
1400                 byte[] buf = new byte[data.remaining()];
1401                 data.get(buf);
1402                 prtStr.write(buf, 0, buf.length);
1403             }
1404             // Read the next chunk of data from the file, whatever it was
1405             file.readAsync(offset);
1406         }
1407         else
1408         {
1409             // We have zero bytes from the file (i.e. EOF), so we know we have
1410             // the complete data for the file.
1411             // We now need to know what sort of file this is
1412             boolean readAgain = true;
1413             boolean found = false;
1414             // If this is an output file, close the stream
1415             if (prtStr != null)
1416             {
1417                 found = true;
1418                 readAgain = false;
1419                 // We don't close the standard streams
1420                 if (prtStr != System.out && prtStr != System.err)
1421                 {
1422                     prtStr.close();
1423                 }
1424                 // See if we have downloaded all output data
1425                 this.filesToDownload.remove(file);
1426                 if (this.filesToDownload.size() == 0)
1427                 {
1428                     this.fireAllOutputDataDownloaded();
1429                 }
1430             }
1431             // See if this is the arguments file
1432             if (!found && file == this.argsFile)
1433             {
1434                 found = true;
1435                 this.fireGotArguments(strBuf.toString());
1436             }
1437             // Check to see if this is a service data file
1438             for (int i = 0; !found && i < this.serviceDataFiles.length; i++)
1439             {
1440                 if (file == this.serviceDataFiles[i])
1441                 {
1442                     found = true;
1443                     if (file.getName().equals("exitCode"))
1444                     {
1445                         // We don't read the exit code file again because its
1446                         // contents will never change once set
1447                         readAgain = false;
1448                         try
1449                         {
1450                             int exitCode = Integer.parseInt(strBuf.toString());
1451                             this.fireGotExitCode(exitCode);
1452                         }
1453                         catch(NumberFormatException nfe)
1454                         {
1455                             this.fireError("Invalid exit code received (" +
1456                                 strBuf.toString() + ")");
1457                         }
1458                     }
1459                     this.fireGotServiceDataValue(file.getName(), strBuf.toString());
1460                 }
1461             }
1462             // Now check to see if this is a parameter file
1463             for (int i = 0; !found && i < this.paramFiles.length; i++)
1464             {
1465                 if (file == this.paramFiles[i])
1466                 {
1467                     found = true;
1468                     this.fireGotParameterValue(file.getName(), strBuf.toString());
1469                 }
1470             }
1471             // Now check to see if this is a steerable parameter file
1472             for (int i = 0; !found && i < this.steeringFiles.length; i++)
1473             {
1474                 if (file == this.steeringFiles[i])
1475                 {
1476                     found = true;
1477                     this.fireGotSteerableParameterValue(file.getName(), strBuf.toString());
1478                 }
1479             }
1480             // Clear the buffer and start reading again from the file
1481             if (strBuf != null)
1482             {
1483                 strBuf.setLength(0);
1484             }
1485             if (readAgain)
1486             {
1487                 file.readAsync(0);
1488             }
1489         }
1490     }
1491     
1492     /***
1493      * Required by the CStyxFileChangeListener interface. Called when confirmation
1494      * arrives that a message has been written to the ctl file
1495      */
1496     public void dataWritten(CStyxFile file, TwriteMessage tWriteMsg)
1497     {
1498         if (file == this.ctlFile)
1499         {
1500             // We need to find out what the original message was
1501             String message = StyxUtils.dataToString(tWriteMsg.getData());
1502             if (message.equalsIgnoreCase("start"))
1503             {
1504                 // Start writing data to standard input if necessary
1505                 this.uploadToStdin();
1506                 this.fireServiceStarted();
1507             }
1508             else if (message.equalsIgnoreCase("stop"))
1509             {
1510                 this.fireServiceAborted();
1511             }
1512         }
1513     }
1514     
1515     /***
1516      * Required by the StyxFileChangeListener interface. Called when an Rerror
1517      * message has arrived
1518      */
1519     public void error(CStyxFile file, String message)
1520     {
1521         this.fireError(message);
1522     }
1523     
1524     /***
1525      * Closes the underlying StyxConnection
1526      */
1527     public void close()
1528     {
1529         this.instanceRoot.getConnection().close();
1530     }
1531     
1532     /***
1533      * Adds a listener that will be notified of changes to this SGS. If the
1534      * listener is already registered, this will do nothing.
1535      */
1536     public void addChangeListener(SGSInstanceClientChangeListener listener)
1537     {
1538         synchronized(this.changeListeners)
1539         {
1540             if (!this.changeListeners.contains(listener))
1541             {
1542                 this.changeListeners.add(listener);
1543             }
1544         }
1545     }
1546     
1547     /***
1548      * Removes a SGSInstanceChangeListener.  (Note that this will only remove the first
1549      * instance of a given SGSInstanceChangeListener.  If, for some reason, more than one 
1550      * copy of the same change listener has been registered, this method will
1551      * only remove the first.)
1552      */
1553     public void removeChangeListener(SGSInstanceClientChangeListener listener)
1554     {
1555         synchronized(this.changeListeners)
1556         {
1557             boolean contained = this.changeListeners.remove(listener);
1558         }
1559     }
1560     
1561     /***
1562      * Fires the gotServiceDataValue() event on all registered change listeners
1563      */
1564     private void fireGotServiceDataValue(String sdName, String newData)
1565     {
1566         // Update the Hashtable of service data values
1567         synchronized(this.sdeValues)
1568         {
1569             this.sdeValues.put(sdName, newData);
1570             // Update the version and notify any waiting clients
1571             this.sdeValuesVersion++;
1572             this.sdeValues.notifyAll();
1573         }
1574         synchronized(this.changeListeners)
1575         {
1576             SGSInstanceClientChangeListener listener;
1577             for (int i = 0; i < this.changeListeners.size(); i++)
1578             {
1579                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1580                 listener.gotServiceDataValue(sdName, newData);
1581             }
1582         }
1583     }
1584     
1585     /***
1586      * Fires the serviceAborted() event on all registered change listeners
1587      */
1588     private void fireServiceStarted()
1589     {
1590         synchronized(this.changeListeners)
1591         {
1592             SGSInstanceClientChangeListener listener;
1593             for (int i = 0; i < this.changeListeners.size(); i++)
1594             {
1595                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1596                 listener.serviceStarted();
1597             }
1598         }
1599     }
1600     
1601     /***
1602      * Fires the serviceAborted() event on all registered change listeners
1603      */
1604     private void fireServiceAborted()
1605     {
1606         synchronized(this.changeListeners)
1607         {
1608             SGSInstanceClientChangeListener listener;
1609             for (int i = 0; i < this.changeListeners.size(); i++)
1610             {
1611                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1612                 listener.serviceAborted();
1613             }
1614         }
1615     }
1616     
1617     /***
1618      * Fires the error() event on all registered change listeners
1619      */
1620     private void fireError(String message)
1621     {
1622         synchronized(this.changeListeners)
1623         {
1624             SGSInstanceClientChangeListener listener;
1625             for (int i = 0; i < this.changeListeners.size(); i++)
1626             {
1627                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1628                 listener.error(message);
1629             }
1630         }
1631     }
1632     
1633     /***
1634      * Fires the gotParameterValue() event on all registered change listeners.
1635      * @param name Name of the parameter
1636      * @param value The new value of the parameter
1637      */
1638     private void fireGotParameterValue(String name, String value)
1639     {
1640         synchronized(this.changeListeners)
1641         {
1642             SGSInstanceClientChangeListener listener;
1643             for (int i = 0; i < this.changeListeners.size(); i++)
1644             {
1645                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1646                 listener.gotParameterValue(name, value);
1647             }
1648         }
1649     }
1650     
1651     /***
1652      * Fires the gotSteerableParameterValue() event on all registered change listeners.
1653      * @param name Name of the parameter
1654      * @param value The new value of the parameter
1655      */
1656     private void fireGotSteerableParameterValue(String name, String value)
1657     {
1658         synchronized(this.changeListeners)
1659         {
1660             SGSInstanceClientChangeListener listener;
1661             for (int i = 0; i < this.changeListeners.size(); i++)
1662             {
1663                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1664                 listener.gotSteerableParameterValue(name, value);
1665             }
1666         }
1667     }
1668     
1669     /***
1670      * Fires the inputFilesUploaded() event on all registered change listeners
1671      */
1672     private void fireInputFilesUploaded()
1673     {
1674         synchronized(this.changeListeners)
1675         {
1676             SGSInstanceClientChangeListener listener;
1677             for (int i = 0; i < this.changeListeners.size(); i++)
1678             {
1679                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1680                 listener.inputFilesUploaded();
1681             }
1682         }
1683     }
1684     
1685     /***
1686      * Fires the gotArguments() event on all registered change listeners
1687      */
1688     private void fireGotArguments(String newArgs)
1689     {
1690         synchronized(this.changeListeners)
1691         {
1692             SGSInstanceClientChangeListener listener;
1693             for (int i = 0; i < this.changeListeners.size(); i++)
1694             {
1695                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1696                 listener.gotArguments(newArgs);
1697             }
1698         }
1699     }
1700     
1701     /***
1702      * Fires the gotExitCode() event on all registered change listeners
1703      */
1704     private void fireGotExitCode(int exitCode)
1705     {
1706         synchronized(this.changeListeners)
1707         {
1708             SGSInstanceClientChangeListener listener;
1709             for (int i = 0; i < this.changeListeners.size(); i++)
1710             {
1711                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1712                 listener.gotExitCode(exitCode);
1713             }
1714         }
1715     }
1716     
1717     /***
1718      * Fires the allOutputDataDownloaded() event on all registered change listeners
1719      */
1720     private void fireAllOutputDataDownloaded()
1721     {
1722         synchronized(this.changeListeners)
1723         {
1724             SGSInstanceClientChangeListener listener;
1725             for (int i = 0; i < this.changeListeners.size(); i++)
1726             {
1727                 listener = (SGSInstanceClientChangeListener)this.changeListeners.get(i);
1728                 listener.allOutputDataDownloaded();
1729             }
1730         }
1731     }
1732     
1733     
1734 }