1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package uk.ac.rdg.resc.jstyx.client;
30
31 import java.net.InetSocketAddress;
32 import java.net.ConnectException;
33 import java.io.IOException;
34
35 import java.util.Hashtable;
36 import java.util.Vector;
37 import java.util.Iterator;
38
39 import org.apache.mina.io.IoFilter;
40 import org.apache.mina.io.filter.IoThreadPoolFilter;
41 import org.apache.mina.protocol.filter.ProtocolThreadPoolFilter;
42 import org.apache.mina.io.socket.SocketConnector;
43 import org.apache.mina.protocol.ProtocolHandler;
44 import org.apache.mina.protocol.ProtocolFilter;
45 import org.apache.mina.protocol.ProtocolProvider;
46 import org.apache.mina.protocol.ProtocolSession;
47 import org.apache.mina.protocol.io.IoProtocolConnector;
48 import org.apache.mina.common.IdleStatus;
49
50 import org.apache.log4j.Logger;
51
52 import uk.ac.rdg.resc.jstyx.messages.StyxMessageDecoder;
53 import uk.ac.rdg.resc.jstyx.messages.StyxMessage;
54 import uk.ac.rdg.resc.jstyx.messages.TversionMessage;
55 import uk.ac.rdg.resc.jstyx.messages.RversionMessage;
56 import uk.ac.rdg.resc.jstyx.messages.TauthMessage;
57 import uk.ac.rdg.resc.jstyx.messages.RauthMessage;
58 import uk.ac.rdg.resc.jstyx.messages.TattachMessage;
59 import uk.ac.rdg.resc.jstyx.messages.RattachMessage;
60 import uk.ac.rdg.resc.jstyx.messages.TclunkMessage;
61 import uk.ac.rdg.resc.jstyx.messages.RclunkMessage;
62
63 import uk.ac.rdg.resc.jstyx.StyxUtils;
64 import uk.ac.rdg.resc.jstyx.StyxException;
65
66 /***
67 * Object representing a client connection to a Styx server.
68 *
69 * @author Jon Blower
70 * $Revision: 604 $
71 * $Date: 2006-03-21 14:58:42 +0000 (Tue, 21 Mar 2006) $
72 * $Log$
73 * Revision 1.28 2006/03/21 14:58:41 jonblower
74 * Implemented clear-text password-based authentication and did some simple tests
75 *
76 * Revision 1.27 2006/03/21 09:06:14 jonblower
77 * Still implementing authentication
78 *
79 * Revision 1.24 2005/08/08 09:35:19 jonblower
80 * Commented out thread pool filters
81 *
82 * Revision 1.23 2005/07/08 16:01:27 jonblower
83 * Reinstated ProtocolThreadPoolFilter
84 *
85 * Revision 1.22 2005/07/08 15:22:54 jonblower
86 * Upgraded MINA library to 0.7.3-SNAPSHOT
87 *
88 * Revision 1.21 2005/06/27 17:17:15 jonblower
89 * Changed MessageCallback to pass Tmessage as parameter, rather than storing in the instance
90 *
91 * Revision 1.18 2005/05/25 15:37:55 jonblower
92 * Removed cache of CStyxFiles, dealt differently with root fid
93 *
94 * Revision 1.17 2005/05/23 16:48:17 jonblower
95 * Overhauled CStyxFile (esp. asynchronous methods) and StyxConnection (added cache of CStyxFiles)
96 *
97 * Revision 1.16 2005/05/16 16:16:52 jonblower
98 * Implemented getRemoteHost() and getRemotePort()
99 *
100 * Revision 1.15 2005/05/16 13:09:54 jonblower
101 * Added StyxConnection object as first argument in all StyxConnectionListener methods
102 *
103 * Revision 1.14 2005/05/16 12:57:39 jonblower
104 * Constructors no longer throw StyxException
105 *
106 * Revision 1.13 2005/05/09 13:35:48 jonblower
107 * Now throws an exception if try to send a message before connecting
108 *
109 * Revision 1.12 2005/05/05 16:57:31 jonblower
110 * Updated MINA library to revision 168337 and changed code accordingly
111 *
112 * Revision 1.11 2005/03/22 17:42:24 jonblower
113 * Changed default message size to 8192 for efficiency in MINA
114 *
115 * Revision 1.10 2005/03/22 10:19:52 jonblower
116 * Fixed problem with ByteBuffer leak in StyxMessageDecoder and StyxFileInputStream
117 *
118 * Revision 1.9 2005/03/17 07:30:05 jonblower
119 * Improved error logging code
120 *
121 * Revision 1.8 2005/03/16 17:55:52 jonblower
122 * Replaced use of java.nio.ByteBuffer with MINA's ByteBuffer to minimise copying of buffers
123 *
124 * Revision 1.7 2005/03/15 15:51:37 jonblower
125 * Removed hard limit on maximum message size
126 *
127 * Revision 1.6 2005/03/11 13:58:25 jonblower
128 * Merged MINA-Test_20059309 into main line of development
129 *
130 * Revision 1.5.2.2 2005/03/11 08:29:52 jonblower
131 * Moved to log4j logging system (from apache commons logging)
132 *
133 * Revision 1.5.2.1 2005/03/10 11:48:30 jonblower
134 * Updated to fit in with MINA framework
135 *
136 * Revision 1.4 2005/02/21 18:07:23 jonblower
137 * Separated constructor and connect methods
138 *
139 * Revision 1.3 2005/02/18 17:56:31 jonblower
140 * Set root directory in constructor; doesn't need to wait until connection is made
141 *
142 * Revision 1.1.1.1 2005/02/16 18:58:19 jonblower
143 * Initial import
144 *
145 */
146 public class StyxConnection implements ProtocolHandler
147 {
148 private static final Logger log = Logger.getLogger(StyxConnection.class);
149
150 /***
151 * The default maximum message size that this connection will request. This
152 * is not necessarily the same as the maximum message size that will be used;
153 * this is up to the remote server.
154 *
155 * This is set to 8192 because it is most efficient if this is a power of two;
156 * MINA allocates its reusable ByteBuffers in sizes that are powers of two. If
157 * this were, say, 8216 bytes then MINA would allocate ByteBuffers of capacity
158 * 16384 bytes to accommodate a single message.
159 */
160 private static final int DEFAULT_MAX_MESSAGE_SIZE_REQUEST = 8192;
161
162 private String host;
163 private int port;
164 private String username;
165 private String password;
166
167 private boolean connecting;
168
169 private boolean connected;
170
171 private String errMsg;
172 private Vector unsentMessages;
173
174
175 private Vector tagsInUse;
176 private Vector fidsInUse;
177
178 private Hashtable msgQueue;
179 private Hashtable tClunksPending;
180
181 private long rootFid;
182 private CStyxFile rootDirectory;
183 private int maxMessageSizeRequest;
184 private int maxMessageSize;
185
186 private Vector listeners;
187
188
189 private ProtocolSession session;
190 private IoThreadPoolFilter ioThreadPoolFilter;
191 private ProtocolThreadPoolFilter protocolThreadPoolFilter;
192
193 private static Integer numSessions = new Integer(0);
194
195 private static final int CONNECT_TIMEOUT = 30;
196
197 /***
198 * Creates a new instance of StyxConnection. This does not actually make the
199 * connection; call connectAsync() or connect() to do this.
200 */
201 public StyxConnection(String host, int port, String username, String password,
202 int maxMessageSizeRequest)
203 {
204 this.host = host;
205 this.port = port;
206 this.username = username.trim();
207 this.password = password.trim();
208 this.connecting = false;
209 this.connected = false;
210 this.errMsg = null;
211 this.unsentMessages = new Vector();
212 this.rootFid = -1;
213 this.tagsInUse = new Vector();
214 this.fidsInUse = new Vector();
215 this.msgQueue = new Hashtable();
216 this.tClunksPending = new Hashtable();
217 this.listeners = new Vector();
218 this.maxMessageSizeRequest = maxMessageSizeRequest;
219 this.rootDirectory = this.getFile("/");
220
221
222 synchronized(numSessions)
223 {
224 numSessions = new Integer(numSessions.intValue() + 1);
225 }
226 }
227
228 /***
229 * Uses DEFAULT_MAX_MESSAGE_SIZE_REQUEST
230 */
231 public StyxConnection(String host, int port, String username, String password)
232 {
233 this(host, port, username, password, DEFAULT_MAX_MESSAGE_SIZE_REQUEST);
234 }
235
236 /***
237 * Creates a new instance of StyxConnection, connecting as an anonymous user
238 */
239 public StyxConnection(String host, int port, int maxMessageSizeRequest)
240 {
241 this(host, port, "", "", maxMessageSizeRequest);
242 }
243
244 /***
245 * Creates a new instance of StyxConnection, connecting as an anonymous user
246 */
247 public StyxConnection(String host, int port)
248 {
249 this(host, port, "", "");
250 }
251
252 /***
253 * @return the name (or IP address) of the remote host
254 */
255 public String getRemoteHost()
256 {
257 return this.host;
258 }
259
260 /***
261 * @return the port of the remote host
262 */
263 public int getRemotePort()
264 {
265 return this.port;
266 }
267
268 /***
269 * Connects to the remote server and handshakes. This method returns
270 * immediately; when the connection and handshaking are complete, the
271 * connectionReady() event will be fired on all registered
272 * StyxConnectionListeners. If an error occurred when connecting or
273 * handshaking, the connectionError() event will be fired on the listeners.
274 * This method will do nothing if we have already connected or are in the
275 * process of connecting.
276 * @throws StyxException if the IOProcessor could not be started
277 */
278 public synchronized void connectAsync() throws StyxException
279 {
280 if (!this.connecting)
281 {
282 this.connecting = true;
283
284 this.ioThreadPoolFilter = new IoThreadPoolFilter();
285 this.protocolThreadPoolFilter = new ProtocolThreadPoolFilter();
286
287
288
289
290
291
292 IoProtocolConnector connector = new IoProtocolConnector( new SocketConnector() );
293
294
295
296
297
298 ProtocolProvider protocolProvider = new StyxClientProtocolProvider(this);
299
300 try
301 {
302 this.session = connector.connect( new InetSocketAddress( this.host,
303 this.port ), CONNECT_TIMEOUT, protocolProvider );
304 }
305 catch( IOException e )
306 {
307 throw new StyxException("Failed to connect: " + e.getMessage());
308 }
309 }
310 else
311 {
312 log.info("Already connecting");
313 }
314 }
315
316 /***
317 * Connects to the remote server and handshakes. This method blocks until
318 * the connection is made and the handshaking is complete, throwing a
319 * StyxException if an error occurred when connecting. If the connection is
320 * already made, this method does nothing. The connectionReady()
321 * and connectionError() events will be fired on any registered listeners
322 * when the connection is ready or if an error occurs.
323 * @throws StyxException if the IOProcessor could not be started or if
324 * an error occurred during connection or handshaking
325 */
326 public void connect() throws StyxException
327 {
328 this.connectAsync();
329 while (this.rootFid < 0 && this.errMsg == null)
330 {
331 synchronized(this)
332 {
333 try
334 {
335 this.wait();
336 }
337 catch(InterruptedException ie)
338 {
339
340 }
341 }
342 }
343 if (this.errMsg != null)
344 {
345
346 throw new StyxException(this.errMsg);
347 }
348 }
349
350 /***
351 * Overrides the close() method in Session. Clunks all fids before
352 * closing the connection. Does nothing if the connection has not been made.
353 * If the connection has been made, but the handshaking has not been done,
354 * this will enqueue the close request so that, when handshaking is
355 * complete, the fids will be clunked.
356 * Note that there is a possibility that if close() is called immediately
357 * after connectAsync(), the call to close() will return (having done nothing)
358 * before the connection is open. TODO: what can we do about this?
359 * The session is only definitively closed when the connectionClosed() event
360 * is fired on the registered StyxConnectionListeners.
361 * @todo rename this closeAsync() and implement a blocking close() method?
362 */
363 public void close()
364 {
365 log.debug("Called close() on StyxConnection");
366 if (this.connected)
367 {
368
369
370
371
372
373
374 this.clunkNextFid
375 (
376 new MessageCallback()
377 {
378 public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
379 {
380 clunkNextFid(this);
381 }
382 public void error(String message, StyxMessage tMessage)
383 {
384 log.debug("Error clunking fid: " + message);
385 clunkNextFid(this);
386 }
387 }
388 );
389 }
390 else
391 {
392
393
394 this.sessionClosed(this.session);
395 }
396 }
397
398 /***
399 * Clunks the last fid in the fidsInUse vector. If there are no more fids
400 * left to clunk, the connection is closed.
401 * @todo Should we actually be closing CStyxFiles rather than clunking the fids?
402 */
403 private void clunkNextFid(MessageCallback callback)
404 {
405 int openFids = fidsInUse.size();
406 boolean tClunkSent = false;
407 int i = 1;
408 while(!tClunkSent && (openFids - i) >= 0)
409 {
410
411 Long fid = (Long)fidsInUse.get(openFids - i);
412 if (this.tClunksPending.containsValue(fid))
413 {
414
415 i++;
416 }
417 else
418 {
419
420 this.sendAsync(new TclunkMessage(fid.longValue()), callback);
421 tClunkSent = true;
422 }
423 }
424 if(!tClunkSent)
425 {
426
427 this.session.close();
428 }
429 }
430
431 /***
432 * Gets the CStyxFile representing the root directory of the remote Styx
433 * server
434 */
435 public CStyxFile getRootDirectory()
436 {
437 return this.rootDirectory;
438 }
439
440 /***
441 * Simple struct to contain a T-message plus its associated callback. Note
442 * that we cannot store the T-message in the callback itself because a callback
443 * may be associated with several T-messages
444 */
445 private class MessagePlusCallback
446 {
447 private StyxMessage tMessage;
448 private MessageCallback callback;
449 public MessagePlusCallback(StyxMessage tMessage, MessageCallback callback)
450 {
451 this.tMessage = tMessage;
452 this.callback = callback;
453 }
454 }
455
456 /***
457 * Sends a message and returns its tag. Note that this method will return
458 * immediately. When the reply arrives, the replyArrived() method of the
459 * callback object will be called. (The callback can be null.). If the
460 * connection hasn't been made yet, the message will be put in a queue and
461 * will be sent when the connection is ready (TODO this is convenient as it
462 * saves waiting for the connectionReady() event, but is this the best thing
463 * to do?)
464 *
465 * If this is called before connect() or connectAsync(), the callback's error
466 * function will be called.
467 *
468 * @param tMessage the message to be sent
469 * @param callback the MessageCallback to be called when the reply arrives
470 * @return the tag of the outgoing message
471 */
472 public int sendAsync(StyxMessage tMessage, MessageCallback callback)
473 {
474 return this.sendAsync(tMessage, callback, false);
475 }
476
477 /***
478 * Sends a message and returns its tag. Note that this method will return
479 * immediately. When the reply arrives, the replyArrived() method of the
480 * callback object will be called. (The callback can be null.). If the
481 * connection hasn't been made yet, the message will be put in a queue and
482 * will be sent when the connection is ready (TODO this is convenient as it
483 * saves waiting for the connectionReady() event, but is this the best thing
484 * to do?)
485 *
486 * If this is called before connect() or connectAsync(), the callback's error
487 * function will be called unless isHandshake==true;
488 *
489 * @param tMessage the message to be sent
490 * @param callback the MessageCallback to be called when the reply arrives
491 * @param isHandshake if true, this message is part of the connection process
492 * itself (e.g. authentication)
493 * @return the tag of the outgoing message
494 */
495 public int sendAsync(StyxMessage tMessage, MessageCallback callback, boolean isHandshake)
496 {
497 if (!this.connecting)
498 {
499 if (callback != null)
500 {
501 callback.error("Must connect before sending a message", tMessage);
502 }
503 else
504 {
505 log.error("Attempt to send a message before connecting");
506 }
507 }
508
509
510 int tag;
511 if (tMessage instanceof TversionMessage)
512 {
513
514 tag = StyxUtils.NOTAG;
515 }
516 else
517 {
518 tag = this.getFreeTag();
519 }
520 tMessage.setTag(tag);
521
522 if (callback != null)
523 {
524
525 this.msgQueue.put(new Integer(tag), new MessagePlusCallback(tMessage, callback));
526 }
527
528 if (tMessage instanceof TclunkMessage)
529 {
530
531
532 TclunkMessage tClunkMsg = (TclunkMessage)tMessage;
533 this.tClunksPending.put(new Integer(tag), new Long(tClunkMsg.getFid()));
534 }
535
536 synchronized(this)
537 {
538
539
540 if (this.rootFid >= 0 || tMessage instanceof TclunkMessage || isHandshake)
541 {
542
543 this.session.write(tMessage);
544 }
545 else
546 {
547
548 this.unsentMessages.add(tMessage);
549 }
550 }
551
552
553 return tag;
554 }
555
556 /***
557 * Sends a message and blocks until the corresponding reply arrives
558 * @throws StyxException if the message type is not as expected, or if the
559 * connection has not been made.
560 */
561 public StyxMessage send(StyxMessage message) throws StyxException
562 {
563
564 StyxReplyCallback callback = new StyxReplyCallback();
565 this.sendAsync(message, callback);
566
567 StyxMessage reply = callback.getReply();
568 return reply;
569 }
570
571 /***
572 * Called when a reply has arrived from a Styx server
573 */
574 public void messageReceived( ProtocolSession session, Object message )
575 {
576 if (log.isDebugEnabled())
577 {
578 log.debug("RCVD: " + message);
579 }
580 StyxMessage rMessage = (StyxMessage)message;
581
582
583 int tag = rMessage.getTag();
584
585
586 MessagePlusCallback mpc = (MessagePlusCallback)this.msgQueue.remove(new Integer(tag));
587
588
589 if (!(message instanceof RversionMessage))
590 {
591 this.returnTag(tag);
592 }
593
594 if (message instanceof RclunkMessage)
595 {
596 Long lngFid = (Long)this.tClunksPending.remove(new Integer(tag));
597
598 this.returnFid(lngFid.longValue());
599 }
600
601 if (mpc != null && mpc.callback != null)
602 {
603 mpc.callback.gotReply(rMessage, mpc.tMessage);
604 }
605 }
606
607 /***
608 * @return the next unused fid, or -1 if there are none left (this is
609 * extremely unlikely and would require StyxUtils.MAXUINT tags to be in
610 * use - this would probably only happen due to a bug)
611 */
612 public long getFreeFid()
613 {
614 synchronized(this.fidsInUse)
615 {
616 for (long i = 0; i < StyxUtils.MAXUINT; i++)
617 {
618 Long lngFid = new Long(i);
619
620 if (!this.fidsInUse.contains(lngFid))
621 {
622
623 this.fidsInUse.add(lngFid);
624 return i;
625 }
626 }
627 return -1;
628 }
629 }
630
631 /***
632 * @return the next unused tag, or -1 if there are none left (this is
633 * extremely unlikely and would require 65535 tags to be in simultaneous
634 * use - this would probably only happen due to a bug)
635 * @todo: this works by searching a Vector of used tags. Would this be
636 * quicker if it were done some other way?
637 */
638 private int getFreeTag()
639 {
640 synchronized(this.tagsInUse)
641 {
642 for (int i = 0; i < StyxUtils.MAXUSHORT; i++)
643 {
644
645 if (!this.tagsInUse.contains(new Integer(i)))
646 {
647
648 this.tagsInUse.add(new Integer(i));
649 return i;
650 }
651 }
652 return -1;
653 }
654 }
655
656 /***
657 * Returns the given fid back to the pool
658 */
659 public void returnFid(long fid)
660 {
661
662 this.fidsInUse.remove(new Long(fid));
663 }
664
665 /***
666 * Returns the given tag back to the pool
667 */
668 private void returnTag(int tag)
669 {
670
671 this.tagsInUse.remove(new Integer(tag));
672 }
673
674 /***
675 * @return the fid associated with the root of the remote server
676 */
677 public long getRootFid()
678 {
679 return this.rootFid;
680 }
681
682 /***
683 * @return the maximum size of message that can be sent on this connection
684 */
685 public int getMaxMessageSize()
686 {
687 return this.maxMessageSize;
688 }
689
690 /***
691 * Sets the maximum size of message that can be sent on this connection
692 */
693 public void setMaxMessageSize(int maxMessageSize)
694 {
695 this.maxMessageSize = maxMessageSize;
696 }
697
698 /***
699 * Invoked when the session is created. Initialize default socket
700 * parameters and user-defined attributes here.
701 */
702 public void sessionCreated( ProtocolSession session ) throws Exception
703 {
704
705 log.debug("Connection created");
706 }
707
708 /***
709 * Called when the socket connection to the remote server has been established
710 */
711 public void sessionOpened( ProtocolSession session )
712 {
713 this.connected = true;
714 log.debug("Connection established.");
715 TversionMessage tVerMsg = new TversionMessage(this.maxMessageSizeRequest);
716 this.sendAsync(tVerMsg, new TversionCallback(), true);
717 }
718
719 private class TversionCallback extends MessageCallback
720 {
721 public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
722 {
723 RversionMessage rVerMsg = (RversionMessage)rMessage;
724 maxMessageSize = (int)rVerMsg.getMaxMessageSize();
725 if (username.equalsIgnoreCase(""))
726 {
727 log.debug("Unauthenticated connection");
728 TattachMessage tAttMsg = new TattachMessage(getFreeFid(), username);
729 sendAsync(tAttMsg, new TattachCallback(), true);
730 }
731 else
732 {
733 log.debug("Authenticated connection");
734 TauthMessage tAuthMsg = new TauthMessage(getFreeFid(), username, "");
735 sendAsync(tAuthMsg, new TauthCallback(), true);
736 }
737 }
738 public void error(String message, StyxMessage tMessage)
739 {
740 fireStyxConnectionError(new Throwable(message));
741 }
742 }
743
744 private class TauthCallback extends MessageCallback
745 {
746 public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
747 {
748 TauthMessage tAuthMsg = (TauthMessage)tMessage;
749 RauthMessage rAuthMsg = (RauthMessage)rMessage;
750
751
752 CStyxFile authFile = getFile("/auth");
753 authFile.setFid(tAuthMsg.getAfid());
754 authFile.setQid(rAuthMsg.getAQid());
755 authFile.writeAsync(password, 0, new AuthFileWriteCallback(tAuthMsg.getAfid()));
756 }
757 public void error(String message, StyxMessage tMessage)
758 {
759 TauthMessage tAuthMsg = (TauthMessage)tMessage;
760 returnFid(tAuthMsg.getAfid());
761 fireStyxConnectionError(new Throwable(message));
762 }
763 }
764
765 /***
766 * This callback is called when we have written the password to the
767 * auth file
768 */
769 private class AuthFileWriteCallback extends MessageCallback
770 {
771 private long afid;
772 public AuthFileWriteCallback(long afid)
773 {
774 this.afid = afid;
775 }
776 public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
777 {
778
779 TattachMessage tAttMsg =
780 new TattachMessage(getFreeFid(), this.afid, username, "");
781 sendAsync(tAttMsg, new TattachCallback(), true);
782 }
783 public void error(String message, StyxMessage tMessage)
784 {
785 fireStyxConnectionError(new Throwable(message));
786 }
787 }
788
789 private class TattachCallback extends MessageCallback
790 {
791 public void replyArrived(StyxMessage rMessage, StyxMessage tMessage)
792 {
793 TattachMessage tAttMsg = (TattachMessage)tMessage;
794 rootFid = tAttMsg.getFid();
795 fireStyxConnectionReady();
796 }
797 public void error(String message, StyxMessage tMessage)
798 {
799 TattachMessage tAttMsg = (TattachMessage)tMessage;
800 returnFid(tAttMsg.getFid());
801 fireStyxConnectionError(new Throwable(message));
802 }
803 }
804
805 /***
806 * Called when the connection is closed
807 */
808 public void sessionClosed( ProtocolSession session )
809 {
810 if (this.connected)
811 {
812 log.debug("Connection closed.");
813 }
814 this.connected = false;
815 this.connecting = false;
816 this.rootFid = -1;
817
818
819 this.ioThreadPoolFilter.stop();
820 this.protocolThreadPoolFilter.stop();
821
822
823 StyxMessageDecoder decoder = (StyxMessageDecoder)this.session.getDecoder();
824 decoder.release();
825
826
827
828
829 synchronized(numSessions)
830 {
831 numSessions = new Integer(numSessions.intValue() - 1);
832 if (numSessions.equals(new Integer(0)));
833 {
834
835
836
837
838 }
839 }
840 this.fireStyxConnectionClosed();
841 }
842
843 /***
844 * Called when an exception is caught by MINA; fires the connectError()
845 * event on all registered listeners and closes the connection.
846 */
847 public void exceptionCaught( ProtocolSession session, Throwable cause )
848 {
849 this.fireStyxConnectionError(cause);
850 }
851
852 /***
853 * Gets a CStyxFile with the given path. Note that each call to this method
854 * will return a new object, even if the path is identical.
855 * This does not open, create or check the existence of the file: no
856 * messages are sent to the server in this method so this will never block.
857 * @throws InvalidPathException if the given path is not valid and absolute
858 * (only catch this runtime exception if it is likely that the path could be
859 * invalid, e.g. when the path is being input by a user)
860 */
861 public CStyxFile getFile(String path)
862 {
863 if (!path.startsWith("/"))
864 {
865 path = "/" + path;
866 }
867 return new CStyxFile(this, path);
868 }
869
870 /***
871 * Opens a file on the server, throwing a StyxException if the file can't
872 * be found or opened in the given mode. Blocks until the file is open.
873 * @param path The path of the file relative to the server root.
874 * @param mode Integer representing the mode - see the constants in StyxUtils.
875 * For example, to open a file for reading, use StyxUtils.OREAD. To open a
876 * file for writing with truncation use StyxUtils.OWRITE | StyxUtils.OTRUNC.
877 * @return The file that has just been opened
878 */
879 public CStyxFile openFile(String path, int mode) throws StyxException
880 {
881 CStyxFile file = this.getFile(path);
882 file.open(mode);
883 return file;
884 }
885
886 /***
887 * Gets the entire contents of a file as a single string. Opens the file
888 * for reading, reads the contents and closes the file.
889 * This should only be used for relatively short files that can sensibly
890 * fit into a String - do not use for large files as not only will this
891 * method block until the file is read, you may run into memory problems.
892 * @param path The path of the file relative to the server root.
893 */
894 public String getContents(String path) throws StyxException
895 {
896 return this.getFile(path).getContents();
897 }
898
899 /***
900 * Adds a StyxConnectionListener to this connection. The methods of the
901 * listener will be called when events happen, such as the connection being
902 * ready for messages, the connection being closed, and an error in connection.
903 * If this listener is already registered, this method does nothing.
904 */
905 public void addListener(StyxConnectionListener listener)
906 {
907 synchronized(this.listeners)
908 {
909 if (!this.listeners.contains(listener))
910 {
911 this.listeners.add(listener);
912 }
913 }
914 }
915
916 /***
917 * Removes the given listener; does nothing if the listener has not been
918 * registered with this.addListener().
919 */
920 public void removeListener(StyxConnectionListener listener)
921 {
922 this.listeners.remove(listener);
923 }
924
925 /***
926 * Fired when the connection is ready for traffic (i.e. when a Rattach
927 * message arrives).
928 */
929 private void fireStyxConnectionReady()
930 {
931 log.info("***** CONNECTION OPENED TO " + host + ":" + port
932 + " *****");
933 synchronized(this)
934 {
935
936 this.notifyAll();
937 }
938 synchronized(this.listeners)
939 {
940 for (int i = 0; i < this.listeners.size(); i++)
941 {
942 StyxConnectionListener listener =
943 (StyxConnectionListener)this.listeners.get(i);
944 listener.connectionReady(this);
945 }
946 }
947
948 synchronized(this.unsentMessages)
949 {
950 Iterator it = this.unsentMessages.iterator();
951 while(it.hasNext())
952 {
953 StyxMessage message = (StyxMessage)it.next();
954 log.debug("Sending message now connection is ready");
955 this.session.write(message);
956 it.remove();
957 }
958 }
959 }
960
961 /***
962 * Fired when the connection is closed.
963 */
964 private void fireStyxConnectionClosed()
965 {
966 log.info("***** CONNECTION TO " + this.host + ":" + this.port
967 + " CLOSED *****");
968 synchronized(this.listeners)
969 {
970 for (int i = 0; i < this.listeners.size(); i++)
971 {
972 StyxConnectionListener listener =
973 (StyxConnectionListener)this.listeners.get(i);
974 listener.connectionClosed(this);
975 }
976 }
977 }
978
979 /***
980 * Fired when an error occurs (i.e. an exception is caught by MINA)
981 */
982 private void fireStyxConnectionError(Throwable cause)
983 {
984 this.errMsg = cause.getMessage();
985 if (log.isDebugEnabled())
986 {
987 cause.printStackTrace();
988 }
989 log.error("***** ERROR OCCURRED ON CONNECTION TO " + this.host +
990 ":" + this.port + " (" + cause.getClass().getName() + ":" +
991 this.errMsg + ") *****");
992 synchronized(this)
993 {
994 this.notifyAll();
995 }
996 synchronized(this.listeners)
997 {
998 for (int i = 0; i < this.listeners.size(); i++)
999 {
1000 StyxConnectionListener listener =
1001 (StyxConnectionListener)this.listeners.get(i);
1002 listener.connectionError(this, this.errMsg);
1003 }
1004 }
1005
1006 synchronized(this.unsentMessages)
1007 {
1008 Iterator it = this.unsentMessages.iterator();
1009 while(it.hasNext())
1010 {
1011 StyxMessage tMessage = (StyxMessage)it.next();
1012 int tag = tMessage.getTag();
1013 MessageCallback callback =
1014 (MessageCallback)this.msgQueue.remove(new Integer(tag));
1015 callback.error("Could not send message: " + tMessage, tMessage);
1016 it.remove();
1017 }
1018 }
1019
1020 this.close();
1021 }
1022
1023 /***
1024 * Called by MINA when a message has been sent
1025 */
1026 public void messageSent( ProtocolSession session, Object message )
1027 {
1028 if (log.isDebugEnabled())
1029 {
1030 log.debug("SENT: " + message);
1031 }
1032 }
1033
1034 /***
1035 * Required by the ProtocolHandler interface. Does nothing in this class.
1036 */
1037 public void sessionIdle( ProtocolSession session, IdleStatus status )
1038 {
1039 return;
1040 }
1041 }