View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   * 
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.gridservice.server;
30  
31  import java.net.URL;
32  import java.net.MalformedURLException;
33  import java.net.URLConnection;
34  
35  import java.io.InputStream;
36  import java.io.OutputStream;
37  import java.io.FileOutputStream;
38  import java.io.IOException;
39  import java.io.File;
40  import java.io.RandomAccessFile;
41  import java.io.FileNotFoundException;
42  
43  import java.util.Vector;
44  import java.util.Iterator;
45  import java.util.Date;
46  
47  import org.apache.mina.common.ByteBuffer;
48  import org.apache.log4j.Logger;
49  
50  import com.martiansoftware.jsap.JSAP;
51  import com.martiansoftware.jsap.Parameter;
52  import com.martiansoftware.jsap.Switch;
53  import com.martiansoftware.jsap.FlaggedOption;
54  import com.martiansoftware.jsap.UnflaggedOption;
55  import com.martiansoftware.jsap.JSAPResult;
56  
57  import uk.ac.rdg.resc.jstyx.server.StyxFile;
58  import uk.ac.rdg.resc.jstyx.server.StyxFileChangeListener;
59  import uk.ac.rdg.resc.jstyx.server.AsyncStyxFile;
60  import uk.ac.rdg.resc.jstyx.server.StyxDirectory;
61  import uk.ac.rdg.resc.jstyx.server.StyxFileClient;
62  import uk.ac.rdg.resc.jstyx.server.InMemoryFile;
63  import uk.ac.rdg.resc.jstyx.server.FileOnDisk;
64  import uk.ac.rdg.resc.jstyx.server.DirectoryOnDisk;
65  import uk.ac.rdg.resc.jstyx.server.MonitoredFileOnDisk;
66  
67  import uk.ac.rdg.resc.jstyx.StyxException;
68  import uk.ac.rdg.resc.jstyx.StyxUtils;
69  import uk.ac.rdg.resc.jstyx.types.ULong;
70  
71  import uk.ac.rdg.resc.jstyx.gridservice.config.*;
72  
73  /***
74   * Class representing a StyxGridService instance
75   *
76   * @author Jon Blower
77   * $Revision: 609 $
78   * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
79   * $Log$
80   * Revision 1.48  2006/02/23 09:06:25  jonblower
81   * Fixed bug in destroy(): instances and their cached files are now destroyed properly
82   *
83   * Revision 1.47  2006/02/20 17:35:01  jonblower
84   * Implemented correct handling of output files/streams (not fully tested yet)
85   *
86   * Revision 1.46  2006/02/17 17:34:12  jonblower
87   * Changes to comments
88   *
89   * Revision 1.45  2006/02/17 09:24:02  jonblower
90   * Changed so that output files are added to the namespace on initialization and 
91   * parameters representing output files are not present in the namespace
92   *
93   * Revision 1.44  2006/01/04 16:45:29  jonblower
94   * Implemented automatic termination of SGS instances using Quartz scheduler
95   *
96   * Revision 1.43  2006/01/04 11:24:58  jonblower
97   * Implemented time directory in the SGS instance namespace
98   *
99   * Revision 1.42  2005/12/07 17:47:58  jonblower
100  * Changed "commandline" file to "args" - now just contains arguments, not program name
101  *
102  * Revision 1.41  2005/12/01 08:37:48  jonblower
103  * Changed ID from int to String
104  *
105  * Revision 1.40  2005/11/14 21:31:54  jonblower
106  * Got SGSRun working for SC2005 demo
107  *
108  * Revision 1.39  2005/11/11 21:57:21  jonblower
109  * Implemented passing of URLs to input files
110  *
111  * Revision 1.38  2005/11/10 19:50:43  jonblower
112  * Added code to handle output files
113  *
114  * Revision 1.37  2005/11/09 18:01:31  jonblower
115  * Changed way that input files are exposed and their relation to parameters
116  *
117  * Revision 1.36  2005/11/04 19:29:53  jonblower
118  * Moved code that writes to std input to SGSInputFile
119  *
120  * Revision 1.35  2005/11/04 09:11:23  jonblower
121  * Made SGSParamFile inherit from AsyncStyxFile instead of InMemoryFile
122  *
123  * Revision 1.34  2005/11/03 07:42:47  jonblower
124  * Implemented JSAP-based parameter parsing
125  *
126  * Revision 1.30  2005/10/18 14:08:14  jonblower
127  * Removed inputfiles from namespace
128  *
129  * Revision 1.29  2005/10/14 18:03:23  jonblower
130  * Fixed bug with writing to input stream
131  *
132  * Revision 1.28  2005/09/08 07:08:59  jonblower
133  * Removed "String user" from list of parameters to StyxFile.write()
134  *
135  * Revision 1.27  2005/08/31 17:08:54  jonblower
136  * Fixed bug with handling exception when Process could not be created
137  *
138  * Revision 1.26  2005/08/30 16:29:00  jonblower
139  * Added processAndReplyRead() helper functions to StyxFile
140  *
141  * Revision 1.24  2005/08/02 08:05:18  jonblower
142  * Continuing to implement steering
143  *
144  * Revision 1.22  2005/08/01 16:38:05  jonblower
145  * Implemented simple parameter handling
146  *
147  * Revision 1.21  2005/07/29 16:56:07  jonblower
148  * Implementing reading command line asynchronously
149  *
150  * Revision 1.20  2005/06/20 07:17:35  jonblower
151  * Wrapped SGSParamFile as AsyncStyxFile
152  *
153  * Revision 1.19  2005/06/14 07:45:16  jonblower
154  * Implemented setting of params and async notification of parameter changes
155  *
156  * Revision 1.18  2005/06/10 07:53:12  jonblower
157  * Changed SGS namespace: removed "inurl" and subsumed functionality into "stdin"
158  *
159  * Revision 1.17  2005/05/27 17:05:07  jonblower
160  * Changes to incorporate GeneralCachingStreamReader
161  *
162  * Revision 1.16  2005/05/26 21:24:44  jonblower
163  * Added exitCode as a new service data element
164  *
165  * Revision 1.15  2005/05/26 16:50:57  jonblower
166  * Fixed bug with input files directory
167  *
168  * Revision 1.14  2005/05/19 18:42:07  jonblower
169  * Implementing specification of input files required by SGS
170  *
171  * Revision 1.13  2005/05/16 11:00:53  jonblower
172  * Changed SGS config XML file structure: separated input and output streams and changed some tag names
173  *
174  * Revision 1.12  2005/05/13 16:49:34  jonblower
175  * Coded dynamic detection and display of service data, also included streams in config file
176  *
177  * Revision 1.11  2005/05/11 15:14:31  jonblower
178  * Implemented more flexible definition of service data elements
179  *
180  * Revision 1.9  2005/04/27 16:11:43  jonblower
181  * Added capability to add documentation files to SGS namespace
182  *
183  * Revision 1.8  2005/04/26 07:46:11  jonblower
184  * Continuing to improve setting of parameters in Styx Grid Services
185  *
186  * Revision 1.7  2005/03/24 17:33:51  jonblower
187  * Improved reading of service parameters from config file
188  *
189  * Revision 1.6  2005/03/24 14:47:47  jonblower
190  * Provided default read() and write() methods for StyxFile so it is no longer abstract
191  *
192  * Revision 1.5  2005/03/24 09:48:31  jonblower
193  * Changed 'count' from long to int throughout for reading and writing
194  *
195  * Revision 1.4  2005/03/24 07:57:41  jonblower
196  * Improved code for reading SSL info from SGSconfig file and included parameter
197  * information for the Grid Services in the config file
198  *
199  * Revision 1.3  2005/03/19 21:47:02  jonblower
200  * Further fixes relating to releasing ByteBuffers
201  *
202  * Revision 1.2  2005/03/18 16:45:18  jonblower
203  * Released ByteBuffers after use
204  *
205  * Revision 1.1  2005/03/16 22:16:44  jonblower
206  * Added Styx Grid Service classes to core module
207  *
208  * Revision 1.2  2005/03/16 17:59:35  jonblower
209  * Changed following changes to core JStyx library (replacement of
210  * java.nio.ByteBuffers with MINA's ByteBuffers)
211  *
212  * Revision 1.1  2005/02/16 19:22:32  jonblower
213  * Commit adding of SGS files to CVS
214  *
215  */
216 class StyxGridServiceInstance extends StyxDirectory
217 {
218     
219     private static final Logger log = Logger.getLogger(StyxGridServiceInstance.class);
220     private static final Runtime runtime = Runtime.getRuntime();
221     
222     private StyxGridService sgs; // The SGS to which this instance belongs
223     private String id; // The ID of this instance
224     private SGSConfig sgsConfig; // The configuration object used to create this instance
225     private File workDir; // The working directory of this instance
226     private Process process = null; // The process object returned by runtime.exec()
227     private StatusCode statusCode;
228     private ServiceDataElement status; // The status of the service
229     private ServiceDataElement bytesConsumed; // The number of bytes consumed by the service
230     private ExitCodeFile exitCodeFile; // The exit code from the executable
231     private CachingStreamReader stdout = new CachingStreamReader(this, "stdout");  // The standard output from the program
232     private CachingStreamReader stderr = new CachingStreamReader(this, "stderr");  // The standard error from the program
233     private StyxDirectory inputsDir; // Contains the input files
234     private StyxDirectory outputsDir; // Contains the output files
235     private SGSInputFile.StdinFile stdin;  // The standard input to the program
236     private boolean redirectingToStdin;
237     private JSAP jsap; // JSAP object for parsing the command-line parameters
238     private StyxDirectory paramDir; // Contains the command-line parameters to pass to the executable
239     private Vector paramFiles; // Contains the SGSParamFiles
240     private StyxFile argsFile; // The file containing the command line arguments
241     private String command; // The command to run (i.e. the string that is passed to System.exec)
242     private long startTime;
243     
244     private Date creationTime;  // The time at which this instance was created
245     private Date terminationTime; // The time at which this instance will automatically be terminated
246     
247     // SGSInstanceChangeListeners that are listening for changes to this SGS instance
248     private Vector changeListeners;
249     
250     /***
251      * Creates a new StyxGridService with the given configuration
252      * @todo: sort out permissions and owners on all these files
253      */
254     public StyxGridServiceInstance(StyxGridService sgs, String id,
255         SGSConfig sgsConfig) throws StyxException
256     {
257         super(id);
258         this.sgs = sgs;
259         this.id = id;
260         this.sgsConfig = sgsConfig;
261         
262         // Set the creation time and the termination time.  By default, the 
263         // termination time is null, i.e. the instance will last forever
264         this.creationTime = new Date();
265         this.terminationTime = null;
266         
267         this.command = sgsConfig.getCommand();
268         this.workDir = new File(sgsConfig.getWorkingDirectory() +
269             StyxUtils.SYSTEM_FILE_SEPARATOR + id);
270         this.changeListeners = new Vector();
271         
272         this.redirectingToStdin = false;
273         
274         if (this.workDir.exists())
275         {
276             // Delete the directory and all its contents
277             deleteDir(this.workDir);
278         }
279         // (Re)create the working directory
280         if (!this.workDir.mkdirs())
281         {
282             throw new StyxException("Unable to create working directory "
283                 + this.workDir);
284         }
285         
286         // Add the ctl file
287         this.addChild(new ControlFile(this)); // the ctl file
288         
289         // Add the parameters as SGSParamFiles.
290         this.paramDir = new StyxDirectory("params");
291         this.paramFiles = new Vector();
292         this.jsap = sgsConfig.getParamParser();
293         Vector params = sgsConfig.getParams();
294         for (int i = 0; i < params.size(); i++)
295         {
296             SGSParam param = (SGSParam)params.get(i);
297             SGSParamFile paramFile = new SGSParamFile(param, this);
298             this.paramFiles.add(paramFile);
299             if (param.getType() == SGSParam.OUTPUT_FILE)
300             {
301                 // For parameters that represent output files, we don't allow
302                 // their values to be changed: the output file is just named
303                 // after the parameter.  The only reason for bothering to create
304                 // an SGSParamFile is to make it easier to get the command-line
305                 // arguments - see the getArguments() method
306                 paramFile.setParameterValue(param.getName());
307             }
308             else
309             {
310                 // We don't add parameters pertaining to output files to the namespace:
311                 // the name of these files in the namespace is fixed
312                 this.paramDir.addChild(paramFile);
313             }
314         }
315         this.addChild(paramDir);
316         
317         // Add the inputs and outputs
318         this.inputsDir = new StyxDirectory("inputs");
319         this.outputsDir = new StyxDirectory("outputs");
320         
321         Vector inputs = sgsConfig.getInputs();
322         for (int i = 0; i < inputs.size(); i++)
323         {
324             SGSInput input = (SGSInput)inputs.get(i);
325             if (input.getType() == SGSInput.STREAM)
326             {
327                 this.stdin = new SGSInputFile.StdinFile(this);
328                 this.inputsDir.addChild(this.stdin);
329             }
330             else if (input.getType() == SGSInput.FILE)
331             {
332                 // This is a fixed input file.  Create the java.io.File object
333                 // that represents the local file itself.
334                 this.addInputFile(input.getName());
335             }
336             else if (input.getType() == SGSInput.FILE_FROM_PARAM)
337             {
338                 // Do nothing: these files do not appear in the namespace until
339                 // the parameter name is set.  See SGSParamFile.setParameterValue()
340             }
341             else
342             {
343                 throw new StyxException("Internal error: unknown type of input "
344                     + input.getName());
345             }
346         }
347         
348         // Now add the output files
349         this.addOutputFiles();
350         
351         // We add the output files when the service is started
352         this.addChild(this.inputsDir).addChild(this.outputsDir);
353         
354         // Add the steerable parameters
355         StyxDirectory steeringDir = new StyxDirectory("steering");
356         Vector steerables = sgsConfig.getSteerables();
357         for (int i = 0; i < steerables.size(); i++)
358         {
359             Steerable steerable = (Steerable)steerables.get(i);
360             // Create a file object for this steerable object
361             File file = new File(this.workDir, steerable.getFilePath());
362             try
363             {
364                 // Create the backing file and enter the initial value
365                 RandomAccessFile raf = new RandomAccessFile(file, "rw");
366                 raf.seek(0);
367                 // Write the initial value to the file as an array of bytes
368                 raf.write(StyxUtils.strToUTF8(steerable.getInitialValue()));
369                 // Truncate the file after the data we've just written
370                 raf.setLength(raf.getFilePointer());
371                 raf.close();
372             }
373             catch (IOException ioe)
374             {
375                 if (log.isDebugEnabled())
376                 {
377                     ioe.printStackTrace();
378                 }
379                 throw new StyxException("IOException creating steering file " +
380                     file.getName() + ": " + ioe.getMessage());
381             }
382             steeringDir.addChild(new AsyncStyxFile(new FileOnDisk(steerable.getName(), file)));
383         }
384         this.addChild(steeringDir);
385         
386         // Add the service data: the files exposing the service data will all
387         // have asynchronous behaviour
388         StyxDirectory serviceDataDir = new StyxDirectory("serviceData");
389         Vector serviceDataElements = sgsConfig.getServiceData();
390         // Add the default SDEs that all services have
391         this.status = new StringServiceDataElement("status", true, "created");
392         serviceDataDir.addChild(this.status.getAsyncStyxFile());
393         this.exitCodeFile = new ExitCodeFile();
394         serviceDataDir.addChild(this.exitCodeFile);        
395         // If we are reading from stdin, add a bytesConsumed SDE
396         if (this.stdin != null)
397         {
398             this.bytesConsumed = new StringServiceDataElement("bytesConsumed",
399                 true, "0", 2.0f);
400             serviceDataDir.addChild(this.bytesConsumed.getAsyncStyxFile());
401         }
402         // Add the rest of the SDEs
403         for (int i = 0; i < serviceDataElements.size(); i++)
404         {
405             SDEConfig sde = (SDEConfig)serviceDataElements.get(i);
406             // Look for the special SDEs.
407             if (sde.getName().equals("status") ||
408                 sde.getName().equals("bytesConsumed") ||
409                 sde.getName().equals("exitCode"))
410             {
411                 // Ignore these: these are default SDEs that we will add automatically
412                 // TODO should we throw an exception here and treat these as
413                 //     reserved words?
414             }
415             else
416             {
417                 // This is a custom SDE
418                 if (sde.getFilePath().equalsIgnoreCase(""))
419                 {
420                     throw new StyxException("Service data element " +
421                         sde.getName() + " must have a backing file");
422                 }
423                 MonitoredFileOnDisk monFile = new MonitoredFileOnDisk(sde.getName(),
424                     new File(this.workDir, sde.getFilePath()), 
425                     (long)(sde.getMinUpdateInterval() * 1000));
426                 monFile.startMonitoring();
427                 // TODO: stop monitoring somehow, when service is destroyed?
428                 serviceDataDir.addChild(monFile);
429             }
430         }
431         this.addChild(serviceDataDir);
432         
433         // Add a file that, when read, reveals the arguments that will be passed
434         // to Runtime.exec(). This is an AsyncStyxFile so
435         // that clients can be notified asynchronously of changes to the 
436         // command line if they wish
437         this.argsFile = new ArgsFile();
438         this.addChild(new AsyncStyxFile(this.argsFile));
439         
440         // Add the files that are pertinent to the lifecycle of the SGS
441         this.addChild(new TimeDirectory(this));
442         
443         this.statusCode = StatusCode.CREATED;
444     }
445     
446     /***
447      * Adds the output files to the namespace
448      */
449     private void addOutputFiles() throws StyxException
450     {
451         Vector outputs = this.sgsConfig.getOutputs();
452         for (int i = 0; i < outputs.size(); i++)
453         {
454             SGSOutput output = (SGSOutput)outputs.get(i);
455             if (output.getType() == SGSOutput.STREAM)
456             {
457                 if (output.getName().equals("stdout"))
458                 {
459                     // Add the standard output file
460                     this.outputsDir.addChild(this.stdout);
461                 }
462                 else if (output.getName().equals("stderr"))
463                 {
464                     // Add the standard error file
465                     this.outputsDir.addChild(this.stderr);
466                 }
467             }
468             else
469             {
470                 // For fixed-name files we create an SGSOutputFile named after the
471                 // file name.  For files that get their name from a parameter,
472                 // we name the SGSOutputFile after the parameter name
473                 File file = new File(this.workDir, output.getName());
474                 this.outputsDir.addChild(new SGSOutputFile(file, this));
475             }
476         }
477     }
478     
479     /***
480      * Adds a new input file to the inputs/ directory
481      */
482     public void addInputFile(String filename) throws StyxException
483     {
484         File file = new File(this.workDir, filename);
485         this.inputsDir.addChild(new SGSInputFile.File(file, this));
486     }
487     
488     /***
489      * Remove input files from the inputs/ directory
490      */
491     public void removeInputFiles(String[] filenames)
492     {
493         synchronized (this.inputsDir)
494         {
495             for (int i = 0; i < filenames.length; i++)
496             {
497                 StyxFile child = this.inputsDir.getChild(filenames[i]);
498                 if (child != null)
499                 {
500                     this.inputsDir.removeChild(child);
501                 }
502             }
503         }
504     }
505     
506     /***
507      * Makes sure all the input files are ready
508      * @throws StyxException if a required input file is not ready and a URL
509      * has not been set.
510      */
511     private void prepareInputFiles() throws StyxException
512     {
513         StyxFile[] inputFiles = this.inputsDir.getChildren();
514         for (int i = 0; i < inputFiles.length; i++)
515         {
516             log.debug("Preparing " + inputFiles[i].getName() + "...");
517             if (inputFiles[i] instanceof SGSInputFile)
518             {
519                 log.debug(inputFiles[i].getName() + " is an SGSInputFile");
520                 SGSInputFile inputFile = (SGSInputFile)inputFiles[i];
521                 URL url = inputFile.getURL();
522                 log.debug("URL = " + url);
523                 if (inputFile instanceof SGSInputFile.File)
524                 {
525                     log.debug(inputFiles[i].getName() + " is an SGSInputFile.File");
526                     SGSInputFile.File inFile = (SGSInputFile.File)inputFile;
527                     if (url == null)
528                     {
529                         // Check to see if any data have been uploaded
530                         if (!inFile.dataUploadComplete())
531                         {
532                             throw new StyxException("Must upload data to input file "
533                                 + inFile.getName());
534                         }
535                     }
536                     else
537                     {
538                         // We have set a URL for this file.  Download the data.
539                         this.downloadFrom(url, inFile.getName());
540                     }
541                 }
542             }
543         }
544     }
545     
546     public void downloadFrom(URL url, String filename) throws StyxException
547     {
548         try
549         {
550             File targetFile = new File(this.workDir, filename);
551             log.debug("Downloading from " + url + " to " + targetFile.getPath());
552             System.err.println("     ****** Downloading from " + url + " to " + targetFile.getPath());
553             FileOutputStream fout = new FileOutputStream(targetFile);
554             InputStream in = url.openStream();
555             byte[] b = new byte[8192];
556             int n = 0;
557             do
558             {
559                 n = in.read(b);
560                 if (n >= 0)
561                 {
562                     fout.write(b, 0, n);
563                 }
564                 else
565                 {
566                     in.close();
567                     fout.close();
568                     b = null;
569                 }
570             } while (n >= 0);
571         }
572         catch(IOException ioe)
573         {
574             throw new StyxException("IOException downloading from "
575                 + url + ": " + ioe.getMessage());
576         }
577     }
578     
579     /***
580      * Returns the working directory of this instance
581      */
582     public File getWorkingDirectory()
583     {
584         return this.workDir;
585     }
586     
587     /***
588      * Gets the status of this service instance
589      */
590     public StatusCode getStatus()
591     {
592         return this.statusCode;
593     }
594     
595     /***
596      * Gets the time at which this instance was created
597      */
598     Date getCreationTime()
599     {
600         return this.creationTime;
601     }
602     
603     /***
604      * Gets the time at which this instance will be terminated
605      */
606     Date getTerminationTime()
607     {
608         return this.terminationTime;
609     }
610     
611     /***
612      * Sets the time at which this instance will be terminated.
613      * @param termTime The termination time.  This must be in the future.  This
614      * can be null, which means that the instance will never be terminated
615      * automatically.
616      * @throws StyxException if the termination time is in the past
617      */
618     void setTerminationTime(Date termTime) throws StyxException
619     {
620         if (termTime != null)
621         {
622             Date now = new Date();
623             if (!termTime.after(now))
624             {
625                 throw new StyxException("Termination time must be in the future");
626             }
627         }
628         this.sgs.scheduleTermination(this, termTime);
629         this.terminationTime = termTime;
630     }
631     
632     /***
633      * File used to control the service instance (start, stop, destroy etc)
634      * @todo Reading from this file could return a list of supported commands.
635      */
636     private class ControlFile extends StyxFile
637     {
638         
639         private StyxDirectory instanceRoot; // The root directory of the SGS instance
640         
641         /*** Creates a new instance of ControlFile */
642         public ControlFile(StyxDirectory instanceRoot) throws StyxException
643         {
644             super("ctl");
645             this.instanceRoot = instanceRoot;
646         }
647         
648         public void write(StyxFileClient client, long offset, int count,
649             ByteBuffer data, boolean truncate, int tag)
650             throws StyxException
651         {
652             // Ignore empty messages (e.g. EOFs)
653             if (count == 0)
654             {
655                 this.replyWrite(client, 0, tag);
656                 return;
657             }
658             String cmdString = StyxUtils.dataToString(data);
659             // Strip the trailing newline if it exists
660             if (cmdString.endsWith(StyxUtils.NEWLINE))
661             {
662                 cmdString = cmdString.substring(0, cmdString.length() - 1);
663             }            
664             if (cmdString.equalsIgnoreCase("start"))
665             {
666                 synchronized(statusCode)
667                 {
668                     if (statusCode == StatusCode.RUNNING)
669                     {
670                         throw new StyxException("Service is already running");
671                     }
672                 }
673                 // Check that all the parameters are valid
674                 StyxFile[] paramFiles = paramDir.getChildren();
675                 for (int i = 0; i < paramFiles.length; i++)
676                 {
677                     SGSParamFile sgsPF = (SGSParamFile)paramFiles[i];
678                     // The checkValid() method throws a StyxException if the
679                     // contents of the parameter file are not valid for some reason
680                     // The checkValid() method downloads any input files that are
681                     // specified by URLs in the parameters
682                     sgsPF.checkValid();
683                 }
684                 
685                 // Check all input files have been uploaded, and download
686                 // any input files that have been specified by reference.
687                 // TODO: this will block until all the data have been downloaded.
688                 prepareInputFiles();
689                 
690                 // Start the executable
691                 try
692                 {
693                     setBytesConsumed(0);
694                     startTime = System.currentTimeMillis();
695                     // Start the process running in the correct working directory
696                     process = runtime.exec(command + " " + getArguments(),
697                         null, workDir);
698                     setStatus(StatusCode.RUNNING);
699                     new Waiter().start(); // Thread that waits for the process
700                                           // to finish, then sets status
701                     
702                     // If we have set a URL for stdin, start redirecting data
703                     // to the standard input of the process
704                     if (stdin != null && stdin.getURL() != null)
705                     {
706                         // Start redirecting data to the standard input
707                         redirectToStdin(stdin.getURL());
708                     }
709                     
710                     // Start reading from stdout and stderr. Note that we do this
711                     // even if the "stdout" and "stderr" streams are not exposed
712                     // through the Styx interface (we must do this to consume the
713                     // stdout and stderr data)
714                     stdout.setCacheFile(new File(workDir, "stdout"));
715                     stdout.startReading(process.getInputStream());
716                     stderr.setCacheFile(new File(workDir, "stderr"));
717                     stderr.startReading(process.getErrorStream());
718                 }
719                 catch(IOException ioe)
720                 {
721                     ioe.printStackTrace();
722                     if (process == null)
723                     {
724                         // We didn't even start the process
725                         throw new StyxException("Internal error: could not create process "
726                             + sgs.getRoot().getName() + " " + getArguments());
727                     }
728                     else
729                     {
730                         // We've started the process but an error occurred elsewhere
731                         process.destroy();
732                         setStatus(StatusCode.ERROR, ioe.getMessage());
733                         throw new StyxException("Internal error: could not start "
734                             + "reading from output and error streams");
735                     }
736                 }
737                 // Check to see if the process expects some data on standard input
738                 if (stdin != null)
739                 {
740                     stdin.setOutputStream(process.getOutputStream());
741                 }
742                 this.replyWrite(client, count, tag);
743             }
744             else if (cmdString.equalsIgnoreCase("stop"))
745             {
746                 synchronized(statusCode)
747                 {
748                     // This synchronization prevents the Waiter thread from 
749                     // setting the status to "finished" before we can set the
750                     // status to "aborted" here
751                     if (statusCode == StatusCode.RUNNING)
752                     {
753                         // Only do this if the process is running
754                         process.destroy();
755                         setStatus(StatusCode.ABORTED);
756                     }
757                 }
758                 this.replyWrite(client, count, tag);
759             }
760             else if (cmdString.equalsIgnoreCase("destroy"))
761             {
762                 if (statusCode == StatusCode.RUNNING)
763                 {
764                     throw new StyxException("Cannot destroy a running service: stop it first");
765                 }
766                 destroy();
767                 // TODO: should we remove the working directory too?
768                 this.replyWrite(client, count, tag);              
769             }
770             else
771             {
772                 throw new StyxException("unknown command: " + cmdString);
773             }
774         }        
775     }
776     
777     /***
778      * Destroys this SGS instance
779      */
780     void destroy()
781     {
782         // Remove all the children of this directory
783         this.removeAllChildren();
784         // Now remove this directory
785         try
786         {
787             this.remove();
788         }
789         catch (StyxException se)
790         {
791             // This should never happen
792             log.error("Internal error: got StyxException when calling remove()" +
793                 " on instance root directory");
794         }
795         // Now remove the working directory
796         this.deleteDir(this.workDir);
797         log.debug("**** INSTANCE " + this.getName() + " DESTROYED ****");
798     }
799     
800     /***
801      * File that, when read, reveals the argument list that will be passed
802      * to the executable when the SGS instance is started.  Clients can write
803      * the whole argument list to this file and hence set all the parameters
804      * at once.
805      */
806     private class ArgsFile extends StyxFile
807     {
808         public ArgsFile() throws StyxException
809         {
810             super("args", 0666);
811         }
812         
813         public void read(StyxFileClient client, long offset, int count, int tag)
814             throws StyxException
815         {
816             this.processAndReplyRead(getArguments(), client, offset, count, tag);
817         }
818         
819         /***
820          * Write the arguments all in one go (not including the name of the
821          * executable itself).  This will set the values of
822          * all the parameters in the params/ directory. At the moment the command
823          * line must be written in a single Styx message.
824          */
825         /*public void write(StyxFileClient client, long offset, int count,
826             ByteBuffer data, boolean truncate, int tag)
827             throws StyxException
828         {
829             if (offset != 0)
830             {
831                 throw new StyxException("Must write to the start of the args file");
832             }
833             if (!truncate)
834             {
835                 throw new StyxException("Must write to the args file with truncation");
836             }
837             // Set the limit of the input data buffer correctly
838             data.limit(data.position() + count);
839             String cmdLine = StyxUtils.dataToString(data);
840             
841             // Parse the command line
842             JSAPResult result = jsap.parse(cmdLine);
843             
844             if (result.success())
845             {
846                 // Parsing was successful: populate all of the parameter files
847                 // TODO: I guess that if the new and old values are identical
848                 //   we don't need to change the contents and hence notify waiting clients?
849                 //   Maybe this logic can be handled in the SGSParamFile class.
850                 StyxFile[] paramFiles = paramDir.getChildren();
851                 for (int i = 0; i < paramFiles.length; i++)
852                 {
853                     SGSParamFile sgsPF = (SGSParamFile)paramFiles[i];
854                     Parameter param = sgsPF.getJSAPParameter();
855 
856                     if (param instanceof Switch)
857                     {
858                         boolean switchSet = result.getBoolean(sgsPF.getName());
859                         sgsPF.setParameterValue(switchSet ? "true" : "false");
860                     }
861                     else
862                     {
863                         String[] arr = result.getStringArray(sgsPF.getName());
864                         if (arr != null && arr.length > 0)
865                         {
866                             StringBuffer str = new StringBuffer(arr[0]);
867                             for (int j = 1; j < arr.length; j++)
868                             {
869                                 str.append(" " + arr[j]);
870                             }
871                             sgsPF.setParameterValue(str.toString());
872                         }
873                         else
874                         {
875                             // This probably won't be reached but it's here just
876                             // in case getStringArray() doesn't return a result
877                             // but getString() does - unlikely!
878                             String str = result.getString(sgsPF.getName());
879                             sgsPF.setParameterValue(str == null ? "" : str);
880                         }
881                     }
882                 }
883             }
884             else
885             {
886                 // An error occurred in parsing: get the first error in the
887                 // Iterator (TODO: get more errors?)
888                 Iterator errIt = result.getErrorMessageIterator();
889                 String errMsg = "Error occurred parsing command line: ";
890                 if (errIt.hasNext())
891                 {
892                     errMsg += (String)errIt.next();
893                 }
894                 else
895                 {
896                     errMsg += "no details";
897                 }
898                 throw new StyxException(errMsg);
899             }
900             
901             // Notify waiting clients that the command line has changed
902             // TODO: should this just be an AsyncStyxFile instead?
903             argumentsChanged();
904             
905             this.replyWrite(client, count, tag);
906         }*/
907     }
908     
909     /***
910      * This is called when something changes to change the command line arguments
911      * (e.g. a parameter value changes)
912      */
913     public void argumentsChanged()
914     {
915         this.argsFile.contentsChanged();
916     }
917     
918     // Thread that waits for the executable to finish, then sets the status
919     private class Waiter extends Thread
920     {
921         public void run()
922         {
923             try
924             {
925                 int exitCodeVal = process.waitFor();
926                 long duration = System.currentTimeMillis() - startTime;
927                 synchronized(statusCode)
928                 {
929                     // We must get the lock on the statusCode because
930                     // we could be changing the status in another thread
931                     if (statusCode != StatusCode.ABORTED && statusCode != StatusCode.ERROR)
932                     {
933                         // don't set the status if we have terminated abnormally
934                         setStatus(StatusCode.FINISHED, "took " +
935                             (float)duration / 1000 + " seconds.");
936                     }
937                     exitCodeFile.setExitCode(exitCodeVal);
938                 }
939             }
940             catch(Exception e)
941             {
942                 if (log.isDebugEnabled())
943                 {
944                     e.printStackTrace();
945                 }
946             }
947         }
948     }
949     
950     /***
951      * Starts a thread that redirects the data from the given URL to the 
952      * input stream of the process.  If the process has not yet been created,
953      * this method does nothing
954      * @throws StyxException if no data could be read from the given url
955      */
956     void redirectToStdin(URL url) throws StyxException
957     {
958         if (this.process != null && !this.redirectingToStdin)
959         {
960             try
961             {
962                 this.redirectingToStdin = true;
963                 InputStream is = url.openStream();
964                 OutputStream os = this.process.getOutputStream();
965                 new RedirectStream(is, os).start();
966                 log.debug("*** Reading stdin from " + url + "***");
967             }
968             catch (IOException ioe)
969             {
970                 process.destroy();
971                 setStatus(StatusCode.ERROR);
972                 throw new StyxException("Cannot read from " + url);
973             }
974         }
975     }
976     
977     // Reads from an input stream and writes the result to an output stream
978     private class RedirectStream extends Thread
979     {
980         private InputStream is;
981         private OutputStream os;
982         
983         public RedirectStream(InputStream is, OutputStream os)
984         {
985             this.is = is;
986             this.os = os;
987         }
988         
989         public void run()
990         {
991             long bytesCons = 0;
992             try
993             {
994                 byte[] arr = new byte[8192]; // TODO: is this an appropriate buffer size?
995                 int n = 0;
996                 while (n >= 0)
997                 {
998                     n = this.is.read(arr);
999                     if (n >= 0)
1000                     {
1001                         this.os.write(arr, 0, n);
1002                         bytesCons += n;
1003                     }
1004                     // Update the number of bytes consumed
1005                     // TODO: should we do this here or in another thread?
1006                     // It won't hold us up as long as the network is the limiting
1007                     // factor.
1008                     setBytesConsumed(bytesCons);
1009                 }
1010             }
1011             catch(IOException ioe)
1012             {
1013                 // This will be thrown if there was an error reading from the stream
1014                 // or writing to the output stream
1015                 synchronized(statusCode)
1016                 {
1017                     // We must get the lock on the statusCode because
1018                     // we could be changing the status in another thread
1019                     if (statusCode != StatusCode.ABORTED)
1020                     {
1021                         // don't do this if the process was aborted manually
1022                         process.destroy();
1023                         setStatus(StatusCode.ERROR, "when reading input data: " + ioe.getMessage());
1024                     }
1025                 }
1026             }
1027             finally
1028             {
1029                 // Make sure all clients have the final value of bytesConsumed
1030                 bytesConsumed.flush();
1031                 try
1032                 {
1033                     // Close the streams
1034                     this.is.close();
1035                     this.os.close();
1036                 }
1037                 catch(IOException ioe)
1038                 {
1039                     // Ignore errors when closing the streams.
1040                 }
1041             }
1042         }
1043     }
1044     
1045     /***
1046      * Gets the argument list that will be passed to the executable as a String
1047      */
1048     private String getArguments()
1049     {
1050         StringBuffer buf = new StringBuffer();
1051         for (int i = 0; i < this.paramFiles.size(); i++)
1052         {
1053             // We can be pretty confident that this cast is safe
1054             SGSParamFile paramFile = (SGSParamFile)this.paramFiles.get(i);
1055             String frag = paramFile.getCommandLineFragment();
1056             if (!frag.trim().equals(""))
1057             {
1058                 buf.append(frag + " ");
1059             }
1060         }
1061         return buf.toString();
1062     }
1063     
1064     /***
1065      * Sets the status of the service and updates the status service data
1066      */
1067     private void setStatus(StatusCode code, String message)
1068     {
1069         synchronized(this.statusCode)
1070         {
1071             this.statusCode = code;
1072         }
1073         String msg = "";
1074         if (message != null && message != "")
1075         {
1076             msg = ": " + message;
1077         }
1078         if (this.status != null)
1079         {
1080             this.status.setValue(code.getText() + msg);
1081         }
1082         this.fireStatusChanged();
1083     }
1084     
1085     private void setStatus(StatusCode code)
1086     {
1087         this.setStatus(code, null);
1088     }
1089     
1090     /***
1091      * Sets the number of bytes consumed by the service instance
1092      * @param flush If true, will force the waiting clients to get the new value
1093      * (should only be used sparingly)
1094      */
1095     synchronized void setBytesConsumed(long newValue, boolean flush)
1096     {
1097         if (this.bytesConsumed != null)
1098         {
1099             this.bytesConsumed.setValue("" + newValue);
1100         }
1101         if (flush)
1102         {
1103             this.bytesConsumed.flush();
1104         }
1105     }
1106     
1107     /***
1108      * Sets the number of bytes consumed by the service instance
1109      */
1110     synchronized void setBytesConsumed(long newValue)
1111     {
1112         this.setBytesConsumed(newValue, false);
1113     }
1114     
1115     /***
1116      * Deletes a directory and its contents
1117      * @return true if the deletion was successful, false otherwise
1118      */
1119     private static boolean deleteDir(File dir)
1120     {
1121         log.debug("Deleting contents of " + dir.getPath());
1122         if (dir.isDirectory())
1123         {
1124             String[] children = dir.list();
1125             for (int i = 0; i < children.length; i++)
1126             {
1127                 boolean success = deleteDir(new File(dir, children[i]));
1128                 if (!success)
1129                 {
1130                     return false;
1131                 }
1132             }
1133         }
1134         return dir.delete();
1135     }
1136     
1137     /***
1138      * Called when the status of this service instance changes. Fires the
1139      * statusChanged() event on all registered change listeners
1140      */
1141     private void fireStatusChanged()
1142     {
1143         synchronized(this.changeListeners)
1144         {
1145             SGSInstanceChangeListener listener;
1146             for (int i = 0; i < this.changeListeners.size(); i++)
1147             {
1148                 listener = (SGSInstanceChangeListener)this.changeListeners.get(i);
1149                 listener.statusChanged(this.statusCode);
1150             }
1151         }
1152     }
1153     
1154     /***
1155      * Adds a listener that will be notified of changes to this SGS. If the
1156      * listener is already registered, this will do nothing.
1157      */
1158     public void addChangeListener(SGSInstanceChangeListener listener)
1159     {
1160         synchronized(this.changeListeners)
1161         {
1162             if (!this.changeListeners.contains(listener))
1163             {
1164                 this.changeListeners.add(listener);
1165             }
1166         }
1167     }
1168     
1169     /***
1170      * Removes a SGSInstanceChangeListener.  (Note that this will only remove the first
1171      * instance of a given SGSInstanceChangeListener.  If, for some reason, more than one 
1172      * copy of the same change listener has been registered, this method will
1173      * only remove the first.)
1174      */
1175     public void removeChangeListener(SGSInstanceChangeListener listener)
1176     {
1177         synchronized(this.changeListeners)
1178         {
1179             boolean contained = this.changeListeners.remove(listener);
1180         }
1181     }
1182     
1183 }
1184