~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Open Mash Cross Reference
mash/tcl/net/rendezvous.tcl

Component: ~ [ mash ] ~ [ apps ] ~ [ gsm ] ~ [ lib ] ~ [ otcl ] ~ [ srm ] ~ [ tcl8.3 ] ~ [ tclcl ] ~ [ tk8.3 ] ~ [ tutorials ] ~

  1 # rendezvous.tcl --
  2 #
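#       Rendezvous channel support: RendezvousManager manages a set of
#       rendezvous channels and the msgs seen on them, Rendezvous monitors
#       a single channel, and RVMsg wraps one received msg.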
  4 #
  5 # Copyright (c) 1998-2002 The Regents of the University of California.
  6 # All rights reserved.
  7 #
  8 # Redistribution and use in source and binary forms, with or without
  9 # modification, are permitted provided that the following conditions are met:
 10 #
 11 # A. Redistributions of source code must retain the above copyright notice,
 12 #    this list of conditions and the following disclaimer.
 13 # B. Redistributions in binary form must reproduce the above copyright notice,
 14 #    this list of conditions and the following disclaimer in the documentation
 15 #    and/or other materials provided with the distribution.
 16 # C. Neither the names of the copyright holders nor the names of its
 17 #    contributors may be used to endorse or promote products derived from this
 18 #    software without specific prior written permission.
 19 #
 20 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
 21 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 22 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 23 # ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
 24 # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 25 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 26 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 27 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 28 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 30 #
 31 #  @(#) $Header: /usr/mash/src/repository/mash/mash-1/tcl/net/rendezvous.tcl,v 1.14 2002/02/03 04:28:05 lim Exp $
 32 
 33 
 34 # --------------------------------------------------
 35 # TODO:
 36 # - periodically flush to disk
 37 # - rather than double msg storage (rv and rvm), keep one copy.
# - propagate timeouts to manager then to mngr observers via
#      notify_observers rendez_timeout_TYPE upcalls
 40 # - perform vs intersection alg
 41 # - ...
 42 # --------------------------------------------------
 43 
 44 
 45 import Trace AnnounceListenManager Observer Observable
 46 #Trace on ; Trace add RendezvousManager
 47 
 48 # --------------------------------------------------
 49 # Class RendezvousManager
 50 # --------------------------------------------------
 51 
 52 
# Class for managing a set of rendezvous channels
# and the data sent and received on them.  Assumes msgs of the form:
#<br> "type: attr=val attr2=val2 ..." <br>
# and stores them in memory.
#<p>
#Data types:
#<p> rv_() == key: spec of channel; val: Rendezvous object handling that spec
#<p> rvMsgs_ == list of RVMsg objects
#<p>
#
# Observes the individual Rendezvous channels, and is itself observable
# by classes wishing to "register interest" in particular
# types of msgs. They do so by defining a ``rendez_recv_TYPENAME'' method
# for each type of interest.
# To receive msgs of every type, they should define a
# method named just ``rendez_recv''.
# NOTE: recv_msg issues both upcalls for repeat (refresh) msgs as well
# as for new ones.
#
#<p> FIXME -- add rendez_timeout notifications, use them in SrvAlloc
Class RendezvousManager -superclass {Observer Observable}
 72 
 73 
 74 #
 75 RendezvousManager public init {{speclist ""}} {
 76     Trc $class "--> ${class}::$proc"
 77     $self next
 78     $self instvar scopes_ rvMsgs_
 79     set rvMsgs_ ""
 80 
 81     if {$speclist == ""} {set speclist [$self get_option rendez]}
 82     if {$speclist == ""} {
 83         # none specified, use global spec (and refine from there)
 84         set s 224.2.127.253/1202/32
 85         $self add_spec $s
 86         set scopes_($s) "global"
 87         #puts "Rendevous messages sent/received on default global rv: $s"
 88     } else {
 89         foreach i [split $speclist ,] {
 90             $self add_spec $i
 91         }
 92         #puts "Rendevous messages sent/received on $speclist"
 93     }
 94 
 95 }
 96 
 97 
 98 #
 99 RendezvousManager public add_spec {s} {
100     Trc $class "--> ${class}::$proc"
101     $self instvar rv_ local_rv_
102 
103     if [info exists rv_($s)] {return}
104 
105     set r [new Rendezvous $s]
106     set rv_($s) $r
107     $r attach_observer $self
108     if ![info exists local_rv_] {
109         set local_rv_ $s
110     }
111 }
112 
# Stop listening on the channel with spec `s' and delete its Rendezvous
# object.  Prints an error when `s' is not a spec we are connected to.
# NOTE: local_rv_ is left untouched even when it names the removed spec.
#
RendezvousManager public rm_spec {s} {
    Trc $class "--> ${class}::$proc"
    $self instvar rv_
    # `info exists' tests the exact key; `array names rv_ $s' would have
    # interpreted `s' as a glob pattern.
    if {[info exists rv_($s)]} {
        $rv_($s) detach_observer $self
        delete $rv_($s)
        unset rv_($s)
    } else {
        puts "Error: attempted to remove bad spec `$s'"
    }
}
125 
126 
# Return the list of channel specs that currently have a Rendezvous
# object (the keys of rv_).
#
RendezvousManager public get_specs {} {
    $self instvar rv_
    set specs [array names rv_]
    return $specs
}
132 
# Return the spec of the "most-refined/smallest" virtual scope
# discovered so far, i.e. the "local" or "current" vs.
#
RendezvousManager public get_local_rv {} {
    Trc $class "--> ${class}::$proc"
    return [$self set local_rv_]
}
141 
# Answer a substring query against the stored rendezvous msgs.
#<p>
# `queryString' is of the form
# <br> dog & !cat
# <br> or
# <br> name=janet | !type=mother | grr=hmm
# <br> etc....
# <br> i.e., the special characters are &, |, and !
# <br> Parentheses are NOT supported, so only all-& or
# all-| clauses make sense.
#<p>
# Returns the first matching RVMsg, or {} when nothing matches.
#
#<br>FIXME add parens
#<br>FIXME only the *first* matching msg is returned
#<br>FIXME this is really unoptimized for now .. linear in both
# number of search terms and number of entries
#
RendezvousManager public query {queryString} {
    Trc $class "--> ${class}::$proc"

    set matches [$self query_msgs $queryString]
    if {$matches != ""} {
        return [lindex $matches 0]
    }
    return ""
}
170 
171 
# Query as in RendezvousManager::query.
# NOTE(review): despite its name this returns the first matching RVMsg
# itself (exactly like query), not that msg's metadata string; callers
# wanting the metadata must call get_metadata on the result -- confirm
# whether that is intended.
# Returns {} when nothing matches.
#
RendezvousManager public query_metadata {queryString} {
    Trc $class "--> ${class}::$proc"

    set matches [$self query_msgs $queryString]
    if {$matches != ""} {
        #FIXME only reply with *one* response (first one)
        return [lindex $matches 0]
    }
    return ""
}
187 
# Return the list of all RVMsgs matching `queryString' (see query for
# the syntax).  A query is treated as an AND query when it contains
# " & ", otherwise as an OR / single-field query.  Mixing " & " and
# " | " is not supported and yields {}.
#
RendezvousManager public query_msgs {queryString} {
    Trc $class "--> ${class}::$proc $queryString"

    set has_and [string match "* & *" $queryString]
    set has_or [string match "* | *" $queryString]
    if {$has_and && $has_or} {
        puts "queries with both `and' (&) and `or' (|) \
                is not currently supported... returning {}."
        return ""
    }

    set msgs ""
    if {!$has_and} {
        # or (|) clause or single-field clause: collect every hit
        foreach term [split $queryString "|"] {
            foreach hit [$self field_query [string trim $term]] {
                if {$hit != ""} {lappend msgs $hit}
            }
        }
        # a msg matching several terms was collected several times
        set msgs [$self uniq $msgs]
    } else {
        # and (&) clause: each term narrows the previous result; stop
        # as soon as nothing is left
        foreach term [split $queryString "&"] {
            set msgs [$self field_query [string trim $term] $msgs]
            if {$msgs==""} {return ""}
        }
    }
    Trc $class "matching msgs = $msgs"
    return $msgs
}
224 
225 
# Returns the list of RVMsgs whose text contains `qField' as a
# substring.  A leading `!' on `qField' inverts the test, returning the
# msgs that do NOT contain the remainder of `qField'.
# If `msgList' is given (non-empty), only those msgs are searched,
# which lets AND clauses be refined iteratively; an empty `msgList'
# means "search every stored msg".
#
RendezvousManager private field_query {qField {msgList ""}} {
    Trc $class "--> ${class}::$proc $qField $msgList"
    $self instvar rvMsgs_

    if {$msgList == ""} {
        set msgList $rvMsgs_
    }

    # a `not' query is a normal query whose result is inverted
    set is_not_query 0
    if {[string match !* $qField]} {
        set qField [string range $qField 1 end]
        set is_not_query 1
    }

    # partition the candidates in a single pass
    set matched ""
    set unmatched ""
    foreach m $msgList {
        if {[string first $qField [$m get_msg]] != -1} {
            lappend matched $m
        } else {
            lappend unmatched $m
        }
    }

    if {$is_not_query} {
        # was `Trace $class ...', which invoked the Trace class object
        # instead of logging
        Trc $class "-- field_query pre-NOT'd results: $matched"
        set results $unmatched
    } else {
        set results $matched
    }
    Trc $class "-- field_query results: $results"
    return $results
}
267 
268 
# Removes duplicates from input list `l' and returns the new list.
# The first occurrence of each element is kept, in original order.
#
RendezvousManager private uniq {l} {
    Trc $class "--> ${class}::$proc"
    set uniqL ""
    foreach i $l {
        # lsearch takes the list first, then the element to look for
        if {[lsearch -exact $uniqL $i] == -1} {
            lappend uniqL $i
        }
    }
    return $uniqL
}
281 
282 
# Called when a msg `data' is received on channel `rspec' from sender
# `addr'/`port' (`size' is not used here).
# `data' is split into lines; each non-blank line is treated as a
# separate msg and wrapped in its own RVMsg.  A line whose text was
# seen before only gets the "time" metadata of the stored RVMsg
# refreshed.  New msgs are appended to rvMsgs_, except those of type
# "query", which are not cached; new "scope" msgs are additionally
# passed to recv_scope.
# Observers are notified through rendez_recv and rendez_recv_TYPE for
# every line, new or repeated.
# NOTE: an uncached msg is handed to observers and not deleted here.
#
RendezvousManager public recv_msg {rspec addr port data size} {
    Trc $class "--> ${class}::$proc $rspec $data"
    $self instvar rv_ rvMsgs_

    foreach d [split $data \n] {
        set d [string trim $d]
        if {$d == ""} {continue}

        # wrap this line only (not the whole multi-line `data')
        set newrvmsg [new RVMsg $d $rspec $addr/$port]
        $newrvmsg update_meta_field "time=[clock seconds]"

        set type [$newrvmsg get_type]

        # a repeat msg?
        set dupmsg -1
        set newtext [$newrvmsg get_msg]
        foreach rv $rvMsgs_ {
            if {$newtext == [$rv get_msg]} {
                set dupmsg $rv
                break
            }
        }

        if {$dupmsg == -1} {
            # new msg: store it unless it is of a type that
            # shouldn't be cached
            switch $type {
                "query" {set cache_it 0}
                default {set cache_it 1}
            }
            if {$cache_it} {
                lappend rvMsgs_ $newrvmsg
            }

            # notify ourself of "scope" msgs
            if {$type == "scope"} {
                $self recv_scope $newrvmsg
            }
        } else {
            # refresh: keep the stored object, drop the temporary one
            $dupmsg update_meta_field "time=[clock seconds]"
            delete $newrvmsg
            set newrvmsg $dupmsg
        }

        # notify interested observers
        $self notify_observers rendez_recv $newrvmsg
        $self notify_observers rendez_recv_$type $newrvmsg
    }
}
337 
# Handle a msg with type "scope"
#  -- record the scope's name under its spec in scopes_
#  -- start listening on the new scope
#  -- make the new scope the local rv if the msg arrived on the
#     current local rv
#  -- perform intersection operations [FIXME intersection not yet implemented]
# A msg lacking a `name' or `spec' field is reported and ignored.
#
RendezvousManager private recv_scope {rv_msg} {
    Trc $class "--> ${class}::$proc"
    $self instvar local_rv_ scopes_
    set sname [$rv_msg get_field name]
    set sspec [$rv_msg get_field spec]
    if {$sname == "" || $sspec == ""} {
        puts "Improperly formatted scope msg: [$rv_msg get_msg]"
        # do not register a channel for an empty/unknown spec
        return
    }
    set scopes_($sspec) $sname
    $self add_spec $sspec
    if {$local_rv_ == [$rv_msg rspec]} {
        set local_rv_ $sspec
    }
}
357 
# Returns the name associated with `spec', looked up in the stored
# "scope" msgs. If it is not known (no such msg, or the msg carries no
# name), returns `spec' unchanged.
#
RendezvousManager public get_spec_name {spec} {
    Trc $class "--> ${class}::$proc"
    set r [$self query "scope: & spec=$spec"]
    # query returns {} when nothing matches; {} is not an object
    if {$r == ""} {return $spec}
    set n [$r get_field name]
    if {$n == ""} {return $spec}
    return $n
}
368 
369 
370 
# Begin announcing `msg' on the rv with spec `spec'; an empty `spec'
# selects the local/current rv.  Prints an error when we are not
# connected to `spec'.
RendezvousManager public start {spec msg} {
    Trc $class "--> ${class}::$proc `$spec' announcing `$msg'"
    $self instvar rv_
    if {$spec == ""} {
        set spec [$self get_local_rv]
    }
    if {![info exists rv_($spec)]} {
        puts "Error: not connected to `$spec': won't send msg to that addr."
        return
    }
    $rv_($spec) start $msg
}
383 
# Cease announcing `msg' on the rv with spec `spec'; an empty `spec'
# selects the local/current rv.  Prints an error when we are not
# connected to `spec'.
RendezvousManager public stop {spec msg} {
    Trc $class "--> ${class}::$proc `$spec' stop announcing `$msg'"
    $self instvar rv_
    if {$spec == ""} {
        set spec [$self get_local_rv]
    }
    if {![info exists rv_($spec)]} {
        puts "Error: not connected to `$spec': can't stop msgs there."
        return
    }
    $rv_($spec) stop $msg
}
396 
397 # --------------------------------------------------
398 # Class Rendezvous
399 # --------------------------------------------------
400 
401 import Timer/Adaptive/ConstBW
402 
# A rendezvous channel monitor that listens on *one* addr/port
# and maintains info about that channel only. Propagates msgs upward
# to interested observers such as a RendezvousManager.
#
Class Rendezvous -superclass {Observable AnnounceListenManager}
408 
409 
# Construct a monitor for the channel described by `spec': initialise
# the superclasses with the spec, set a 600 second msg timeout, install
# a randomized Timer/Adaptive/ConstBW announcement timer and start the
# periodic timeout processing.
#
Rendezvous public init {spec} {
    Trc $class "--> ${class}::$proc"
    eval [list $self] next $spec
    $self instvar msgs_ spec_ snet_ rnet_
    set msgs_ ""
    set spec_ $spec
    #FIXME
    if {$snet_ != ""} {
        $self ttl 16
    }
    $self set_timeout 600

    set announce_timer [new Timer/Adaptive/ConstBW 10000]
    $announce_timer randomize
    $self timer $announce_timer

    $self process_timeouts
}
427 
# Receive a rendezvous announcement: feed its size to the timer, then
# record each line of `data' via update_msg and pass it to the
# observers through a recv_msg upcall.
Rendezvous private recv_announcement {addr port data size} {
    Trc $class "--> ${class}::$proc $data"
    $self instvar spec_

    [$self get_timer] sample_size $size

    foreach line [split $data \n] {
        $self update_msg $line
        $self notify_observers recv_msg $spec_ $addr $port $line $size
    }
}
441 
# Insert/update a msg: refresh the timestamp of `newMsg' and, if the
# msg has not been seen before, add it to msgs_ and tell the timer
# about the additional source.
Rendezvous private update_msg {newMsg} {
    Trc $class "--> ${class}::$proc"
    $self instvar msgs_ msgtimestamps_

    set msgtimestamps_($newMsg) [clock seconds]
    # -exact: msg text must not be interpreted as a glob pattern
    if {[lsearch -exact $msgs_ $newMsg] == -1} {
        # append to the list variable itself (not to a variable named
        # by the list's current value)
        lappend msgs_ $newMsg
        [$self get_timer] incr_nsrcs
    }
}
453 
# Clean out old msgs through a timeout process: every msg whose
# timestamp is more than timeout_ seconds old is removed from msgs_
# and msgtimestamps_, and the timer's source count is decremented.
# Reschedules itself every 5 seconds.  When timeout_ <= 0 it returns
# immediately and does NOT reschedule.
Rendezvous private process_timeouts {} {
    Trc $class "--> ${class}::$proc"
    $self instvar msgs_ msgtimestamps_ timeout_
    if {$timeout_ <= 0} {
        return
    }
    set currTime [clock seconds]
    # foreach iterates over the list value as it was on entry, so
    # shrinking msgs_ inside the loop is safe
    foreach i $msgs_ {
        set t $msgtimestamps_($i)
        if {($currTime - $t) > $timeout_} {
            puts "Rendezvous: timing out msg $i"
            # locate the msg in the list (lsearch, not lindex)
            set ind [lsearch -exact $msgs_ $i]
            if {$ind != -1} {
                set msgs_ [lreplace $msgs_ $ind $ind]
            }
            unset msgtimestamps_($i)
            [$self get_timer] incr_nsrcs -1
        }
    }
    # catch in case we're deleted
    after 5000 "catch {$self process_timeouts}"
}
475 
# Set the msg timeout to `seconds'; a value <= 0 means no timeout.
Rendezvous public set_timeout {seconds} {
    Trc $class "--> ${class}::$proc"
    $self set timeout_ $seconds
}
482 
483 
484 
485 
486 
487 # --------------------------------------------------
488 # Class RVMsg
489 # --------------------------------------------------
490 
# A rendezvous msg that was pulled off a rendezvous channel: holds the
# msg text, the spec of the channel it arrived on, the sender's spec
# and a list of attr=val metadata fields.
#
Class RVMsg
494 
# Build an RVMsg from the text string <i>msg</i>, which arrived on rv
# channel <i>rspec</i> from sender <i>sender_spec</i>.  The metadata
# starts out as a single "time" field holding the creation time.
RVMsg public init {msg rspec sender_spec} {
    $self instvar msg_ rspec_ sender_spec_ metadata_
    set metadata_ "time=[clock seconds]"
    set sender_spec_ $sender_spec
    set rspec_ $rspec
    set msg_ $msg
}
504 
# Returns the initial colon-terminated word of this msg's text
# string, by convention the "type" of the msg.
# <p>
# i.e. for msg
# "will-provide: mash-object=RemoteVicApplication spec=Z ctrlspec=Y"
# <p>
# <br> `get_type' <br> returns "will-provide"
# <p>
# Returns {} when the first word does not end in a colon.
#
RVMsg public get_type {} {
    Trc $class "--> ${class}::$proc"
    $self instvar msg_
    set first_word [string trim [lindex $msg_ 0]]
    set parts [split $first_word :]
    # a trailing colon leaves an empty last element after the split
    if {[lindex $parts end] != ""} {
        return ""
    }
    return [lindex $parts 0]
}
523 
# Returns the list of field names in this msg: for every word after
# the first one, the part in front of its first `='.
#
RVMsg public fields {} {
    Trc $class "--> ${class}::$proc"
    $self instvar msg_

    set names ""
    foreach attval [lrange $msg_ 1 end] {
        lappend names [lindex [split $attval =] 0]
    }
    return $names
}
536 
# Gets the value of field `field' in this msg, i.e. everything after
# the `=' of the first word that starts with "field=".
# <p>
# i.e. for msg text:
# "will-provide: mash-object=RemoteVicApplication spec=Z ctrlspec=Y"
# <p>
# <br> `get_field ctrlspec' <br> returns Y
# <br> `get_field spec' <br> returns Z
# <br> `get_field yomamaspec' <br> returns {}
#
RVMsg public get_field {field} {
    Trc $class "--> ${class}::$proc"
    $self instvar msg_
    # Match the field name exactly at the start of the word; an
    # unanchored glob would let `spec' match `ctrlspec=Y'.
    set prefix "$field="
    foreach attval $msg_ {
        if {[string first $prefix $attval] == 0} {
            return [string range $attval [string length $prefix] end]
        }
    }
    return ""
}
558 
# Returns 1 if this msg contains a field named `f' (a word starting
# with "f="), 0 otherwise.
#
RVMsg public has_field {f} {
    Trc $class "--> ${class}::$proc"
    $self instvar msg_
    # the parameter is `f'; the body used to read an undefined `field'
    set prefix "$f="
    foreach attval $msg_ {
        if {[string first $prefix $attval] == 0} {
            return 1
        }
    }
    return 0
}
570 
# Returns the text string of this msg.
#
RVMsg public get_msg {} {
    return [$self set msg_]
}
576 
# Returns the spec of the rv channel this msg was received on.
#
RVMsg public rspec {} {
    return [$self set rspec_]
}
582 
# Returns the sender spec this msg was created with.
#
RVMsg public sender_spec {} {
    return [$self set sender_spec_]
}
588 
# Returns the part of the sender spec in front of the first `/'.
#
RVMsg public sender_addr {} {
    $self instvar sender_spec_
    set parts [split $sender_spec_ /]
    return [lindex $parts 0]
}
594 
# Returns the second `/'-separated component of the sender spec.
#
RVMsg public sender_port {} {
    $self instvar sender_spec_
    set parts [split $sender_spec_ /]
    return [lindex $parts 1]
}
600 
# Returns the metadata (list of attr=val fields) of this msg.
#
RVMsg public get_metadata {} {
    return [$self set metadata_]
}
606 
# Replaces the whole metadata of this msg with `m'.
#
RVMsg public set_metadata {m} {
    $self instvar metadata_
    # was `metsdata_', which only set a throw-away local variable
    set metadata_ $m
}
612 
# Applies update_meta_field to every attr=val element of the list
# `fields'.
#
RVMsg public update_meta_fields {fields} {
    foreach one_field $fields {
        $self update_meta_field $one_field
    }
}
619 
# Sets one metadata field.  `m' has the form attr=val: an existing
# field with the same attr is replaced, otherwise `m' is appended.
# Returns the updated metadata.
#
RVMsg public update_meta_field {m} {
    $self instvar metadata_
    # the field name is everything in front of the first `='
    set prefix "[lindex [split $m =] 0]="
    set i 0
    foreach attval $metadata_ {
        # match the name exactly at the start of the element
        if {[string first $prefix $attval] == 0} {
            set metadata_ [lreplace $metadata_ $i $i $m]
            return $metadata_
        }
        incr i
    }
    # new field: store it in metadata_ (was the misspelt `metsdata_',
    # so new fields were silently dropped)
    lappend metadata_ $m
    return $metadata_
}
631 
# Removes the metadata field named `f'.  Returns 1 if a field was
# removed, 0 if there was no such field.
#
RVMsg public rm_meta_field {f} {
    $self instvar metadata_
    # Match the name exactly at the start of the element; an unanchored
    # glob could remove a different field whose name merely ends in `f'.
    set prefix "$f="
    set i 0
    foreach attval $metadata_ {
        if {[string first $prefix $attval] == 0} {
            set metadata_ [lreplace $metadata_ $i $i]
            return 1
        }
        incr i
    }
    return 0
}
643 
# Returns the value of the metadata field named `f' (everything after
# its first `='), or {} if there is no such field.
#
RVMsg public get_meta_field {f} {
    $self instvar metadata_
    # Match the name exactly at the start of the element, and keep the
    # whole value even when it contains further `=' characters.
    set prefix "$f="
    foreach attval $metadata_ {
        if {[string first $prefix $attval] == 0} {
            return [string range $attval [string length $prefix] end]
        }
    }
    return ""
}
655 
# Returns 1 if the metadata contains a field named `f' (an element
# starting with "f="), 0 otherwise.
#
RVMsg public has_meta_field {f} {
    Trc $class "--> ${class}::$proc"
    $self instvar metadata_
    # the parameter is `f'; the body used to read an undefined `field'
    set prefix "$f="
    foreach attval $metadata_ {
        if {[string first $prefix $attval] == 0} {
            return 1
        }
    }
    return 0
}
667 
# Dump all data about this msg as one string: channel spec, sender
# spec and msg text, separated by single spaces.
RVMsg private data {} {
    $self instvar msg_ rspec_ sender_spec_
    return [join [list $rspec_ $sender_spec_ $msg_] " "]
}
673 
674 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.