1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package uk.ac.rdg.resc.jstyx.gridservice.server;
30
31 import java.net.URL;
32 import java.net.MalformedURLException;
33 import java.net.URLConnection;
34
35 import java.io.InputStream;
36 import java.io.OutputStream;
37 import java.io.FileOutputStream;
38 import java.io.IOException;
39 import java.io.File;
40 import java.io.RandomAccessFile;
41 import java.io.FileNotFoundException;
42
43 import java.util.Vector;
44 import java.util.Iterator;
45 import java.util.Date;
46
47 import org.apache.mina.common.ByteBuffer;
48 import org.apache.log4j.Logger;
49
50 import com.martiansoftware.jsap.JSAP;
51 import com.martiansoftware.jsap.Parameter;
52 import com.martiansoftware.jsap.Switch;
53 import com.martiansoftware.jsap.FlaggedOption;
54 import com.martiansoftware.jsap.UnflaggedOption;
55 import com.martiansoftware.jsap.JSAPResult;
56
57 import uk.ac.rdg.resc.jstyx.server.StyxFile;
58 import uk.ac.rdg.resc.jstyx.server.StyxFileChangeListener;
59 import uk.ac.rdg.resc.jstyx.server.AsyncStyxFile;
60 import uk.ac.rdg.resc.jstyx.server.StyxDirectory;
61 import uk.ac.rdg.resc.jstyx.server.StyxFileClient;
62 import uk.ac.rdg.resc.jstyx.server.InMemoryFile;
63 import uk.ac.rdg.resc.jstyx.server.FileOnDisk;
64 import uk.ac.rdg.resc.jstyx.server.DirectoryOnDisk;
65 import uk.ac.rdg.resc.jstyx.server.MonitoredFileOnDisk;
66
67 import uk.ac.rdg.resc.jstyx.StyxException;
68 import uk.ac.rdg.resc.jstyx.StyxUtils;
69 import uk.ac.rdg.resc.jstyx.types.ULong;
70
71 import uk.ac.rdg.resc.jstyx.gridservice.config.*;
72
73 /***
74 * Class representing a StyxGridService instance
75 *
76 * @author Jon Blower
77 * $Revision: 609 $
78 * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
79 * $Log$
80 * Revision 1.48 2006/02/23 09:06:25 jonblower
81 * Fixed bug in destroy(): instances and their cached files are now destroyed properly
82 *
83 * Revision 1.47 2006/02/20 17:35:01 jonblower
84 * Implemented correct handling of output files/streams (not fully tested yet)
85 *
86 * Revision 1.46 2006/02/17 17:34:12 jonblower
87 * Changes to comments
88 *
89 * Revision 1.45 2006/02/17 09:24:02 jonblower
90 * Changed so that output files are added to the namespace on initialization and
91 * parameters representing output files are not present in the namespace
92 *
93 * Revision 1.44 2006/01/04 16:45:29 jonblower
94 * Implemented automatic termination of SGS instances using Quartz scheduler
95 *
96 * Revision 1.43 2006/01/04 11:24:58 jonblower
97 * Implemented time directory in the SGS instance namespace
98 *
99 * Revision 1.42 2005/12/07 17:47:58 jonblower
100 * Changed "commandline" file to "args" - now just contains arguments, not program name
101 *
102 * Revision 1.41 2005/12/01 08:37:48 jonblower
103 * Changed ID from int to String
104 *
105 * Revision 1.40 2005/11/14 21:31:54 jonblower
106 * Got SGSRun working for SC2005 demo
107 *
108 * Revision 1.39 2005/11/11 21:57:21 jonblower
109 * Implemented passing of URLs to input files
110 *
111 * Revision 1.38 2005/11/10 19:50:43 jonblower
112 * Added code to handle output files
113 *
114 * Revision 1.37 2005/11/09 18:01:31 jonblower
115 * Changed way that input files are exposed and their relation to parameters
116 *
117 * Revision 1.36 2005/11/04 19:29:53 jonblower
118 * Moved code that writes to std input to SGSInputFile
119 *
120 * Revision 1.35 2005/11/04 09:11:23 jonblower
121 * Made SGSParamFile inherit from AsyncStyxFile instead of InMemoryFile
122 *
123 * Revision 1.34 2005/11/03 07:42:47 jonblower
124 * Implemented JSAP-based parameter parsing
125 *
126 * Revision 1.30 2005/10/18 14:08:14 jonblower
127 * Removed inputfiles from namespace
128 *
129 * Revision 1.29 2005/10/14 18:03:23 jonblower
130 * Fixed bug with writing to input stream
131 *
132 * Revision 1.28 2005/09/08 07:08:59 jonblower
133 * Removed "String user" from list of parameters to StyxFile.write()
134 *
135 * Revision 1.27 2005/08/31 17:08:54 jonblower
136 * Fixed bug with handling exception when Process could not be created
137 *
138 * Revision 1.26 2005/08/30 16:29:00 jonblower
139 * Added processAndReplyRead() helper functions to StyxFile
140 *
141 * Revision 1.24 2005/08/02 08:05:18 jonblower
142 * Continuing to implement steering
143 *
144 * Revision 1.22 2005/08/01 16:38:05 jonblower
145 * Implemented simple parameter handling
146 *
147 * Revision 1.21 2005/07/29 16:56:07 jonblower
148 * Implementing reading command line asynchronously
149 *
150 * Revision 1.20 2005/06/20 07:17:35 jonblower
151 * Wrapped SGSParamFile as AsyncStyxFile
152 *
153 * Revision 1.19 2005/06/14 07:45:16 jonblower
154 * Implemented setting of params and async notification of parameter changes
155 *
156 * Revision 1.18 2005/06/10 07:53:12 jonblower
157 * Changed SGS namespace: removed "inurl" and subsumed functionality into "stdin"
158 *
159 * Revision 1.17 2005/05/27 17:05:07 jonblower
160 * Changes to incorporate GeneralCachingStreamReader
161 *
162 * Revision 1.16 2005/05/26 21:24:44 jonblower
163 * Added exitCode as a new service data element
164 *
165 * Revision 1.15 2005/05/26 16:50:57 jonblower
166 * Fixed bug with input files directory
167 *
168 * Revision 1.14 2005/05/19 18:42:07 jonblower
169 * Implementing specification of input files required by SGS
170 *
171 * Revision 1.13 2005/05/16 11:00:53 jonblower
172 * Changed SGS config XML file structure: separated input and output streams and changed some tag names
173 *
174 * Revision 1.12 2005/05/13 16:49:34 jonblower
175 * Coded dynamic detection and display of service data, also included streams in config file
176 *
177 * Revision 1.11 2005/05/11 15:14:31 jonblower
178 * Implemented more flexible definition of service data elements
179 *
180 * Revision 1.9 2005/04/27 16:11:43 jonblower
181 * Added capability to add documentation files to SGS namespace
182 *
183 * Revision 1.8 2005/04/26 07:46:11 jonblower
184 * Continuing to improve setting of parameters in Styx Grid Services
185 *
186 * Revision 1.7 2005/03/24 17:33:51 jonblower
187 * Improved reading of service parameters from config file
188 *
189 * Revision 1.6 2005/03/24 14:47:47 jonblower
190 * Provided default read() and write() methods for StyxFile so it is no longer abstract
191 *
192 * Revision 1.5 2005/03/24 09:48:31 jonblower
193 * Changed 'count' from long to int throughout for reading and writing
194 *
195 * Revision 1.4 2005/03/24 07:57:41 jonblower
196 * Improved code for reading SSL info from SGSconfig file and included parameter
197 * information for the Grid Services in the config file
198 *
199 * Revision 1.3 2005/03/19 21:47:02 jonblower
200 * Further fixes relating to releasing ByteBuffers
201 *
202 * Revision 1.2 2005/03/18 16:45:18 jonblower
203 * Released ByteBuffers after use
204 *
205 * Revision 1.1 2005/03/16 22:16:44 jonblower
206 * Added Styx Grid Service classes to core module
207 *
208 * Revision 1.2 2005/03/16 17:59:35 jonblower
209 * Changed following changes to core JStyx library (replacement of
210 * java.nio.ByteBuffers with MINA's ByteBuffers)
211 *
212 * Revision 1.1 2005/02/16 19:22:32 jonblower
213 * Commit adding of SGS files to CVS
214 *
215 */
216 class StyxGridServiceInstance extends StyxDirectory
217 {
218
219 private static final Logger log = Logger.getLogger(StyxGridServiceInstance.class);
220 private static final Runtime runtime = Runtime.getRuntime();
221
222 private StyxGridService sgs;
223 private String id;
224 private SGSConfig sgsConfig;
225 private File workDir;
226 private Process process = null;
227 private StatusCode statusCode;
228 private ServiceDataElement status;
229 private ServiceDataElement bytesConsumed;
230 private ExitCodeFile exitCodeFile;
231 private CachingStreamReader stdout = new CachingStreamReader(this, "stdout");
232 private CachingStreamReader stderr = new CachingStreamReader(this, "stderr");
233 private StyxDirectory inputsDir;
234 private StyxDirectory outputsDir;
235 private SGSInputFile.StdinFile stdin;
236 private boolean redirectingToStdin;
237 private JSAP jsap;
238 private StyxDirectory paramDir;
239 private Vector paramFiles;
240 private StyxFile argsFile;
241 private String command;
242 private long startTime;
243
244 private Date creationTime;
245 private Date terminationTime;
246
247
248 private Vector changeListeners;
249
250 /***
251 * Creates a new StyxGridService with the given configuration
252 * @todo: sort out permissions and owners on all these files
253 */
254 public StyxGridServiceInstance(StyxGridService sgs, String id,
255 SGSConfig sgsConfig) throws StyxException
256 {
257 super(id);
258 this.sgs = sgs;
259 this.id = id;
260 this.sgsConfig = sgsConfig;
261
262
263
264 this.creationTime = new Date();
265 this.terminationTime = null;
266
267 this.command = sgsConfig.getCommand();
268 this.workDir = new File(sgsConfig.getWorkingDirectory() +
269 StyxUtils.SYSTEM_FILE_SEPARATOR + id);
270 this.changeListeners = new Vector();
271
272 this.redirectingToStdin = false;
273
274 if (this.workDir.exists())
275 {
276
277 deleteDir(this.workDir);
278 }
279
280 if (!this.workDir.mkdirs())
281 {
282 throw new StyxException("Unable to create working directory "
283 + this.workDir);
284 }
285
286
287 this.addChild(new ControlFile(this));
288
289
290 this.paramDir = new StyxDirectory("params");
291 this.paramFiles = new Vector();
292 this.jsap = sgsConfig.getParamParser();
293 Vector params = sgsConfig.getParams();
294 for (int i = 0; i < params.size(); i++)
295 {
296 SGSParam param = (SGSParam)params.get(i);
297 SGSParamFile paramFile = new SGSParamFile(param, this);
298 this.paramFiles.add(paramFile);
299 if (param.getType() == SGSParam.OUTPUT_FILE)
300 {
301
302
303
304
305
306 paramFile.setParameterValue(param.getName());
307 }
308 else
309 {
310
311
312 this.paramDir.addChild(paramFile);
313 }
314 }
315 this.addChild(paramDir);
316
317
318 this.inputsDir = new StyxDirectory("inputs");
319 this.outputsDir = new StyxDirectory("outputs");
320
321 Vector inputs = sgsConfig.getInputs();
322 for (int i = 0; i < inputs.size(); i++)
323 {
324 SGSInput input = (SGSInput)inputs.get(i);
325 if (input.getType() == SGSInput.STREAM)
326 {
327 this.stdin = new SGSInputFile.StdinFile(this);
328 this.inputsDir.addChild(this.stdin);
329 }
330 else if (input.getType() == SGSInput.FILE)
331 {
332
333
334 this.addInputFile(input.getName());
335 }
336 else if (input.getType() == SGSInput.FILE_FROM_PARAM)
337 {
338
339
340 }
341 else
342 {
343 throw new StyxException("Internal error: unknown type of input "
344 + input.getName());
345 }
346 }
347
348
349 this.addOutputFiles();
350
351
352 this.addChild(this.inputsDir).addChild(this.outputsDir);
353
354
355 StyxDirectory steeringDir = new StyxDirectory("steering");
356 Vector steerables = sgsConfig.getSteerables();
357 for (int i = 0; i < steerables.size(); i++)
358 {
359 Steerable steerable = (Steerable)steerables.get(i);
360
361 File file = new File(this.workDir, steerable.getFilePath());
362 try
363 {
364
365 RandomAccessFile raf = new RandomAccessFile(file, "rw");
366 raf.seek(0);
367
368 raf.write(StyxUtils.strToUTF8(steerable.getInitialValue()));
369
370 raf.setLength(raf.getFilePointer());
371 raf.close();
372 }
373 catch (IOException ioe)
374 {
375 if (log.isDebugEnabled())
376 {
377 ioe.printStackTrace();
378 }
379 throw new StyxException("IOException creating steering file " +
380 file.getName() + ": " + ioe.getMessage());
381 }
382 steeringDir.addChild(new AsyncStyxFile(new FileOnDisk(steerable.getName(), file)));
383 }
384 this.addChild(steeringDir);
385
386
387
388 StyxDirectory serviceDataDir = new StyxDirectory("serviceData");
389 Vector serviceDataElements = sgsConfig.getServiceData();
390
391 this.status = new StringServiceDataElement("status", true, "created");
392 serviceDataDir.addChild(this.status.getAsyncStyxFile());
393 this.exitCodeFile = new ExitCodeFile();
394 serviceDataDir.addChild(this.exitCodeFile);
395
396 if (this.stdin != null)
397 {
398 this.bytesConsumed = new StringServiceDataElement("bytesConsumed",
399 true, "0", 2.0f);
400 serviceDataDir.addChild(this.bytesConsumed.getAsyncStyxFile());
401 }
402
403 for (int i = 0; i < serviceDataElements.size(); i++)
404 {
405 SDEConfig sde = (SDEConfig)serviceDataElements.get(i);
406
407 if (sde.getName().equals("status") ||
408 sde.getName().equals("bytesConsumed") ||
409 sde.getName().equals("exitCode"))
410 {
411
412
413
414 }
415 else
416 {
417
418 if (sde.getFilePath().equalsIgnoreCase(""))
419 {
420 throw new StyxException("Service data element " +
421 sde.getName() + " must have a backing file");
422 }
423 MonitoredFileOnDisk monFile = new MonitoredFileOnDisk(sde.getName(),
424 new File(this.workDir, sde.getFilePath()),
425 (long)(sde.getMinUpdateInterval() * 1000));
426 monFile.startMonitoring();
427
428 serviceDataDir.addChild(monFile);
429 }
430 }
431 this.addChild(serviceDataDir);
432
433
434
435
436
437 this.argsFile = new ArgsFile();
438 this.addChild(new AsyncStyxFile(this.argsFile));
439
440
441 this.addChild(new TimeDirectory(this));
442
443 this.statusCode = StatusCode.CREATED;
444 }
445
446 /***
447 * Adds the output files to the namespace
448 */
449 private void addOutputFiles() throws StyxException
450 {
451 Vector outputs = this.sgsConfig.getOutputs();
452 for (int i = 0; i < outputs.size(); i++)
453 {
454 SGSOutput output = (SGSOutput)outputs.get(i);
455 if (output.getType() == SGSOutput.STREAM)
456 {
457 if (output.getName().equals("stdout"))
458 {
459
460 this.outputsDir.addChild(this.stdout);
461 }
462 else if (output.getName().equals("stderr"))
463 {
464
465 this.outputsDir.addChild(this.stderr);
466 }
467 }
468 else
469 {
470
471
472
473 File file = new File(this.workDir, output.getName());
474 this.outputsDir.addChild(new SGSOutputFile(file, this));
475 }
476 }
477 }
478
479 /***
480 * Adds a new input file to the inputs/ directory
481 */
482 public void addInputFile(String filename) throws StyxException
483 {
484 File file = new File(this.workDir, filename);
485 this.inputsDir.addChild(new SGSInputFile.File(file, this));
486 }
487
488 /***
489 * Remove input files from the inputs/ directory
490 */
491 public void removeInputFiles(String[] filenames)
492 {
493 synchronized (this.inputsDir)
494 {
495 for (int i = 0; i < filenames.length; i++)
496 {
497 StyxFile child = this.inputsDir.getChild(filenames[i]);
498 if (child != null)
499 {
500 this.inputsDir.removeChild(child);
501 }
502 }
503 }
504 }
505
506 /***
507 * Makes sure all the input files are ready
508 * @throws StyxException if a required input file is not ready and a URL
509 * has not been set.
510 */
511 private void prepareInputFiles() throws StyxException
512 {
513 StyxFile[] inputFiles = this.inputsDir.getChildren();
514 for (int i = 0; i < inputFiles.length; i++)
515 {
516 log.debug("Preparing " + inputFiles[i].getName() + "...");
517 if (inputFiles[i] instanceof SGSInputFile)
518 {
519 log.debug(inputFiles[i].getName() + " is an SGSInputFile");
520 SGSInputFile inputFile = (SGSInputFile)inputFiles[i];
521 URL url = inputFile.getURL();
522 log.debug("URL = " + url);
523 if (inputFile instanceof SGSInputFile.File)
524 {
525 log.debug(inputFiles[i].getName() + " is an SGSInputFile.File");
526 SGSInputFile.File inFile = (SGSInputFile.File)inputFile;
527 if (url == null)
528 {
529
530 if (!inFile.dataUploadComplete())
531 {
532 throw new StyxException("Must upload data to input file "
533 + inFile.getName());
534 }
535 }
536 else
537 {
538
539 this.downloadFrom(url, inFile.getName());
540 }
541 }
542 }
543 }
544 }
545
546 public void downloadFrom(URL url, String filename) throws StyxException
547 {
548 try
549 {
550 File targetFile = new File(this.workDir, filename);
551 log.debug("Downloading from " + url + " to " + targetFile.getPath());
552 System.err.println(" ****** Downloading from " + url + " to " + targetFile.getPath());
553 FileOutputStream fout = new FileOutputStream(targetFile);
554 InputStream in = url.openStream();
555 byte[] b = new byte[8192];
556 int n = 0;
557 do
558 {
559 n = in.read(b);
560 if (n >= 0)
561 {
562 fout.write(b, 0, n);
563 }
564 else
565 {
566 in.close();
567 fout.close();
568 b = null;
569 }
570 } while (n >= 0);
571 }
572 catch(IOException ioe)
573 {
574 throw new StyxException("IOException downloading from "
575 + url + ": " + ioe.getMessage());
576 }
577 }
578
579 /***
580 * Returns the working directory of this instance
581 */
582 public File getWorkingDirectory()
583 {
584 return this.workDir;
585 }
586
587 /***
588 * Gets the status of this service instance
589 */
590 public StatusCode getStatus()
591 {
592 return this.statusCode;
593 }
594
595 /***
596 * Gets the time at which this instance was created
597 */
598 Date getCreationTime()
599 {
600 return this.creationTime;
601 }
602
603 /***
604 * Gets the time at which this instance will be terminated
605 */
606 Date getTerminationTime()
607 {
608 return this.terminationTime;
609 }
610
611 /***
612 * Sets the time at which this instance will be terminated.
613 * @param termTime The termination time. This must be in the future. This
614 * can be null, which means that the instance will never be terminated
615 * automatically.
616 * @throws StyxException if the termination time is in the past
617 */
618 void setTerminationTime(Date termTime) throws StyxException
619 {
620 if (termTime != null)
621 {
622 Date now = new Date();
623 if (!termTime.after(now))
624 {
625 throw new StyxException("Termination time must be in the future");
626 }
627 }
628 this.sgs.scheduleTermination(this, termTime);
629 this.terminationTime = termTime;
630 }
631
632 /***
633 * File used to control the service instance (start, stop, destroy etc)
634 * @todo Reading from this file could return a list of supported commands.
635 */
636 private class ControlFile extends StyxFile
637 {
638
639 private StyxDirectory instanceRoot;
640
641 /*** Creates a new instance of ControlFile */
642 public ControlFile(StyxDirectory instanceRoot) throws StyxException
643 {
644 super("ctl");
645 this.instanceRoot = instanceRoot;
646 }
647
648 public void write(StyxFileClient client, long offset, int count,
649 ByteBuffer data, boolean truncate, int tag)
650 throws StyxException
651 {
652
653 if (count == 0)
654 {
655 this.replyWrite(client, 0, tag);
656 return;
657 }
658 String cmdString = StyxUtils.dataToString(data);
659
660 if (cmdString.endsWith(StyxUtils.NEWLINE))
661 {
662 cmdString = cmdString.substring(0, cmdString.length() - 1);
663 }
664 if (cmdString.equalsIgnoreCase("start"))
665 {
666 synchronized(statusCode)
667 {
668 if (statusCode == StatusCode.RUNNING)
669 {
670 throw new StyxException("Service is already running");
671 }
672 }
673
674 StyxFile[] paramFiles = paramDir.getChildren();
675 for (int i = 0; i < paramFiles.length; i++)
676 {
677 SGSParamFile sgsPF = (SGSParamFile)paramFiles[i];
678
679
680
681
682 sgsPF.checkValid();
683 }
684
685
686
687
688 prepareInputFiles();
689
690
691 try
692 {
693 setBytesConsumed(0);
694 startTime = System.currentTimeMillis();
695
696 process = runtime.exec(command + " " + getArguments(),
697 null, workDir);
698 setStatus(StatusCode.RUNNING);
699 new Waiter().start();
700
701
702
703
704 if (stdin != null && stdin.getURL() != null)
705 {
706
707 redirectToStdin(stdin.getURL());
708 }
709
710
711
712
713
714 stdout.setCacheFile(new File(workDir, "stdout"));
715 stdout.startReading(process.getInputStream());
716 stderr.setCacheFile(new File(workDir, "stderr"));
717 stderr.startReading(process.getErrorStream());
718 }
719 catch(IOException ioe)
720 {
721 ioe.printStackTrace();
722 if (process == null)
723 {
724
725 throw new StyxException("Internal error: could not create process "
726 + sgs.getRoot().getName() + " " + getArguments());
727 }
728 else
729 {
730
731 process.destroy();
732 setStatus(StatusCode.ERROR, ioe.getMessage());
733 throw new StyxException("Internal error: could not start "
734 + "reading from output and error streams");
735 }
736 }
737
738 if (stdin != null)
739 {
740 stdin.setOutputStream(process.getOutputStream());
741 }
742 this.replyWrite(client, count, tag);
743 }
744 else if (cmdString.equalsIgnoreCase("stop"))
745 {
746 synchronized(statusCode)
747 {
748
749
750
751 if (statusCode == StatusCode.RUNNING)
752 {
753
754 process.destroy();
755 setStatus(StatusCode.ABORTED);
756 }
757 }
758 this.replyWrite(client, count, tag);
759 }
760 else if (cmdString.equalsIgnoreCase("destroy"))
761 {
762 if (statusCode == StatusCode.RUNNING)
763 {
764 throw new StyxException("Cannot destroy a running service: stop it first");
765 }
766 destroy();
767
768 this.replyWrite(client, count, tag);
769 }
770 else
771 {
772 throw new StyxException("unknown command: " + cmdString);
773 }
774 }
775 }
776
777 /***
778 * Destroys this SGS instance
779 */
780 void destroy()
781 {
782
783 this.removeAllChildren();
784
785 try
786 {
787 this.remove();
788 }
789 catch (StyxException se)
790 {
791
792 log.error("Internal error: got StyxException when calling remove()" +
793 " on instance root directory");
794 }
795
796 this.deleteDir(this.workDir);
797 log.debug("**** INSTANCE " + this.getName() + " DESTROYED ****");
798 }
799
800 /***
801 * File that, when read, reveals the argument list that will be passed
802 * to the executable when the SGS instance is started. Clients can write
803 * the whole argument list to this file and hence set all the parameters
804 * at once.
805 */
806 private class ArgsFile extends StyxFile
807 {
808 public ArgsFile() throws StyxException
809 {
810 super("args", 0666);
811 }
812
813 public void read(StyxFileClient client, long offset, int count, int tag)
814 throws StyxException
815 {
816 this.processAndReplyRead(getArguments(), client, offset, count, tag);
817 }
818
819 /***
820 * Write the arguments all in one go (not including the name of the
821 * executable itself). This will set the values of
822 * all the parameters in the params/ directory. At the moment the command
823 * line must be written in a single Styx message.
824 */
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907 }
908
909 /***
910 * This is called when something changes to change the command line arguments
911 * (e.g. a parameter value changes)
912 */
913 public void argumentsChanged()
914 {
915 this.argsFile.contentsChanged();
916 }
917
918
919 private class Waiter extends Thread
920 {
921 public void run()
922 {
923 try
924 {
925 int exitCodeVal = process.waitFor();
926 long duration = System.currentTimeMillis() - startTime;
927 synchronized(statusCode)
928 {
929
930
931 if (statusCode != StatusCode.ABORTED && statusCode != StatusCode.ERROR)
932 {
933
934 setStatus(StatusCode.FINISHED, "took " +
935 (float)duration / 1000 + " seconds.");
936 }
937 exitCodeFile.setExitCode(exitCodeVal);
938 }
939 }
940 catch(Exception e)
941 {
942 if (log.isDebugEnabled())
943 {
944 e.printStackTrace();
945 }
946 }
947 }
948 }
949
950 /***
951 * Starts a thread that redirects the data from the given URL to the
952 * input stream of the process. If the process has not yet been created,
953 * this method does nothing
954 * @throws StyxException if no data could be read from the given url
955 */
956 void redirectToStdin(URL url) throws StyxException
957 {
958 if (this.process != null && !this.redirectingToStdin)
959 {
960 try
961 {
962 this.redirectingToStdin = true;
963 InputStream is = url.openStream();
964 OutputStream os = this.process.getOutputStream();
965 new RedirectStream(is, os).start();
966 log.debug("*** Reading stdin from " + url + "***");
967 }
968 catch (IOException ioe)
969 {
970 process.destroy();
971 setStatus(StatusCode.ERROR);
972 throw new StyxException("Cannot read from " + url);
973 }
974 }
975 }
976
977
978 private class RedirectStream extends Thread
979 {
980 private InputStream is;
981 private OutputStream os;
982
983 public RedirectStream(InputStream is, OutputStream os)
984 {
985 this.is = is;
986 this.os = os;
987 }
988
989 public void run()
990 {
991 long bytesCons = 0;
992 try
993 {
994 byte[] arr = new byte[8192];
995 int n = 0;
996 while (n >= 0)
997 {
998 n = this.is.read(arr);
999 if (n >= 0)
1000 {
1001 this.os.write(arr, 0, n);
1002 bytesCons += n;
1003 }
1004
1005
1006
1007
1008 setBytesConsumed(bytesCons);
1009 }
1010 }
1011 catch(IOException ioe)
1012 {
1013
1014
1015 synchronized(statusCode)
1016 {
1017
1018
1019 if (statusCode != StatusCode.ABORTED)
1020 {
1021
1022 process.destroy();
1023 setStatus(StatusCode.ERROR, "when reading input data: " + ioe.getMessage());
1024 }
1025 }
1026 }
1027 finally
1028 {
1029
1030 bytesConsumed.flush();
1031 try
1032 {
1033
1034 this.is.close();
1035 this.os.close();
1036 }
1037 catch(IOException ioe)
1038 {
1039
1040 }
1041 }
1042 }
1043 }
1044
1045 /***
1046 * Gets the argument list that will be passed to the executable as a String
1047 */
1048 private String getArguments()
1049 {
1050 StringBuffer buf = new StringBuffer();
1051 for (int i = 0; i < this.paramFiles.size(); i++)
1052 {
1053
1054 SGSParamFile paramFile = (SGSParamFile)this.paramFiles.get(i);
1055 String frag = paramFile.getCommandLineFragment();
1056 if (!frag.trim().equals(""))
1057 {
1058 buf.append(frag + " ");
1059 }
1060 }
1061 return buf.toString();
1062 }
1063
1064 /***
1065 * Sets the status of the service and updates the status service data
1066 */
1067 private void setStatus(StatusCode code, String message)
1068 {
1069 synchronized(this.statusCode)
1070 {
1071 this.statusCode = code;
1072 }
1073 String msg = "";
1074 if (message != null && message != "")
1075 {
1076 msg = ": " + message;
1077 }
1078 if (this.status != null)
1079 {
1080 this.status.setValue(code.getText() + msg);
1081 }
1082 this.fireStatusChanged();
1083 }
1084
1085 private void setStatus(StatusCode code)
1086 {
1087 this.setStatus(code, null);
1088 }
1089
1090 /***
1091 * Sets the number of bytes consumed by the service instance
1092 * @param flush If true, will force the waiting clients to get the new value
1093 * (should only be used sparingly)
1094 */
1095 synchronized void setBytesConsumed(long newValue, boolean flush)
1096 {
1097 if (this.bytesConsumed != null)
1098 {
1099 this.bytesConsumed.setValue("" + newValue);
1100 }
1101 if (flush)
1102 {
1103 this.bytesConsumed.flush();
1104 }
1105 }
1106
1107 /***
1108 * Sets the number of bytes consumed by the service instance
1109 */
1110 synchronized void setBytesConsumed(long newValue)
1111 {
1112 this.setBytesConsumed(newValue, false);
1113 }
1114
1115 /***
1116 * Deletes a directory and its contents
1117 * @return true if the deletion was successful, false otherwise
1118 */
1119 private static boolean deleteDir(File dir)
1120 {
1121 log.debug("Deleting contents of " + dir.getPath());
1122 if (dir.isDirectory())
1123 {
1124 String[] children = dir.list();
1125 for (int i = 0; i < children.length; i++)
1126 {
1127 boolean success = deleteDir(new File(dir, children[i]));
1128 if (!success)
1129 {
1130 return false;
1131 }
1132 }
1133 }
1134 return dir.delete();
1135 }
1136
1137 /***
1138 * Called when the status of this service instance changes. Fires the
1139 * statusChanged() event on all registered change listeners
1140 */
1141 private void fireStatusChanged()
1142 {
1143 synchronized(this.changeListeners)
1144 {
1145 SGSInstanceChangeListener listener;
1146 for (int i = 0; i < this.changeListeners.size(); i++)
1147 {
1148 listener = (SGSInstanceChangeListener)this.changeListeners.get(i);
1149 listener.statusChanged(this.statusCode);
1150 }
1151 }
1152 }
1153
1154 /***
1155 * Adds a listener that will be notified of changes to this SGS. If the
1156 * listener is already registered, this will do nothing.
1157 */
1158 public void addChangeListener(SGSInstanceChangeListener listener)
1159 {
1160 synchronized(this.changeListeners)
1161 {
1162 if (!this.changeListeners.contains(listener))
1163 {
1164 this.changeListeners.add(listener);
1165 }
1166 }
1167 }
1168
1169 /***
1170 * Removes a SGSInstanceChangeListener. (Note that this will only remove the first
1171 * instance of a given SGSInstanceChangeListener. If, for some reason, more than one
1172 * copy of the same change listener has been registered, this method will
1173 * only remove the first.)
1174 */
1175 public void removeChangeListener(SGSInstanceChangeListener listener)
1176 {
1177 synchronized(this.changeListeners)
1178 {
1179 boolean contained = this.changeListeners.remove(listener);
1180 }
1181 }
1182
1183 }
1184