~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Open Mash Cross Reference
mash/tcl/degas/degasserver/aglp_gateway.tcl

Component: ~ [ mash ] ~ [ apps ] ~ [ gsm ] ~ [ lib ] ~ [ otcl ] ~ [ srm ] ~ [ tcl8.3 ] ~ [ tclcl ] ~ [ tk8.3 ] ~ [ tutorials ] ~

  1 import AGLP
  2 import MashTimer
  3 import MashSoftState
  4 import MashSoftTable
  5 import TCP/Server/DegasControl
  6 import TCP/Client/DegasControl
  7 import DegasAgent
  8 import DegletService
  9 import RTCPListener
 10 
 11 Class AGLP/Gateway -superclass AGLP
 12 
 13 AGLP/Gateway private generate_wait_time { send_time } {
 14         set t [expr 200*([gettimeofday]-$send_time)]
 15         if {$t < 0} {
 16                 set t [expr [MashRNG integer 500] + 1500]
 17         }
 18         return $t
 19 }
 20 
 21 AGLP/Gateway public init { group mtu } {
 22         $self instvar config_ service_id_ 
 23         $self instvar better_counters_
 24 
 25 #   LOGGING
 26 #       $self instvar log_ load_log_
 27 #       set hostname [lindex [split [info hostname] .] 0]
 28 #       set log_ [open "$hostname.log" w]
 29 #       set load_log_ [open "$hostname.load" w]
 30 #       after idle "$self log_load"
 31 
 32         $self instvar bandwidth_table_ my_latency_table_
 33 
 34         $self next "G" $group $mtu
 35 
 36         set config_(ANNOUNCE_SERVICE_PERIOD) 30000
 37         set config_(MAX_PING_PERIOD) 30000
 38         set config_(STOP_SERVICE_PERIOD) 80000
 39         set config_(PICK_BEST_PERIOD) 5000
 40         set config_(KEEP_SOURCE_PERIOD) 80000
 41         set config_(NO_REPLACE_PERIOD) 120000
 42         set config_(REPLACE_THRESHOLD) 200
 43         set config_(NUM_OF_EVALUATION) 4
 44         set config_(REPLACE_CONSTANT) 1000000
 45         set config_(REMEMBER_SENT_REPLACE_PERIOD) 20000
 46         set config_(REMEMBER_SENT_HANDOFF_PERIOD) 20000
 47 
 48 
 49         set service_id_ 0
 50 
 51         set bandwidth_table_  [new MashSoftTable 60000]
 52         set my_latency_table_ [new MashSoftTable 60000]
 53         set better_counters_  [new MashSoftTable 60000]
 54 
 55 }
 56 
 57 
 58 AGLP/Gateway instproc destroy { } {
 59 #       LOGGING
 60         $self instvar log_
 61         close $log_
 62 }
 63 
 64 
  65 #--------------------------------------------------------------------
  66 # AGLP/Gateway send_xxx
  67 # 
  68 # Each of these methods builds one kind of DCP message and sends it
  69 # through the send method below.  Several of them run as timer callbacks.
  70 #--------------------------------------------------------------------
 71 
 72 AGLP/Gateway private send { msg } {
 73         $self announce "DCP g $msg\n"
 74 }
 75 
 76 AGLP/Gateway private send_offer_service { client_addr pid } {
 77         $self instvar accept_timer_ client_ctrl_tcp_
 78 
 79     set ctrl_port [$self generate_unique_port]
 80     set client_ctrl_tcp_($client_addr,$pid) [new TCP/Server/DegasControl $self ]
 81     $client_ctrl_tcp_($client_addr,$pid) open $ctrl_port
 82 
 83         after 500 "
 84                 $self send \"offer_service $client_addr $pid $ctrl_port\"
 85         "
 86         delete $accept_timer_($client_addr,$pid)
 87         unset accept_timer_($client_addr,$pid)
 88 
 89 }
 90 
 91 AGLP/Gateway private construct_source_list { client_addr port } {
 92         $self instvar addr_ service_ 
 93         $self instvar bandwidth_table_ my_latency_table_
 94         set src_list ""
 95         foreach src [$service_($client_addr,$port) get_sources]  {
 96                 set l  [$my_latency_table_ get $src]
 97                 set bw [$bandwidth_table_ get $src] 
 98                 if {$l != "" && $bw != ""} {
 99                         set src_list [format "$src_list $src %.3f %.1f" $l $bw]
100                 }
101         }
102         set l  [$my_latency_table_ get $client_addr]
103         set bw [$bandwidth_table_ get $client_addr] 
104         if {$l != "" && $bw != ""} {
105                 set src_list [format "$src_list $client_addr %.2f %.2f" $l $bw]
106         }
107         return $src_list
108 }
109 
110 AGLP/Gateway private send_serve { client_addr pid } {
111         $self instvar addr_ node_ service_ just_started_
112         
113         if {![$self is_servicing $client_addr $pid]} {
114                 return
115         }
116 
117 #   We do not construct the source list for the first
118 #   config_(NO_REPLACE_PERIOD).  Because when the server
119 #   first starts, the information collected are incomplete,
120 #   and inaccurate.   During the first NO_REPLACE_PERIOD,
121 #   the variable just_started_ is set to 1.
122 
123         if {$just_started_ == 1} {
124                 set src_list ""
125         } else {
126                 set src_list [$self construct_source_list $client_addr $pid]
127         }
128 
129         set recv_group [$service_($client_addr,$pid) get_recv_session]
130         set send_group [$service_($client_addr,$pid) get_send_session]
131 
132         set count [expr {[llength $src_list]/3}]
133 #   XXX : WEITSANG FOR EXPERIMENT ONLY..
134         set count 0
135 
136         if {$count == 0} {
137                 $self send "serve $client_addr $pid $recv_group $send_group"
138                 return
139         }
140 
141 #   If the serve message is too long, we split it into several small 
142 #   messages.
143 
144         set start 0
145         while {$count > 3} {
146                 set sub_list [lrange $src_list $start [expr $start + 8]]
147                 $self debug "SERVE $sub_list"
148                 $self send "serve $client_addr $pid $recv_group $send_group $sub_list"
149                 incr start 9
150                 set count [expr $count - 3]
151         }
152         set sub_list [lrange $src_list $start end]
153         if {$sub_list != ""} {
154 #               $self debug "SERVE $sub_list"
155                 $self send "serve $client_addr $pid $recv_group $send_group $sub_list"
156         }
157 }
158 
159 
160 AGLP/Gateway private send_handoff_complete { client_addr pid send_to gateway_addr} {
161         $self instvar client_ctrl_tcp_ replace_addr_
162         set ctrl_port [$self generate_unique_port]
163     set client_ctrl_tcp_($client_addr,$pid) [new TCP/Server/DegasControl $self ]
164     $client_ctrl_tcp_($client_addr,$pid) open $ctrl_port
165         after 500 "
166                 $self send \"handoff_complete $client_addr $pid $send_to $gateway_addr $ctrl_port\"
167                 "
168 
169         set replace_addr_($client_addr,$pid) $gateway_addr
170 }
171 
172 
173 AGLP/Gateway private send_replace { client_addr pid gateway_addr score } {
174         $self send "replace $client_addr $pid $gateway_addr $score"
175         $self debug "-- send replace"
176         $self cancel_replace_timer $client_addr $pid $gateway_addr
177 
178         # We do not want to send multiple replace message within a short period
179         # of time, so we remember the fact that we already sent a replace message.
180         # We forget that fact after a few seconds.
181         $self remember_sent_replace $client_addr $pid 
182 }
183 
184 
185 #--------------------------------------------------------------------
186 # AGLP/Gateway remember_sent_replace 
187 # AGLP/Gateway forget_sent_replace 
188 # 
189 # input : client_addr and gateway_addr : you know what these are !
190 # purpose : a convinient interface to a softstate to indicates whether
191 #           we have sent a replace (replace) message recently.
192 #           We remember that after we sent the message, and forget it
193 #           after a period of time
194 #--------------------------------------------------------------------
195 
196 AGLP/Gateway private remember_sent_replace { client_addr pid } {
197         $self instvar sent_replace_  config_
198         set sent_replace_($client_addr,$pid) [new MashSoftState \
199                 1 $config_(REMEMBER_SENT_REPLACE_PERIOD) \
200                 "$self forget_sent_replace $client_addr $pid"]
201 }
202 
203 AGLP/Gateway private forget_sent_replace { client_addr pid } {
204         $self instvar sent_replace_ 
205         delete $sent_replace_($client_addr,$pid) 
206         unset sent_replace_($client_addr,$pid)
207 }
208 
209 
210 AGLP/Gateway private remember_sent_handoff { client_addr pid } {
211         $self instvar sent_handoff_  config_
212         set sent_handoff_($client_addr,$pid) [new MashSoftState \
213                 1 $config_(REMEMBER_SENT_HANDOFF_PERIOD) \
214                 "$self forget_sent_handoff $client_addr $pid"]
215 }
216 
217 AGLP/Gateway private forget_sent_handoff { client_addr pid } {
218         $self instvar sent_handoff_ 
219         delete $sent_handoff_($client_addr,$pid) 
220         unset sent_handoff_($client_addr,$pid)
221 }
222 
223 
224 
225 #--------------------------------------------------------------------
226 # AGLP/Gateway start_service
227 # 
228 # input : client_addr : address of the client to start servicing.
229 # purpose : This is called by recv{} whenever a "client ack_service" 
230 #           message is received.  It starts a periodic timer that 
231 #           annouce it's service for client client_addr, and notify 
232 #           the application to start servicing client.
233 #--------------------------------------------------------------------
234 
235 AGLP/Gateway public start_service { client_addr pid out_session } {
236 
237         #-------------------------------------------------------
238         # If I am already serving this client, ignore the call 
239         #-------------------------------------------------------
240 
241         if {[$self is_servicing $client_addr $pid]} {
242         $self debug "Duplicated service requested for $client_addr/$pid"
243                 return
244         }
245 
246         $self debug "** start servicing $client_addr $pid $out_session"
247         $self instvar config_ service_ service_id_ program_
248 
249 #   we set just_started_ to 1 initially.  And set it to 0
250 #   after NO_REPLACE_PERIOD
251 
252         $self set just_started_ 1
253         after $config_(NO_REPLACE_PERIOD) "$self set just_started_ 0"
254 
255 #   If this is a result of a handoff, 
256 #   We can close the tcp port used to retriving deglet.
257 
258         $self instvar get_deglet_tcp_
259         if {[info exists get_deglet_tcp_($client_addr,$pid)]} {
260                 delete $get_deglet_tcp_($client_addr,$pid)
261                 unset get_deglet_tcp_($client_addr,$pid)
262         }
263 
264 
265         incr service_id_ 
266 
267         #-------------------------------------------------------
268         # Create a new DegletService object.  This object 
269         # maintain all information about a client that is being
270         # served.
271         #-------------------------------------------------------
272 
273         set service_($client_addr,$pid) [new DegletService $client_addr $service_id_]
274         set service $service_($client_addr,$pid)
275         $service set_service_state [new MashSoftState 1 \
276                 $config_(STOP_SERVICE_PERIOD) "$self stop_service $client_addr $pid"]
277 
278         set program [$self get_program $client_addr $pid]
279         set agent [new DegasAgent $self $service $program $out_session]
280         set in_session [$agent get_program_value $program "input_session"]
281 
282         $self instvar client_ctrl_tcp_
283         $client_ctrl_tcp_($client_addr,$pid) set agent_ $agent
284 
285         $service set_input_session  $in_session
286         $service set_output_session $out_session
287         $service set_agent $agent
288 
289         # Starts a timer to periodically announce that I am serving this client
290         $self send_serve $client_addr $pid
291         $self start_service_announcement $client_addr $pid
292 
293         #-------------------------------------------------------
294         # Check if we have any replace message pending, for client_addr.  
295         # If we do, cancel these messages.  If I am already serving this client, 
296         # no need to try replacing others. 
297         #-------------------------------------------------------
298 
299         $self cancel_all_replace_timer $client_addr
300 
301         #-------------------------------------------------------
302         # If this service is a result of a handoff, we delete
303         # our TCP channel that we used to retrieve deglet.
304         #-------------------------------------------------------
305         
306         $self instvar get_deglet_tcp_
307         if [info exists get_deglet_tcp_($client_addr,$pid)] {
308                 delete $get_deglet_tcp_($client_addr,$pid)
309                 unset get_deglet_tcp_($client_addr,$pid)
310         }
311 
312 }
313 
314 
315 AGLP/Gateway private stop_service { client_addr pid } {
316         $self instvar service_
317 
318         $self debug "** stop servicing $client_addr"
319 
320         $self stop_service_announcement $client_addr $pid
321         delete $service_($client_addr,$pid)
322         unset service_($client_addr,$pid)
323 
324         $self instvar client_ctrl_tcp_
325         if {[info exists client_ctrl_tcp_($client_addr,$pid)]} {
326                 puts "closing channel $client_addr,$pid"
327                 delete $client_ctrl_tcp_($client_addr,$pid)
328                 unset client_ctrl_tcp_($client_addr,$pid)
329         }
330 }
331 
332 
333 #-------------------------------------------------------------------------
334 # AGLP/Gateway recv
335 # input : msg - a message received by this agent.
336 # purpose : this is _the_ place where things happen.  We parse the message
337 #           and act according to the type of message we received.
338 #-------------------------------------------------------------------------
339 
340 AGLP/Gateway private recv {addr port msg length} {
341         set magic_word [lindex $msg 0]
342         if {$magic_word != "DCP"} {
343                 return
344         }
345 
346         set from [lindex $msg 1]
347         set type [lindex $msg 2]
348         set rest [lrange $msg 3 end]
349 
350         eval $self recv_$type $addr $port $rest
351 }
352 
353 
354 AGLP/Gateway private collect_latencies { session_spec } {
355         # start a small rtcp agent, to listen to rtcp messages
356         $self instvar node_ rtcp_agents_ application_ collecting_
357 
358         set l [split $session_spec /]
359         set group [lindex $l 0]
360         set port [lindex $l 1]
361         set ttl [lindex $l 2]
362 #   Convert it to control address by adding one to port number
363         set session_spec "$group/[expr $port + 1]/$ttl"
364         
365         if {![info exists rtcp_agents_($group)]} {
366                 set net [new Network]
367                 $net open $group [expr $port + 1] $ttl
368                 set rtcp_agents_($group) [new RTCPListener $self $net] 
369 #               $self debug "COLLECTING LATENCIES for $group : $rtcp_agents_($group)"
370         }
371         if {![info exists collecting_($group)] || $collecting_($group) == 0}  {
372                 # $group refers to the data session. add 1 to mcast addr
373                 # to get control session 
374                 set collecting_($group) 1
375                 $self schedule_stop_collection $group
376         } 
377 }
378 
379 
380 AGLP/Gateway private schedule_stop_collection { group } {
381         $self instvar collecting_timers_  config_
382 
383         set collecting_timers_($group) [new MashTimer "once" \
384                 $config_(MAX_PING_PERIOD) "$self stop_collecting_latencies $group"]
385 }
386 
387 
388 AGLP/Gateway private stop_collecting_latencies { group } {
389         $self instvar collecting_timers_ rtcp_agents_ collecting_ node_
390 
391         if {[info exists collecting_($group)] && $collecting_($group) == 1} {
392                 delete $collecting_timers_($group)
393                 # $group refers to the data session. add 1 to mcast addr
394                 # to get control session 
395                 if {[info exists rtcp_agents_($group)]} {
396                         delete $rtcp_agents_($group)
397                         unset rtcp_agents_($group)
398                 }
399                 set collecting_($group) 0
400         }
401 }
402 
403 
404 #--------------------------------------------------------------------
405 # AGLP/Gateway handoff
406 #
407 # input : client_addr : the address of a client
408 # purpose : handoff the service for this client to a better gateway.
409 #-------------------------------------------------------------------
410 
411 AGLP/Gateway private handoff { client_addr pid } {
412         $self instvar addr_ application_ bandwidth_table_ 
413         $self instvar better_counter_ 
414         $self instvar service_ handoff_tcp_
415         
416         if {[$self is_servicing $client_addr $pid]} {
417                 set service $service_($client_addr,$pid)
418                 set best [$service get_best_gateway]
419                 if {$best != ""} {
420                         set best_score [$service get_best_score]
421                         set recv_group [$service get_recv_session]
422                         set bandwidth [$bandwidth_table_ get $client_addr]
423                         set load 0.1
424 
425                         set ctrl_port [$self generate_unique_port]
426                         set handoff_tcp_($best) [new TCP/Server/DegasControl $self ]
427                         $handoff_tcp_($best) open $ctrl_port
428 
429                         set msg "handoff $client_addr $pid $best $recv_group $bandwidth $load $ctrl_port"
430                         $self send $msg
431                         $self debug "The best guy to serve $client_addr is $best : $best_score"
432 
433                         $self remember_sent_handoff $client_addr $pid
434 
435 #                       global gstats_
436 #                       set gstats_(TOTAL_SCORE) [expr $gstats_(TOTAL_SCORE) + $best_score]
437 
438                         $service reset_best_gateway
439                         $self cancel_pick_best_timer $client_addr $pid
440                 } 
441         } else {
442                 $self debug "UH OH ! no best_gateway defined !"
443         }
444 }
445 
446 
447 #--------------------------------------------------------------------
448 # AGLP/Gateway calc_score
449 #
450 # input : client_addr : the address of a client
451 #         gateway_addr : the address of a gateway
452 # purpose : calcuate our score with respect to replacing the gateway
453 #           as the server for client.
454 #-------------------------------------------------------------------
455 
456 AGLP/Gateway private calc_score { client_addr gateway_addr } {
457         $self instvar addr_ application_ latency_table_ my_latency_table_ bandwidth_table_
458         $self instvar score_
459         set U_self 0
460         set U_curr 0
461         set s ""
462 #   LOG 
463         foreach src [$self get_sources $gateway_addr $client_addr -1] {
464                 set bw [$bandwidth_table_  get $src]
465                 set ml [$my_latency_table_ get $src]
466                 if {[info exists latency_table_($gateway_addr,$src)] && $bw != "" && $ml != ""} {
467                         set s "$s $src"
468                         set d_self $ml
469                         set d_curr $latency_table_($gateway_addr,$src)
470                         set U_self [expr {$U_self + $bw*$d_self}]
471                         set U_curr [expr {$U_curr + $bw*$d_curr}]
472                         $self debug "CALC SCORE $src : $d_curr \t $d_self \t $bw"
473                 } else {
474                         # Do not calc score if we do not have all the info.
475                         return 0
476                 }
477         }
478 
479         set sum [expr {$U_curr - $U_self}]
480 
481         # Smooth out the scores
482         if [info exists score_($client_addr,$gateway_addr)] {
483                 set score_($client_addr,$gateway_addr) [expr 0.1*$sum + 0.9*$score_($client_addr,$gateway_addr)]
484         } else {
485                 set score_($client_addr,$gateway_addr) $sum
486         }
487         # LOG
488         set log_str ""
489         foreach src {128.84.223.144 128.84.223.156 128.84.96.120} {
490                 set bw [$bandwidth_table_  get $src]
491                 set ml [$my_latency_table_ get $src]
492                 if {[info exists latency_table_($gateway_addr,$src)] && $bw != "" && $ml != ""} {
493                         set d_curr $latency_table_($gateway_addr,$src)
494                         set log_str "$log_str $bw $ml $d_curr "
495                 } else {
496                         set log_str "$log_str - - - "
497                 }
498         }
499         $self instvar log_
500         puts $log_ "[$self time] $log_str $sum $score_($client_addr,$gateway_addr)"
501 
502         return $score_($client_addr,$gateway_addr)
503 }
504 
505 
506 #--------------------------------------------------------------------
507 # AGLP/Gateway evaluate
508 # input : client_addr : the address of a client
509 #         gateway_addr : the address of a gateway
510 # purpose : evaluate ourself to see if we are better then the gateway
511 #           for servicing client 
512 #-------------------------------------------------------------------
513 
514 AGLP/Gateway private evaluate { client_addr pid gateway_addr } {
515 
516         $self instvar sent_replace_ config_ 
517 
518         if [info exists sent_replace_($client_addr,$pid)] {
519 
520                 # haven't get any reply from previous replace message.
521 
522                 $self debug "no reply from prev replace yet."
523                 return
524         }
525 
526         if {[$self is_servicing $client_addr $pid]} {
527 
528                 # if already serving the client, no need to evaluate
529 
530                 return
531         }
532 
533         $self instvar better_counters_
534 
535         set score [$self calc_score $client_addr $gateway_addr]
536         $self debug "## my score is $score"
537 
538         set better_count [$better_counters_ get "$client_addr,$pid,$gateway_addr"]
539         if {$better_count == ""} {
540                 set better_count 0
541         }
542 
543         if {$score > $config_(REPLACE_THRESHOLD)} {
544 
545                 # The threshold is set to a constant instead of 0, because 
546                 # we do not want to handoff if the current gateway is a 
547                 # only slightly better alternative.
548 
549                 # We only try to replace others if we are better than the 
550                 # current gateway $NUM_OF_EVALUATION times in a row, within
551                 # a minute.
552 
553                 if {$better_count == 3} {
554                 
555                         # Let the current gateway know that I am a better alternative
556                         # for serving the client.  We wait an amount of time that is
557                         # inversely proportional to the measurement of "how much better"
558                         # I am compare to the current gateway.
559 
560                         $self instvar replace_timer_
561                         set waiting_time [expr {int($config_(REPLACE_CONSTANT)/$score)}]
562                         $self debug "## wait for $waiting_time ms"
563 
564                         if {[info exists replace_timer_($client_addr,$pid,$gateway_addr)]} {
565                                 return
566                         } 
567                         set replace_timer_($client_addr,$pid,$gateway_addr) [new MashTimer  \
568                                 "once" $waiting_time \
569                                 "$self send_replace $client_addr $pid $gateway_addr $score"]
570                         $self instvar my_score_
571                         set my_score_($client_addr,$gateway_addr) $score
572                         
573                         $better_counters_ set "$client_addr,$pid,$gateway_addr" 0
574 
575                 } else {
576 
577                         incr better_count
578                         $better_counters_ set "$client_addr,$pid,$gateway_addr" $better_count
579 
580                 }
581 
582         } else {
583 
584                 $better_counters_ set "$client_addr,$pid,$gateway_addr" 0
585         }
586 }
587 
588 
589 #--------------------------------------------------------------------
590 # AGLP/Gateway is_servicing
591 # input : client_addr : a client
592 # purpose : return true if we are servicing this client, return false
593 #           otherwise.
594 #-------------------------------------------------------------------
595 
596 AGLP/Gateway public is_servicing { client_addr port } {
597         $self instvar service_
598         return [info exists service_($client_addr,$port)] 
599 }
600 
601 
602 #--------------------------------------------------------------------
603 # AGLP/Gateway cancel_replace_timer
604 # 
605 # input : client_addr : address of the client we are trying to offer
606 #         our service to.
607 #         current_gateway : address of the gateway we are trying to
608 #         replace.
609 # purpose : cancel the timer to offer our service replace the gateway
610 #           for serving the client. This is called when the timer 
611 #           timeout, or when another replace message with better score
612 #           is received, or when another gateway has take over the
613 #           service (handoff_complete), or when this gateway has
614 #           started serving the client (called through
615 #           cancel_all_replace_timer).
616 #-------------------------------------------------------------------
617 
618 AGLP/Gateway private cancel_replace_timer { client_addr pid current_gateway } {
619         $self instvar replace_timer_
620         if {[info exists replace_timer_($client_addr,$pid,$current_gateway)]} {
621                 delete $replace_timer_($client_addr,$pid,$current_gateway)
622                 unset replace_timer_($client_addr,$pid,$current_gateway)
623         }
624 }
625 
626 
627 #--------------------------------------------------------------------
628 # AGLP/Gateway cancel_all_replace_timer
629 # 
630 # input : client_addr : address of the client we are trying to offer
631 #         our service to.
632 # purpose : cancel all pending replace timer related to client_addr.
633 #           This is called when we start servicing the client.
634 #--------------------------------------------------------------------
635 
636 AGLP/Gateway private cancel_all_replace_timer { client_addr } {
637         $self instvar replace_timer_
638         if {[array exists replace_timer_]} {
639                 foreach {key value} [array get replace_timer_ $client_addr,*] {
640                         delete $value
641                         unset replace_timer_($key)
642                 }
643         }
644 }
645 
646 
647 #--------------------------------------------------------------------
648 # AGLP/Gateway cancel_offer_service_timer
649 # 
650 # input : client_addr : address of the client we are try to offer
651 #         our service to.
652 # purpose : cancel the timer to offer our service to the client. This
653 #           is called when the timer timeout, or when another offer
654 #           from another gateway is received.
655 #--------------------------------------------------------------------
656 
657 AGLP/Gateway private cancel_offer_service_timer { client_addr pid } {
658         $self instvar accept_timer_
659         if {[info exists accept_timer_($client_addr,$pid)]} {
660                 delete $accept_timer_($client_addr,$pid)
661                 unset accept_timer_($client_addr,$pid)
662         } 
663 }
664 
665 
666 #--------------------------------------------------------------------
667 # AGLP/Gateway cancel_pick_best_timer
668 # 
669 # input : client_addr : address of the client we are try to find 
670 #         a replacement gateway
671 # purpose : cancel the timer to select a replacement gateway.  This
672 #           is called when the timer timeout, when we stop servicing
673 #           the client, and when we are notified that we have been
674 #           replaced (by handoff_complete) message.
675 #--------------------------------------------------------------------
676 
677 AGLP/Gateway private cancel_pick_best_timer { client_addr pid } {
678         $self instvar pick_best_timer_
679         if {[info exists pick_best_timer_($client_addr,$pid)]} {
680                 delete $pick_best_timer_($client_addr,$pid)
681                 unset pick_best_timer_($client_addr,$pid)
682         } else {
683                 $self debug "no pick best timer $client_addr $pid !!"
684         }
685 }
686 
687 
688 #--------------------------------------------------------------------
689 # AGLP/Gateway start_service_announcement
690 # 
691 # input : client_addr : address of the client to start servicing.
692 # purpose : Starts the periodic "serve" announcement.  This is done
693 #           when we start servicing $client_addr.
694 #--------------------------------------------------------------------
695 
696 AGLP/Gateway private start_service_announcement { client_addr port } {
697         $self instvar announce_service_timer_ config_
698         set announce_service_timer_($client_addr,$port) [new MashTimer \
699                 "random_periodic"  \
700                 $config_(ANNOUNCE_SERVICE_PERIOD) \
701                 "$self send_serve $client_addr $port "]
702 }
703 
704 
705 #--------------------------------------------------------------------
706 # AGLP/Gateway stop_service_announcement
707 # 
708 # input : client_addr : address of the client to stop servicing.
709 # purpose : Stop the periodic "serve" announcement.  This is done
710 #           when we stop servicing $client_addr.
711 #--------------------------------------------------------------------
712 
713 AGLP/Gateway private stop_service_announcement { client_addr port } {
714         $self instvar announce_service_timer_ 
715         if [info exists announce_service_timer_($client_addr,$port)] {
716                 delete $announce_service_timer_($client_addr,$port) 
717                 unset announce_service_timer_
718         }
719 }
720 
721 
722 proc  update_better_counter {n} {
723 }
724 
725 
726 AGLP/Gateway public recv_request_service { client_addr port args } {
727 
728         $self instvar application_ accept_timer_
729         scan $args "%d %d" pid send_time
730 
731         set cl [$self get_option serveOnly]
732         set hostname [lindex [split [lookup_host_name $client_addr] .] 0]
733         if {[lsearch $cl $hostname] == -1 && $cl != ""} {
734                 return
735         }
736 
737         if {![info exists accept_timer_($client_addr,$pid)]} {
738                 # No timer pending for this client yet, start a timer
739                 # that will send an offer.
740                 set waiting_time [$self generate_wait_time $send_time]
741                 set accept_timer_($client_addr,$pid) [new MashTimer \
742                         "once" $waiting_time "$self send_offer_service $client_addr $pid"]
743         }
744 }
745 
746 
747 AGLP/Gateway public recv_served_by { client_addr port args } {
748         # Handle a "served by" report about ($client_addr,$pid).  $args is
749         # scanned as "<pid> <gateway_addr> <timestamp>".
750         $self instvar addr_ service_
751         scan $args "%s %s %f" pid gateway_addr timestamp
752         set key $client_addr,$pid
753
754         if {$gateway_addr == $addr_} {
755                 # The report names this gateway: refresh the service state,
756                 # provided we really are servicing the client.
757                 if {[$self is_servicing $client_addr $pid]} {
758                         #$self debug "-- refresh servicing $client_addr $pid"
759                         $service_($key) refresh_service_state
760                 }
761         } else {
762                 $self instvar client_ctrl_tcp_ replace_addr_
763
764                 # The control channel stays open only while the client is still
765                 # served by the gateway we are trying to replace.  When no
766                 # replacement is recorded, or some third gateway is named, the
767                 # channel (if any) is closed.
768                 set keep_open 0
769                 if {[info exists replace_addr_($key)]} {
770                         set keep_open [expr {$gateway_addr == $replace_addr_($key)}]
771                 }
772                 if {!$keep_open && [info exists client_ctrl_tcp_($key)]} {
773                         puts "closing channel $client_addr,$pid"
774                         delete $client_ctrl_tcp_($key)
775                         unset client_ctrl_tcp_($key)
776                 }
777         }
778
779         # Report the difference between now and the message timestamp.
780         $self set_latency $client_addr [expr [gettimeofday] - $timestamp]
781 }
782 
783 
784 AGLP/Gateway public recv_offer_service { gateway_addr port args } {
785         # An offer-service message arrived; $args is scanned as
786         # "<client_addr> <pid>".  Cancel our own offer timer for that pair.
787         scan $args {%s %s} client_addr pid
788         $self cancel_offer_service_timer $client_addr $pid
789 }
790 
791 AGLP/Gateway public recv_serve { gateway_addr port args } {
792         # Handle a "serve" message from $gateway_addr.  $args holds
793         # client_addr, pid, recv_spec and send_spec, followed by a flat
794         # list of {src latency bandwidth} triples.
795         $self instvar application_ latency_table_ bandwidth_table_ config_ addr_
796
797         # Messages that carry our own address are ignored.
798         if {$gateway_addr == $addr_} { return }
799
800         set client_addr [lindex $args 0]
801         set pid         [lindex $args 1]
802         set recv_spec   [lindex $args 2]
803         set send_spec   [lindex $args 3]
804         set triples     [lrange $args 4 end]
805         if {$triples == ""} { return }
806
807         # Record every listed source together with its latency and bandwidth.
808         foreach {src latency bandwidth} $triples {
809                 $self add_source $gateway_addr $client_addr -1 $src
810                 #$self add_rtcp_source $src
811                 set latency_table_($gateway_addr,$src) $latency
812                 $bandwidth_table_ set $src $bandwidth
813         }
814         foreach group $recv_spec { $self collect_latencies $group }
815         # Schedule "evaluate" after MAX_PING_PERIOD milliseconds.
816         after $config_(MAX_PING_PERIOD) "$self evaluate $client_addr $pid $gateway_addr"
817 }
818 
819 
820 AGLP/Gateway public recv_replace { gateway_addr port args } {
821         #--------------------------------------------------------
822         # Another gateway ($gateway_addr) claims to be better than the
823         # gateway currently serving a client.  $args is scanned as
824         # "<client_addr> <pid> <current_gateway> <score>".
825         #
826         # * We are the current gateway and are servicing the client:
827         #   the first message starts a one-shot PICK_BEST_PERIOD timer
828         #   that runs "handoff"; later ones only bump better_counter_.
829         #   In both cases the sender's score goes to update_best_gateway.
830         # * Otherwise, if we hold a score of our own against the current
831         #   gateway and it is lower than the sender's, our pending
832         #   replace timer is cancelled.  With an equal or higher score
833         #   nothing is done here, so our own message still goes out.
834         #--------------------------------------------------------
835         $self instvar better_counter_ my_score_ config_ service_ addr_
836
837         $self debug "-- recv replace $args"
838         scan $args "%s %s %s %d" client_addr pid current_gateway score
839         set key $client_addr,$pid
840         set score_key $client_addr,$current_gateway
841         set is_current [expr {$current_gateway == $addr_}]
842
843         if {$is_current && [$self is_servicing $client_addr $pid]} {
844                 $self instvar pick_best_timer_
845                 if {[info exists pick_best_timer_($key)]} {
846                         incr better_counter_
847                         $self debug "$better_counter_ replace received."
848                 } else {
849                         set better_counter_ 1
850                         set pick_best_timer_($key) [new MashTimer "once" \
851                                 $config_(PICK_BEST_PERIOD) "$self handoff $client_addr $pid"]
852                         $self debug "sched pick best .. $client_addr $pid"
853                 }
854
855                 $service_($key) update_best_gateway $score $gateway_addr
856
857         } elseif {[info exists my_score_($score_key)]} {
858                 if {$my_score_($score_key) < $score} {
859                         # Our score is the lower one, so we lost: cancel our
860                         # own "i am better" message before it is sent.
861                         $self cancel_replace_timer $client_addr $pid $current_gateway
862                 }
863         }
864 }
865 
866 AGLP/Gateway public recv_handoff { gateway_addr port args } {
867         # Handle a "handoff" message from $gateway_addr.  $args is scanned
868         # as "<client_addr> <pid> <better_gateway> <recv_group> <bandwidth>
869         # <load> <ctrl_port>".
870         $self instvar application_ addr_
871         # Messages that carry our own address are ignored.
872         if {$gateway_addr == $addr_} { return }
873
874         scan $args "%s %d %s %s %s %s %s" client_addr pid better_gateway recv_group bandwidth load ctrl_port
875         $self debug "## recv handoff $args"
876
877         if {$better_gateway != $addr_} {
878                 # Another gateway was chosen: cancel our replace timer.
879                 #   Added 10 SEP 2000
880                 $self cancel_replace_timer $client_addr $pid $gateway_addr
881         } elseif {![$self is_servicing $client_addr $pid]} {
882                 # I am chosen to replace the current gateway and am not yet
883                 # servicing this client: start the handoff.
884                 $self prepare_handoff $client_addr $pid $gateway_addr $ctrl_port
885         }
886 }
887 
888 
889 AGLP/Gateway public prepare_handoff { client_addr pid gateway_addr port } {
890         # Create a TCP/Client/DegasControl object, remember it in
891         # get_deglet_tcp_($client_addr,$pid), open it and send a "get" request.
892         $self debug "## prepare to handoff.."
893         $self instvar get_deglet_tcp_
894         set tcp [new TCP/Client/DegasControl]
895         set get_deglet_tcp_($client_addr,$pid) $tcp
896         $tcp set_aglp $self
897         puts "tcp: $tcp aglp: [$tcp set aglp_]"
898         $tcp open $gateway_addr $port ;# XXX : Should check for error here..
899         $tcp send "get $client_addr $pid\n"
900 }
901 
902 
903 AGLP/Gateway public recv_handoff_complete { gateway_addr port args } {
904         # Handle a "handoff complete" message from $gateway_addr.  $args holds
905         # client_addr (index 0), pid (index 1) and a gateway address (index 3).
906         $self instvar addr_ handoff_tcp_
907         set client_addr [lindex $args 0]
908         set pid [lindex $args 1]
909         set curr_gateway [lindex $args 3]
910         if {$curr_gateway == $addr_} {
911                 # We are done handing off: stop the pick-best timer and drop the
912                 # handoff connection.  Guard + unset so a repeat cannot delete twice.
913                 $self cancel_pick_best_timer $client_addr $pid
914                 if {[info exists handoff_tcp_($gateway_addr)]} {
915                         delete $handoff_tcp_($gateway_addr)
916                         unset handoff_tcp_($gateway_addr)
917                 }
918         } else {
919                 $self cancel_replace_timer $client_addr $pid $curr_gateway ;# drop any replace we sent for this client
920         }
921 }
922 
923 
924 #----------------------------------------------------------------
925 # AGLP/Gateway add_source
926 #
927 # Add source to the list of sources for subdeglet (client,id)
928 #----------------------------------------------------------------
929 
930 AGLP/Gateway instproc add_source { gateway client id source } {
931         $self instvar source_table_ source_state_ config_
932         if [info exists source_table_($gateway,$client,$id)] {
933                 if {[lsearch $source_table_($gateway,$client,$id) $source] == -1} {
934                         lappend source_table_($gateway,$client,$id) $source
935                         set source_state_($gateway,$client,$id,$source) [new MashSoftState $source \
936                                 $config_(KEEP_SOURCE_PERIOD) "$self del_source $gateway $client $id $source"]
937                 } else {
938                         $source_state_($gateway,$client,$id,$source) refresh
939                 }
940         } else {