View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   * 
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.client;
30  
31  import java.net.InetSocketAddress;
32  import java.net.ConnectException;
33  import java.io.IOException;
34  
35  import java.util.Hashtable;
36  import java.util.Vector;
37  import java.util.Iterator;
38  
39  import org.apache.mina.io.IoFilter;
40  import org.apache.mina.io.filter.IoThreadPoolFilter;
41  import org.apache.mina.protocol.filter.ProtocolThreadPoolFilter;
42  import org.apache.mina.io.socket.SocketConnector;
43  import org.apache.mina.protocol.ProtocolHandler;
44  import org.apache.mina.protocol.ProtocolFilter;
45  import org.apache.mina.protocol.ProtocolProvider;
46  import org.apache.mina.protocol.ProtocolSession;
47  import org.apache.mina.protocol.io.IoProtocolConnector;
48  import org.apache.mina.common.IdleStatus;
49  
50  import org.apache.log4j.Logger;
51  
52  import uk.ac.rdg.resc.jstyx.messages.StyxMessageDecoder;
53  import uk.ac.rdg.resc.jstyx.messages.StyxMessage;
54  import uk.ac.rdg.resc.jstyx.messages.TversionMessage;
55  import uk.ac.rdg.resc.jstyx.messages.RversionMessage;
56  import uk.ac.rdg.resc.jstyx.messages.TauthMessage;
57  import uk.ac.rdg.resc.jstyx.messages.RauthMessage;
58  import uk.ac.rdg.resc.jstyx.messages.TattachMessage;
59  import uk.ac.rdg.resc.jstyx.messages.RattachMessage;
60  import uk.ac.rdg.resc.jstyx.messages.TclunkMessage;
61  import uk.ac.rdg.resc.jstyx.messages.RclunkMessage;
62  
63  import uk.ac.rdg.resc.jstyx.StyxUtils;
64  import uk.ac.rdg.resc.jstyx.StyxException;
65  
66  /***
67   * Object representing a client connection to a Styx server.
68   *
69   * @author Jon Blower
70   * $Revision: 604 $
71   * $Date: 2006-03-21 14:58:42 +0000 (Tue, 21 Mar 2006) $
72   * $Log$
73   * Revision 1.28  2006/03/21 14:58:41  jonblower
74   * Implemented clear-text password-based authentication and did some simple tests
75   *
76   * Revision 1.27  2006/03/21 09:06:14  jonblower
77   * Still implementing authentication
78   *
79   * Revision 1.24  2005/08/08 09:35:19  jonblower
80   * Commented out thread pool filters
81   *
82   * Revision 1.23  2005/07/08 16:01:27  jonblower
83   * Reinstated ProtocolThreadPoolFilter
84   *
85   * Revision 1.22  2005/07/08 15:22:54  jonblower
86   * Upgraded MINA library to 0.7.3-SNAPSHOT
87   *
88   * Revision 1.21  2005/06/27 17:17:15  jonblower
89   * Changed MessageCallback to pass Tmessage as parameter, rather than storing in the instance
90   *
91   * Revision 1.18  2005/05/25 15:37:55  jonblower
92   * Removed cache of CStyxFiles, dealt differently with root fid
93   *
94   * Revision 1.17  2005/05/23 16:48:17  jonblower
95   * Overhauled CStyxFile (esp. asynchronous methods) and StyxConnection (added cache of CStyxFiles)
96   *
97   * Revision 1.16  2005/05/16 16:16:52  jonblower
98   * Implemented getRemoteHost() and getRemotePort()
99   *
100  * Revision 1.15  2005/05/16 13:09:54  jonblower
101  * Added StyxConnection object as first argument in all StyxConnectionListener methods
102  *
103  * Revision 1.14  2005/05/16 12:57:39  jonblower
104  * Constructors no longer throw StyxException
105  *
106  * Revision 1.13  2005/05/09 13:35:48  jonblower
107  * Now throws an exception if try to send a message before connecting
108  *
109  * Revision 1.12  2005/05/05 16:57:31  jonblower
110  * Updated MINA library to revision 168337 and changed code accordingly
111  *
112  * Revision 1.11  2005/03/22 17:42:24  jonblower
113  * Changed default message size to 8192 for efficiency in MINA
114  *
115  * Revision 1.10  2005/03/22 10:19:52  jonblower
116  * Fixed problem with ByteBuffer leak in StyxMessageDecoder and StyxFileInputStream
117  *
118  * Revision 1.9  2005/03/17 07:30:05  jonblower
119  * Improved error logging code
120  *
121  * Revision 1.8  2005/03/16 17:55:52  jonblower
122  * Replaced use of java.nio.ByteBuffer with MINA's ByteBuffer to minimise copying of buffers
123  *
124  * Revision 1.7  2005/03/15 15:51:37  jonblower
125  * Removed hard limit on maximum message size
126  *
127  * Revision 1.6  2005/03/11 13:58:25  jonblower
128  * Merged MINA-Test_20059309 into main line of development
129  *
130  * Revision 1.5.2.2  2005/03/11 08:29:52  jonblower
131  * Moved to log4j logging system (from apache commons logging)
132  *
133  * Revision 1.5.2.1  2005/03/10 11:48:30  jonblower
134  * Updated to fit in with MINA framework
135  *
136  * Revision 1.4  2005/02/21 18:07:23  jonblower
137  * Separated constructor and connect methods
138  *
139  * Revision 1.3  2005/02/18 17:56:31  jonblower
140  * Set root directory in constructor; doesn't need to wait until connection is made
141  *
142  * Revision 1.1.1.1  2005/02/16 18:58:19  jonblower
143  * Initial import
144  *
145  */
146 public class StyxConnection implements ProtocolHandler
147 {
148     private static final Logger log = Logger.getLogger(StyxConnection.class);
149     
150     /***
151      * The default maximum message size that this connection will request. This
152      * is not necessarily the same as the maximum message size that will be used;
153      * this is up to the remote server.
154      *
155      * This is set to 8192 because it is most efficient if this is a power of two;
156      * MINA allocates its reusable ByteBuffers in sizes that are powers of two. If 
157      * this were, say, 8216 bytes then MINA would allocate ByteBuffers of capacity
158      * 16384 bytes to accommodate a single message.
159      */
160     private static final int DEFAULT_MAX_MESSAGE_SIZE_REQUEST = 8192;
161     
162     private String host; // The host and port to which this is connected
163     private int port;
164     private String username;
165     private String password;
166     
167     private boolean connecting;    // True if connect() or connectAsync() has been
168                                    // called, and the connection has not been closed
169     private boolean connected;     // True if this is connected to the remote
170                                    // server (handshaking might not have been done)
171     private String errMsg;         // Non-null if an error occurred
172     private Vector unsentMessages; // Messages that are waiting for the connection to be
173                                    // established before they are sent
174     
175     private Vector tagsInUse;     // These vectors keep track of the fids
176     private Vector fidsInUse;     // and tags that are in use
177     
178     private Hashtable msgQueue;   // Container for messages that are awaiting reply
179     private Hashtable tClunksPending; // Contains fids of Tclunk messages that are awaiting reply
180     
181     private long rootFid;         // The fid associated with the root of the server
182     private CStyxFile rootDirectory; // The root directory of the server as a CStyxFile
183     private int maxMessageSizeRequest;  // The requested maximum size of message that can be sent on this connection
184     private int maxMessageSize;   // The actual maximum size of message that can be sent on this connection
185     
186     private Vector listeners;     // The StyxConnectionListeners that will be informed of events
187     
188     // MINA components
189     private ProtocolSession session;
190     private IoThreadPoolFilter ioThreadPoolFilter;
191     private ProtocolThreadPoolFilter protocolThreadPoolFilter;
192     
193     private static Integer numSessions = new Integer(0); // The number of sessions that have been opened
194                                                          // This is an Integer object so we can use it as a lock
195     private static final int CONNECT_TIMEOUT = 30; // seconds
196     
197     /***
198      * Creates a new instance of StyxConnection. This does not actually make the
199      * connection; call connectAsync() or connect() to do this.
200      */
201     public StyxConnection(String host, int port, String username, String password,
202         int maxMessageSizeRequest)
203     {
204         this.host = host;
205         this.port = port;
206         this.username = username.trim();
207         this.password = password.trim();
208         this.connecting = false;
209         this.connected = false;
210         this.errMsg = null;
211         this.unsentMessages = new Vector();
212         this.rootFid = -1;
213         this.tagsInUse = new Vector();
214         this.fidsInUse = new Vector();
215         this.msgQueue = new Hashtable();
216         this.tClunksPending = new Hashtable();
217         this.listeners = new Vector();
218         this.maxMessageSizeRequest = maxMessageSizeRequest;
219         this.rootDirectory = this.getFile("/");
220         // The synchronization ensures that the numSessions static variable
221         // can only be altered by one thread at once
222         synchronized(numSessions)
223         {
224             numSessions = new Integer(numSessions.intValue() + 1);
225         }
226     }
227     
228     /***
229      * Uses DEFAULT_MAX_MESSAGE_SIZE_REQUEST
230      */
231     public StyxConnection(String host, int port, String username, String password)
232     {
233         this(host, port, username, password, DEFAULT_MAX_MESSAGE_SIZE_REQUEST);
234     }
235     
236     /***
237      * Creates a new instance of StyxConnection, connecting as an anonymous user
238      */
239     public StyxConnection(String host, int port, int maxMessageSizeRequest)
240     {
241         this(host, port, "", "", maxMessageSizeRequest);
242     }
243     
244     /***
245      * Creates a new instance of StyxConnection, connecting as an anonymous user
246      */
247     public StyxConnection(String host, int port)
248     {
249         this(host, port, "", "");
250     }
251     
252     /***
253      * @return the name (or IP address) of the remote host
254      */
255     public String getRemoteHost()
256     {
257         return this.host;
258     }
259     
260     /***
261      * @return the port of the remote host
262      */
263     public int getRemotePort()
264     {
265         return this.port;
266     }
267     
268     /***
269      * Connects to the remote server and handshakes. This method returns 
270      * immediately; when the connection and handshaking are complete, the
271      * connectionReady() event will be fired on all registered 
272      * StyxConnectionListeners. If an error occurred when connecting or
273      * handshaking, the connectionError() event will be fired on the listeners.
274      * This method will do nothing if we have already connected or are in the
275      * process of connecting.
276      * @throws StyxException if the IOProcessor could not be started
277      */
278     public synchronized void connectAsync() throws StyxException
279     {
280         if (!this.connecting)
281         {
282             this.connecting = true;
283 
284             this.ioThreadPoolFilter = new IoThreadPoolFilter();
285             this.protocolThreadPoolFilter = new ProtocolThreadPoolFilter();
286 
287             // TODO: if we start these thread pool filters, they don't seem to
288             // get stopped again and (one of) the threads are left running.
289             //this.ioThreadPoolFilter.start();
290             //this.protocolThreadPoolFilter.start();
291             
292             IoProtocolConnector connector = new IoProtocolConnector( new SocketConnector() );
293             
294             // TODO: do we need these thread pools for a client connection?
295             //connector.getIoConnector().getFilterChain().addLast("Thread pool filter", ioThreadPoolFilter );
296             //connector.getFilterChain().addLast("Protocol thread pool filter",  protocolThreadPoolFilter );
297 
298             ProtocolProvider protocolProvider = new StyxClientProtocolProvider(this);
299 
300             try
301             {
302                 this.session = connector.connect( new InetSocketAddress( this.host,
303                     this.port ), CONNECT_TIMEOUT, protocolProvider );
304             }
305             catch( IOException e )
306             {
307                 throw new StyxException("Failed to connect: " + e.getMessage());
308             }
309         }
310         else
311         {
312             log.info("Already connecting");
313         }
314     }
315     
316     /***
317      * Connects to the remote server and handshakes. This method blocks until
318      * the connection is made and the handshaking is complete, throwing a
319      * StyxException if an error occurred when connecting. If the connection is
320      * already made, this method does nothing. The connectionReady()
321      * and connectionError() events will be fired on any registered listeners
322      * when the connection is ready or if an error occurs.
323      * @throws StyxException if the IOProcessor could not be started or if 
324      * an error occurred during connection or handshaking
325      */
326     public void connect() throws StyxException
327     {
328         this.connectAsync(); // This does nothing if already started
329         while (this.rootFid < 0 && this.errMsg == null)
330         {
331             synchronized(this)
332             {
333                 try
334                 {
335                     this.wait();
336                 }
337                 catch(InterruptedException ie)
338                 {
339                     // do nothing
340                 }
341             }
342         }
343         if (this.errMsg != null)
344         {
345             // An error occurred when connecting
346             throw new StyxException(this.errMsg);
347         }
348     }
349     
350     /***
351      * Overrides the close() method in Session.  Clunks all fids before
352      * closing the connection.  Does nothing if the connection has not been made.
353      * If the connection has been made, but the handshaking has not been done,
354      * this will enqueue the close request so that, when handshaking is
355      * complete, the fids will be clunked.
356      * Note that there is a possibility that if close() is called immediately
357      * after connectAsync(), the call to close() will return (having done nothing)
358      * before the connection is open. TODO: what can we do about this?
359      * The session is only definitively closed when the connectionClosed() event
360      * is fired on the registered StyxConnectionListeners.
361      * @todo rename this closeAsync() and implement a blocking close() method?
362      */
363     public void close()
364     {
365         log.debug("Called close() on StyxConnection");
366         if (this.connected)
367         {
368             // Start off the chain of clunking fids by clunking the last fid 
369             // in the list. When the reply arrives, the next fid will be clunked,
370             // and so forth until there are no more fids to clunk.
371             // TODO: The problem with this approach is that if any of the Rclunks
372             // do not arrive the connection will not close properly.  Should we
373             // implement a timeout or something?
374             this.clunkNextFid
375             (
376                 new MessageCallback()
377                 {
378                     public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
379                     {
380                         clunkNextFid(this);
381                     }
382                     public void error(String message, StyxMessage tMessage)
383                     {
384                         log.debug("Error clunking fid: " + message);
385                         clunkNextFid(this);
386                     }
387                 }
388             );
389         }
390         else
391         {
392             //log.debug("Connection was not open");
393             // Make sure the threads are stopped
394             this.sessionClosed(this.session);
395         }
396     }
397     
398     /***
399      * Clunks the last fid in the fidsInUse vector. If there are no more fids
400      * left to clunk, the connection is closed.
401      * @todo Should we actually be closing CStyxFiles rather than clunking the fids?
402      */
403     private void clunkNextFid(MessageCallback callback)
404     {
405         int openFids = fidsInUse.size();
406         boolean tClunkSent = false;
407         int i = 1;
408         while(!tClunkSent && (openFids - i) >= 0)
409         {            
410             // Get the last fid in the list
411             Long fid = (Long)fidsInUse.get(openFids - i);
412             if (this.tClunksPending.containsValue(fid))
413             {
414                 // We have already sent a Tclunk for this fid. Look for the next fid.
415                 i++;
416             }
417             else
418             {
419                 // We don't have a Tclunk outstanding for this fid
420                 this.sendAsync(new TclunkMessage(fid.longValue()), callback);
421                 tClunkSent = true;
422             }
423         }
424         if(!tClunkSent)
425         {
426             // No more fids left to clunk
427             this.session.close();
428         }
429     }
430     
431     /***
432      * Gets the CStyxFile representing the root directory of the remote Styx
433      * server
434      */
435     public CStyxFile getRootDirectory()
436     {
437         return this.rootDirectory;
438     }
439     
440     /***
441      * Simple struct to contain a T-message plus its associated callback. Note 
442      * that we cannot store the T-message in the callback itself because a callback
443      * may be associated with several T-messages
444      */
445     private class MessagePlusCallback
446     {
447         private StyxMessage tMessage;
448         private MessageCallback callback;
449         public MessagePlusCallback(StyxMessage tMessage, MessageCallback callback)
450         {
451             this.tMessage = tMessage;
452             this.callback = callback;
453         }
454     }
455     
456     /***
457      * Sends a message and returns its tag.  Note that this method will return
458      * immediately.  When the reply arrives, the replyArrived() method of the
459      * callback object will be called. (The callback can be null.).  If the 
460      * connection hasn't been made yet, the message will be put in a queue and
461      * will be sent when the connection is ready (TODO this is convenient as it
462      * saves waiting for the connectionReady() event, but is this the best thing
463      * to do?)
464      *
465      * If this is called before connect() or connectAsync(), the callback's error
466      * function will be called.
467      *
468      * @param tMessage the message to be sent
469      * @param callback the MessageCallback to be called when the reply arrives
470      * @return the tag of the outgoing message
471      */
472     public int sendAsync(StyxMessage tMessage, MessageCallback callback)
473     {
474         return this.sendAsync(tMessage, callback, false);
475     }
476     
477     /***
478      * Sends a message and returns its tag.  Note that this method will return
479      * immediately.  When the reply arrives, the replyArrived() method of the
480      * callback object will be called. (The callback can be null.).  If the 
481      * connection hasn't been made yet, the message will be put in a queue and
482      * will be sent when the connection is ready (TODO this is convenient as it
483      * saves waiting for the connectionReady() event, but is this the best thing
484      * to do?)
485      *
486      * If this is called before connect() or connectAsync(), the callback's error
487      * function will be called unless isHandshake==true;
488      *
489      * @param tMessage the message to be sent
490      * @param callback the MessageCallback to be called when the reply arrives
491      * @param isHandshake if true, this message is part of the connection process
492      * itself (e.g. authentication)
493      * @return the tag of the outgoing message
494      */
495     public int sendAsync(StyxMessage tMessage, MessageCallback callback, boolean isHandshake)
496     {
497         if (!this.connecting)
498         {
499             if (callback != null)
500             {
501                 callback.error("Must connect before sending a message", tMessage);
502             }
503             else
504             {
505                 log.error("Attempt to send a message before connecting");
506             }
507         }
508         // Set the tag for the message: this is also the key for the message
509         // in the Hashtable of outstanding messages
510         int tag;
511         if (tMessage instanceof TversionMessage)
512         {            
513             // if this is a TversionMessage, we can use NOTAG
514             tag = StyxUtils.NOTAG;
515         }
516         else
517         {
518             tag = this.getFreeTag();
519         }
520         tMessage.setTag(tag);
521 
522         if (callback != null)
523         {
524             // add to queue of waiting messages
525             this.msgQueue.put(new Integer(tag), new MessagePlusCallback(tMessage, callback));
526         }
527         
528         if (tMessage instanceof TclunkMessage)
529         {
530             // Store the fids of outstanding Tclunks so we don't attempt to
531             // send another clunk for this fid when we close the connection
532             TclunkMessage tClunkMsg = (TclunkMessage)tMessage;
533             this.tClunksPending.put(new Integer(tag), new Long(tClunkMsg.getFid()));
534         }
535         
536         synchronized(this)
537         {
538             // If the connection is ready (signalled by rootFid being set), or
539             // if this is part of a handshake, send the message, otherwise, enqueue it
540             if (this.rootFid >= 0 || tMessage instanceof TclunkMessage || isHandshake)
541             {
542                 // Send the message
543                 this.session.write(tMessage);
544             }
545             else
546             {
547                 // We just store the message because the callback has already been stored
548                 this.unsentMessages.add(tMessage);
549             }
550         }
551         
552         // return the message's tag
553         return tag;
554     }
555     
556     /***
557      * Sends a message and blocks until the corresponding reply arrives
558      * @throws StyxException if the message type is not as expected, or if the 
559      * connection has not been made.
560      */
561     public StyxMessage send(StyxMessage message) throws StyxException
562     {
563         // Create a callback object
564         StyxReplyCallback callback = new StyxReplyCallback();
565         this.sendAsync(message, callback);
566         // callback.getReply() waits until the reply arrives, then returns the reply
567         StyxMessage reply = callback.getReply();
568         return reply;
569     }
570     
571     /***
572      * Called when a reply has arrived from a Styx server
573      */
574     public void messageReceived( ProtocolSession session, Object message )
575     {
576         if (log.isDebugEnabled())
577         {
578             log.debug("RCVD: " + message);
579         }
580         StyxMessage rMessage = (StyxMessage)message;
581         
582         // Get the tag of the reply
583         int tag = rMessage.getTag();
584         
585         // Find the callback object in the list of outstanding messages
586         MessagePlusCallback mpc = (MessagePlusCallback)this.msgQueue.remove(new Integer(tag));
587         
588         // Return the tag to the pool if it isn't a RversionMessage
589         if (!(message instanceof RversionMessage))
590         {
591             this.returnTag(tag);
592         }
593         
594         if (message instanceof RclunkMessage)
595         {
596             Long lngFid = (Long)this.tClunksPending.remove(new Integer(tag));
597             // Return the fid to the pool
598             this.returnFid(lngFid.longValue());
599         }
600         
601         if (mpc != null && mpc.callback != null)
602         {
603             mpc.callback.gotReply(rMessage, mpc.tMessage);
604         }
605     }
606     
607     /***
608      * @return the next unused fid, or -1 if there are none left (this is
609      * extremely unlikely and would require StyxUtils.MAXUINT tags to be in
610      * use - this would probably only happen due to a bug)
611      */
612     public long getFreeFid()
613     {
614         synchronized(this.fidsInUse)
615         {
616             for (long i = 0; i < StyxUtils.MAXUINT; i++)
617             {
618                 Long lngFid = new Long(i);
619                 // Check to see if this tag is already in use
620                 if (!this.fidsInUse.contains(lngFid))
621                 {
622                     // If not in use, add to the Vector, then return it
623                     this.fidsInUse.add(lngFid);
624                     return i;
625                 }
626             }
627             return -1;
628         }
629     }
630     
631     /***
632      * @return the next unused tag, or -1 if there are none left (this is
633      * extremely unlikely and would require 65535 tags to be in simultaneous
634      * use - this would probably only happen due to a bug)
635      * @todo: this works by searching a Vector of used tags. Would this be
636      * quicker if it were done some other way?
637      */
638     private int getFreeTag()
639     {
640         synchronized(this.tagsInUse)
641         {
642             for (int i = 0; i < StyxUtils.MAXUSHORT; i++)
643             {
644                 // Check to see if this tag is already in use
645                 if (!this.tagsInUse.contains(new Integer(i)))
646                 {
647                     // If not in use, add to the Vector, then return it
648                     this.tagsInUse.add(new Integer(i));
649                     return i;
650                 }
651             }
652             return -1;
653         }
654     }
655     
656     /***
657      * Returns the given fid back to the pool
658      */
659     public void returnFid(long fid)
660     {
661         // The remove() method is synchronized so this is safe
662         this.fidsInUse.remove(new Long(fid));
663     }
664     
665     /***
666      * Returns the given tag back to the pool
667      */
668     private void returnTag(int tag)
669     {
670         // The remove() method is synchronized so this is safe
671         this.tagsInUse.remove(new Integer(tag));
672     }
673     
674     /***
675      * @return the fid associated with the root of the remote server
676      */
677     public long getRootFid()
678     {
679         return this.rootFid;
680     }
681     
682     /***
683      * @return the maximum size of message that can be sent on this connection
684      */
685     public int getMaxMessageSize()
686     {
687         return this.maxMessageSize;
688     }
689     
690     /***
691      * Sets the maximum size of message that can be sent on this connection
692      */
693     public void setMaxMessageSize(int maxMessageSize)
694     {
695         this.maxMessageSize = maxMessageSize;
696     }
697     
698     /***
699      * Invoked when the session is created.  Initialize default socket
700      * parameters and user-defined attributes here.
701      */
702     public void sessionCreated( ProtocolSession session ) throws Exception
703     {
704         // TODO: not sure what should be done in this method
705         log.debug("Connection created");
706     }
707     
708     /***
709      * Called when the socket connection to the remote server has been established
710      */
711     public void sessionOpened( ProtocolSession session )
712     {
713         this.connected = true;
714         log.debug("Connection established.");
715         TversionMessage tVerMsg = new TversionMessage(this.maxMessageSizeRequest);
716         this.sendAsync(tVerMsg, new TversionCallback(), true);
717     }
718     
719     private class TversionCallback extends MessageCallback
720     {
721         public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
722         {
723             RversionMessage rVerMsg = (RversionMessage)rMessage;
724             maxMessageSize = (int)rVerMsg.getMaxMessageSize();
725             if (username.equalsIgnoreCase(""))
726             {
727                 log.debug("Unauthenticated connection");
728                 TattachMessage tAttMsg = new TattachMessage(getFreeFid(), username);
729                 sendAsync(tAttMsg, new TattachCallback(), true);
730             }
731             else
732             {
733                 log.debug("Authenticated connection");
734                 TauthMessage tAuthMsg = new TauthMessage(getFreeFid(), username, "");
735                 sendAsync(tAuthMsg, new TauthCallback(), true);
736             }
737         }
738         public void error(String message, StyxMessage tMessage)
739         {
740             fireStyxConnectionError(new Throwable(message));
741         }
742     }
743     
744     private class TauthCallback extends MessageCallback
745     {
746         public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
747         {
748             TauthMessage tAuthMsg = (TauthMessage)tMessage;
749             RauthMessage rAuthMsg = (RauthMessage)rMessage;
750             // We now have a fid to an authentication file.  We write the
751             // password into this file
752             CStyxFile authFile = getFile("/auth");
753             authFile.setFid(tAuthMsg.getAfid());
754             authFile.setQid(rAuthMsg.getAQid());
755             authFile.writeAsync(password, 0, new AuthFileWriteCallback(tAuthMsg.getAfid()));
756         }
757         public void error(String message, StyxMessage tMessage)
758         {
759             TauthMessage tAuthMsg = (TauthMessage)tMessage;
760             returnFid(tAuthMsg.getAfid());
761             fireStyxConnectionError(new Throwable(message));
762         }
763     }
764     
765     /***
766      * This callback is called when we have written the password to the
767      * auth file
768      */
769     private class AuthFileWriteCallback extends MessageCallback
770     {
771         private long afid;
772         public AuthFileWriteCallback(long afid)
773         {
774             this.afid = afid;
775         }
776         public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
777         {
778             // The write was successful.  Now we can attach
779             TattachMessage tAttMsg =
780                 new TattachMessage(getFreeFid(), this.afid, username, "");
781             sendAsync(tAttMsg, new TattachCallback(), true);
782         }
783         public void error(String message, StyxMessage tMessage)
784         {
785             fireStyxConnectionError(new Throwable(message));
786         }
787     }
788     
789     private class TattachCallback extends MessageCallback
790     {
791         public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
792         {
793             TattachMessage tAttMsg = (TattachMessage)tMessage;
794             rootFid = tAttMsg.getFid();
795             fireStyxConnectionReady();
796         }
797         public void error(String message, StyxMessage tMessage)
798         {
799             TattachMessage tAttMsg = (TattachMessage)tMessage;
800             returnFid(tAttMsg.getFid());
801             fireStyxConnectionError(new Throwable(message));
802         }
803     }
804     
805     /***
806      * Called when the connection is closed
807      */
808     public void sessionClosed( ProtocolSession session )
809     {
810         if (this.connected)
811         {
812             log.debug("Connection closed.");
813         }
814         this.connected = false;
815         this.connecting = false;
816         this.rootFid = -1;
817         
818         // Stop threads
819         this.ioThreadPoolFilter.stop();
820         this.protocolThreadPoolFilter.stop();
821         
822         // Free resources associated with StyxMessageDecoder
823         StyxMessageDecoder decoder = (StyxMessageDecoder)this.session.getDecoder();
824         decoder.release();
825         
826         // The synchronization ensures that the numSessions static variable
827         // can only be altered by one thread at once
828         // TODO: we're not using this at the moment. Can we get rid of it?
829         synchronized(numSessions)
830         {
831             numSessions = new Integer(numSessions.intValue() - 1);
832             if (numSessions.equals(new Integer(0)));
833             {
834                 // If this is the last session that has been closed, request that
835                 // the IoProcessor thread stopped.
836                 // TODO: what do we do here in MINA?
837                 // StyxUtils.stopIoProcessor();
838             }
839         }
840         this.fireStyxConnectionClosed();
841     }
842     
843     /***
844      * Called when an exception is caught by MINA; fires the connectError()
845      * event on all registered listeners and closes the connection.
846      */
847     public void exceptionCaught( ProtocolSession session, Throwable cause )
848     {
849         this.fireStyxConnectionError(cause);
850     }
851     
852     /***
853      * Gets a CStyxFile with the given path.  Note that each call to this method
854      * will return a new object, even if the path is identical.
855      * This does not open, create or check the existence of the file: no
856      * messages are sent to the server in this method so this will never block.
857      * @throws InvalidPathException if the given path is not valid and absolute
858      * (only catch this runtime exception if it is likely that the path could be
859      * invalid, e.g. when the path is being input by a user)
860      */
861     public CStyxFile getFile(String path)
862     {
863         if (!path.startsWith("/"))
864         {
865             path = "/" + path;
866         }
867         return new CStyxFile(this, path);
868     }
869     
870     /***
871      * Opens a file on the server, throwing a StyxException if the file can't 
872      * be found or opened in the given mode. Blocks until the file is open.
873      * @param path The path of the file relative to the server root.
874      * @param mode Integer representing the mode - see the constants in StyxUtils.
875      * For example, to open a file for reading, use StyxUtils.OREAD. To open a
876      * file for writing with truncation use StyxUtils.OWRITE | StyxUtils.OTRUNC.
877      * @return The file that has just been opened
878      */
879     public CStyxFile openFile(String path, int mode) throws StyxException
880     {
881         CStyxFile file = this.getFile(path);
882         file.open(mode);
883         return file;
884     }
885     
886     /***
887      * Gets the entire contents of a file as a single string. Opens the file
888      * for reading, reads the contents and closes the file.
889      * This should only be used for relatively short files that can sensibly
890      * fit into a String - do not use for large files as not only will this 
891      * method block until the file is read, you may run into memory problems.
892      * @param path The path of the file relative to the server root.
893      */
894     public String getContents(String path) throws StyxException
895     {
896         return this.getFile(path).getContents();        
897     }
898     
899     /***
900      * Adds a StyxConnectionListener to this connection. The methods of the
901      * listener will be called when events happen, such as the connection being
902      * ready for messages, the connection being closed, and an error in connection.
903      * If this listener is already registered, this method does nothing.
904      */
905     public void addListener(StyxConnectionListener listener)
906     {
907         synchronized(this.listeners)
908         {
909             if (!this.listeners.contains(listener))
910             {
911                 this.listeners.add(listener);
912             }
913         }
914     }
915     
916     /***
917      * Removes the given listener; does nothing if the listener has not been
918      * registered with this.addListener().
919      */
920     public void removeListener(StyxConnectionListener listener)
921     {
922         this.listeners.remove(listener);
923     }
924     
925     /***
926      * Fired when the connection is ready for traffic (i.e. when a Rattach
927      * message arrives).
928      */
929     private void fireStyxConnectionReady()
930     {
931         log.info("***** CONNECTION OPENED TO " + host + ":" + port
932             + " *****");
933         synchronized(this)
934         {
935             // Notify any waiting threads (e.g. if using connect())
936             this.notifyAll();
937         }
938         synchronized(this.listeners)
939         {
940             for (int i = 0; i < this.listeners.size(); i++)
941             {
942                 StyxConnectionListener listener =
943                     (StyxConnectionListener)this.listeners.get(i);
944                 listener.connectionReady(this);
945             }
946         }
947         // Now we can send the messages that are awaiting connection
948         synchronized(this.unsentMessages)
949         {
950             Iterator it = this.unsentMessages.iterator();
951             while(it.hasNext())
952             {
953                 StyxMessage message = (StyxMessage)it.next();
954                 log.debug("Sending message now connection is ready");
955                 this.session.write(message);
956                 it.remove();
957             }
958         }
959     }
960     
961     /***
962      * Fired when the connection is closed.
963      */
964     private void fireStyxConnectionClosed()
965     {
966         log.info("***** CONNECTION TO " + this.host + ":" + this.port
967             + " CLOSED *****");
968         synchronized(this.listeners)
969         {
970             for (int i = 0; i < this.listeners.size(); i++)
971             {
972                 StyxConnectionListener listener =
973                     (StyxConnectionListener)this.listeners.get(i);
974                 listener.connectionClosed(this);
975             }
976         }
977     }
978     
979     /***
980      * Fired when an error occurs (i.e. an exception is caught by MINA)
981      */
982     private void fireStyxConnectionError(Throwable cause)
983     {
984         this.errMsg = cause.getMessage();
985         if (log.isDebugEnabled())
986         {
987             cause.printStackTrace();
988         }
989         log.error("***** ERROR OCCURRED ON CONNECTION TO " + this.host +
990             ":" + this.port + " (" + cause.getClass().getName() + ":" +
991             this.errMsg + ") *****");
992         synchronized(this)
993         {
994             this.notifyAll();
995         }
996         synchronized(this.listeners)
997         {
998             for (int i = 0; i < this.listeners.size(); i++)
999             {
1000                 StyxConnectionListener listener =
1001                     (StyxConnectionListener)this.listeners.get(i);
1002                 listener.connectionError(this, this.errMsg);
1003             }
1004         }
1005         // call error() on all waiting callbacks of outstanding messages
1006         synchronized(this.unsentMessages)
1007         {
1008             Iterator it = this.unsentMessages.iterator();
1009             while(it.hasNext())
1010             {
1011                 StyxMessage tMessage = (StyxMessage)it.next();
1012                 int tag = tMessage.getTag();
1013                 MessageCallback callback =
1014                     (MessageCallback)this.msgQueue.remove(new Integer(tag));
1015                 callback.error("Could not send message: " + tMessage, tMessage);
1016                 it.remove();
1017             }
1018         }
1019         // Make sure the connection is closed
1020         this.close();
1021     }
1022     
1023     /***
1024      * Called by MINA when a message has been sent
1025      */
1026     public void messageSent( ProtocolSession session, Object message )
1027     {
1028         if (log.isDebugEnabled())
1029         {
1030             log.debug("SENT: " + message);
1031         }
1032     }
1033     
1034     /***
1035      * Required by the ProtocolHandler interface. Does nothing in this class.
1036      */
1037     public void sessionIdle( ProtocolSession session, IdleStatus status )
1038     {
1039         return;
1040     }
1041 }