View Javadoc

1   /*
2    * Copyright (c) 2005 The University of Reading
3    * All rights reserved.
4    *
5    * Redistribution and use in source and binary forms, with or without
6    * modification, are permitted provided that the following conditions
7    * are met:
8    * 1. Redistributions of source code must retain the above copyright
9    *    notice, this list of conditions and the following disclaimer.
10   * 2. Redistributions in binary form must reproduce the above copyright
11   *    notice, this list of conditions and the following disclaimer in the
12   *    documentation and/or other materials provided with the distribution.
13   * 3. Neither the name of the University of Reading, nor the names of the
14   *    authors or contributors may be used to endorse or promote products
15   *    derived from this software without specific prior written permission.
16   * 
17   * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18   * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20   * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21   * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24   * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26   * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27   */
28  
29  package uk.ac.rdg.resc.jstyx.server;
30  
31  import org.apache.mina.common.ByteBuffer;
32  
33  import java.util.Hashtable;
34  import java.util.Vector;
35  import java.util.Enumeration;
36  import java.util.Iterator;
37  
38  import uk.ac.rdg.resc.jstyx.StyxException;
39  import uk.ac.rdg.resc.jstyx.types.ULong;
40  import uk.ac.rdg.resc.jstyx.server.StyxSessionState;
41  import uk.ac.rdg.resc.jstyx.messages.RerrorMessage;
42  
43  /***
44   * A wrapper for a StyxFile that implements asynchronous behaviour: the first
45   * read from a given client returns the file's data as normal. If the file is not
46   * closed, subsequent reads from the same client block until the file content is
47   * changed.
48   *
49   * @author Jon Blower
50   * $Revision: 609 $
51   * $Date: 2006-03-31 18:09:42 +0100 (Fri, 31 Mar 2006) $
52   * $Log$
53   * Revision 1.13  2005/11/14 21:32:30  jonblower
54   * Got SGSRun working for SC2005 demo
55   *
56   * Revision 1.12  2005/11/04 19:33:41  jonblower
57   * Changed contentsChanged() to fileContentsChanged() in StyxFileChangeListener
58   *
59   * Revision 1.11  2005/11/04 09:12:05  jonblower
60   * Made baseFile a protected field (instead of private)
61   *
62   * Revision 1.10  2005/09/08 07:08:59  jonblower
63   * Removed "String user" from list of parameters to StyxFile.write()
64   *
65   * Revision 1.9  2005/08/10 18:35:28  jonblower
66   * Added simple test main() function
67   *
68   * Revision 1.8  2005/06/20 07:17:35  jonblower
69   * Wrapped SGSParamFile as AsyncStyxFile
70   *
71   * Revision 1.7  2005/05/11 10:33:50  jonblower
72   * Implemented MonitoredFileOnDisk.java
73   *
74   * Revision 1.6  2005/05/10 08:02:18  jonblower
75   * Changes related to implementing MonitoredFileOnDisk
76   *
77   * Revision 1.5  2005/05/09 07:10:37  jonblower
78   * Minor changes
79   *
80   * Revision 1.4  2005/03/24 09:48:31  jonblower
81   * Changed 'count' from long to int throughout for reading and writing
82   *
83   * Revision 1.3  2005/03/16 17:56:23  jonblower
84   * Replaced use of java.nio.ByteBuffer with MINA's ByteBuffer to minimise copying of buffers
85   *
86   * Revision 1.2  2005/03/11 14:02:16  jonblower
87   * Merged MINA-Test_20059309 into main line of development
88   *
89   * Revision 1.1.1.1.2.1  2005/03/09 19:44:18  jonblower
90   * Changes concerned with migration to MINA
91   *
92   * Revision 1.1.1.1  2005/02/16 18:58:31  jonblower
93   * Initial import
94   *
95   */
96  public class AsyncStyxFile extends StyxFile implements StyxFileChangeListener
97  {
98      
99      protected StyxFile baseFile;       // The file to wrap
100     private Hashtable knownClients;  // Clients that have connected to this file before
101     private Vector clientQueue;      // Clients that are awaiting a reply
102     private long minReplyInterval;   // The minimum amount of time in ms between replies to the same client
103                                      // (prevents loading of server in case of rapidly-changing
104                                      // data)
105     
106     /***
107      * Creates a new AsyncStyxFile with the same name as the underlying file.
108      * (Note: this must be placed in a different directory from that of the
109      * underlying file to prevent name conflicts.
110      */
111     public AsyncStyxFile(StyxFile file) throws StyxException
112     {
113         this(file, file.getName());
114     }
115     
116     public AsyncStyxFile(StyxFile file, String name) throws StyxException
117     {
118         this(file, name, 0666);
119     }
120     
121     public AsyncStyxFile(StyxFile file, String name, int permissions) throws StyxException
122     {
123         this(file, name, permissions, false, false);
124     }
125     
126     public AsyncStyxFile(StyxFile file, String name, int permissions,
127         boolean isAppendOnly, boolean isExclusive)
128         throws StyxException
129     {
130         super(name, permissions, isAppendOnly, isExclusive);
131         this.baseFile = file;
132         this.knownClients = new Hashtable();
133         this.clientQueue = new Vector();
134         this.minReplyInterval = 0;
135         // Register our interest in changes to the underlying StyxFile
136         this.baseFile.addChangeListener(this);
137     }
138     
139     /***
140      * Sets a minimum reply interval, i.e. the minimum amount of time that must
141      * elapse between successive replies to the same client. This is most useful
142      * when using this AsyncStyxFile to provide notification of a very
143      * rapidly-changing quantity as it can save a lot of unnecessary messages
144      * being sent. (This is set to zero when the AsyncStyxFile is first created)
145      */
146     public void setMinReplyInterval(float seconds)
147     {
148         this.minReplyInterval = new Float(seconds * 1000).longValue();
149     }
150     
151     public synchronized void read(StyxFileClient client, long offset,
152         int count, int tag) throws StyxException
153     {
154         // Check to see if this client has connected before
155         ClientInfo cinfo = (ClientInfo)this.knownClients.get(client);
156         if (cinfo == null)
157         {
158             // this client hasn't connected before. Reply immediately.
159             this.replyClient(cinfo, client, offset, count, tag);
160         }
161         else
162         {
163             long now = System.currentTimeMillis();
164             // This client has read from this file before.  If the client is 
165             // requesting data from an offset greater than when it last read
166             // this file, return the data and update the offset
167             if (offset > cinfo.offset)
168             {
169                 this.replyClient(cinfo, client, offset, count, tag);
170             }
171             else
172             {
173                 // The client is requesting data that is has received already.
174                 // Check to see if the file has changed since the last read and
175                 // if the requisite amount of time has elapsed since the last read
176                 long timeSinceLastReply = now - cinfo.timeLastReply;
177                 if (cinfo.versionLastRead != this.getVersion() 
178                     && timeSinceLastReply >= this.minReplyInterval)
179                 {
180                     this.replyClient(cinfo, client, offset, count, tag);
181                 }
182                 else
183                 {
184                     // File has not changed since the last read, or the required
185                     // amount of time has not passed since the last read. Add the client to
186                     // the queue of clients awaiting reply, making sure that the
187                     // tag is set correctly
188                     cinfo.offset = offset;
189                     cinfo.count = count;
190                     cinfo.tag = tag;
191                     this.clientQueue.add(cinfo);
192                 }
193             }
194         }
195     }
196     
197     /***
198      * This simply calls write() in the contained StyxFile object. This will
199      * cause the contentsChanged() method to be called, notifying any waiting
200      * clients of the change to the file
201      */
202     public synchronized void write(StyxFileClient client, long offset,
203         int count, ByteBuffer data, boolean truncate, int tag)
204         throws StyxException
205     {
206         this.baseFile.write(client, offset, count, data, truncate, tag);
207     }
208     
209     /***
210      * Called to notify that the underlying data have changed.
211      * @param force If this is true, clients will be notified of changes to 
212      * the underlying data, irrespective of how long they have waited.
213      */
214     public void fileContentsChanged(boolean force)
215     {
216         this.incrementVersion();
217         // Reply to all waiting clients with the file's new data
218         synchronized (this.clientQueue)
219         {
220             Iterator it = this.clientQueue.iterator();
221             while(it.hasNext())
222             {
223                 ClientInfo cinfo = (ClientInfo)it.next();
224                 // Only reply to the client if the requisite amount of time has
225                 // elapsed since the last reply
226                 long now = System.currentTimeMillis();
227                 long timeSinceLastReply = now - cinfo.timeLastReply;
228                 if (force || timeSinceLastReply > this.minReplyInterval)
229                 {
230                     try
231                     {
232                         this.replyClient(cinfo, cinfo.client, cinfo.offset, cinfo.count, cinfo.tag);
233                     }
234                     catch(StyxException se)
235                     {
236                         // This exception is thrown if there was an error reading
237                         // the underlying StyxFile.
238                         StyxServerProtocolHandler.reply(cinfo.client.getSession(),
239                             new RerrorMessage(se.getMessage()), cinfo.tag);
240                     }
241                     it.remove();
242                 }
243             }
244         }
245         // Now the clientQueue is empty        
246     }
247     
248     /***
249      * Gets the StyxFile that is wrapped by this AsyncStyxFile
250      */
251     public StyxFile getBaseFile()
252     {
253         return this.baseFile;
254     }
255     
256     /***
257      * @return the length of the StyxFile that is wrapped by this AsyncStyxFile
258      */
259     public ULong getLength()
260     {
261         return this.baseFile.getLength();
262     }
263     
264     /***
265      * This is called when the contents of the underlying StyxFile are changed
266      * (required by StyxFileChangeListener interface).
267      */
268     public void fileContentsChanged()
269     {
270         this.fileContentsChanged(false);
271     }
272     
273     /***
274      * Free all resources associated with this file
275      */
276     public synchronized void delete()
277     {
278         super.delete();
279         this.knownClients.clear();
280         this.clientQueue.clear();
281     }
282     
283     /***
284      * Called when a client disconnects from the file. We remove all references
285      * to this client
286      */
287     protected synchronized void clientDisconnected(StyxFileClient client)
288     {
289         this.knownClients.remove(client);
290         synchronized (this.clientQueue)
291         {
292             Enumeration en = this.clientQueue.elements();
293             while(en.hasMoreElements())
294             {
295                 ClientInfo cinfo = (ClientInfo)en.nextElement();
296                 // We're checking for equality of object references - this is OK
297                 // because we're looking for two references to the same object
298                 if (client == cinfo.client)
299                 {
300                     this.knownClients.remove(cinfo);
301                 }
302             }
303         }
304     }
305     
306     /***
307      * Replies to a client and sets the fields of the ClientInfo
308      * @throws StyxException if there was an error reading the underlying StyxFile
309      */
310     private void replyClient(ClientInfo cinfo, StyxFileClient client, long offset,
311         int count, int tag) throws StyxException
312     {
313         long now = System.currentTimeMillis();
314         if (cinfo == null)
315         {
316             cinfo = new ClientInfo(client, tag, offset, count, this.getVersion(), now);
317             this.knownClients.put(client, cinfo);
318         }
319         cinfo.offset = offset;
320         cinfo.count = count;
321         cinfo.versionLastRead = this.getVersion();
322         cinfo.timeLastReply = now;
323         cinfo.tag = tag;
324         this.baseFile.read(client, offset, count, tag);
325     }
326     
327     /***
328      * Class representing a client that is waiting for a reply from an
329      * asynchronous read/write
330      */
331     private static class ClientInfo
332     {
333         private StyxFileClient client;
334         private int tag;
335         private long offset; // The offset requested by the client
336         private int count; // The number of bytes requested by the client
337         private long versionLastRead; // the version of the file on the last read
338         private long timeLastReply; // the time when data was last sent to this client
339         
340         private ClientInfo(StyxFileClient client, int tag, long offset, int count,
341             long versionLastRead, long timeLastReply)
342         {
343             this.client = client;
344             this.tag = tag;
345             this.offset = offset;
346             this.count = count;
347             this.versionLastRead = versionLastRead;
348             this.timeLastReply = timeLastReply;
349         }
350     }
351     
352     /***
353      * Simple test function
354      */
355     public static void main(String[] args) throws Exception
356     {
357         AsyncStyxFile asf = new AsyncStyxFile(new InMemoryFile("test"));
358         StyxDirectory root = new StyxDirectory("/");
359         root.addChild(asf);
360         new StyxServer(2911, root).start();
361     }
362     
363 }