1 import AGLP
2 import MashTimer
3 import MashSoftState
4 import MashSoftTable
5 import TCP/Server/DegasControl
6 import TCP/Client/DegasControl
7 import DegasAgent
8 import DegletService
9 import RTCPListener
10
11 Class AGLP/Gateway -superclass AGLP
12
13 AGLP/Gateway private generate_wait_time { send_time } {
14 set t [expr 200*([gettimeofday]-$send_time)]
15 if {$t < 0} {
16 set t [expr [MashRNG integer 500] + 1500]
17 }
18 return $t
19 }
20
21 AGLP/Gateway public init { group mtu } {
22 $self instvar config_ service_id_
23 $self instvar better_counters_
24
25 # LOGGING
26 # $self instvar log_ load_log_
27 # set hostname [lindex [split [info hostname] .] 0]
28 # set log_ [open "$hostname.log" w]
29 # set load_log_ [open "$hostname.load" w]
30 # after idle "$self log_load"
31
32 $self instvar bandwidth_table_ my_latency_table_
33
34 $self next "G" $group $mtu
35
36 set config_(ANNOUNCE_SERVICE_PERIOD) 30000
37 set config_(MAX_PING_PERIOD) 30000
38 set config_(STOP_SERVICE_PERIOD) 80000
39 set config_(PICK_BEST_PERIOD) 5000
40 set config_(KEEP_SOURCE_PERIOD) 80000
41 set config_(NO_REPLACE_PERIOD) 120000
42 set config_(REPLACE_THRESHOLD) 200
43 set config_(NUM_OF_EVALUATION) 4
44 set config_(REPLACE_CONSTANT) 1000000
45 set config_(REMEMBER_SENT_REPLACE_PERIOD) 20000
46 set config_(REMEMBER_SENT_HANDOFF_PERIOD) 20000
47
48
49 set service_id_ 0
50
51 set bandwidth_table_ [new MashSoftTable 60000]
52 set my_latency_table_ [new MashSoftTable 60000]
53 set better_counters_ [new MashSoftTable 60000]
54
55 }
56
57
58 AGLP/Gateway instproc destroy { } {
59 # LOGGING
60 $self instvar log_
61 close $log_
62 }
63
64
65 #--------------------------------------------------------------------
66 # AGLP/Gateway send_xxx
67 #
68 # These functions are probably timer callback that send some message
69 # to some guys.
70 #--------------------------------------------------------------------
71
72 AGLP/Gateway private send { msg } {
73 $self announce "DCP g $msg\n"
74 }
75
76 AGLP/Gateway private send_offer_service { client_addr pid } {
77 $self instvar accept_timer_ client_ctrl_tcp_
78
79 set ctrl_port [$self generate_unique_port]
80 set client_ctrl_tcp_($client_addr,$pid) [new TCP/Server/DegasControl $self ]
81 $client_ctrl_tcp_($client_addr,$pid) open $ctrl_port
82
83 after 500 "
84 $self send \"offer_service $client_addr $pid $ctrl_port\"
85 "
86 delete $accept_timer_($client_addr,$pid)
87 unset accept_timer_($client_addr,$pid)
88
89 }
90
91 AGLP/Gateway private construct_source_list { client_addr port } {
92 $self instvar addr_ service_
93 $self instvar bandwidth_table_ my_latency_table_
94 set src_list ""
95 foreach src [$service_($client_addr,$port) get_sources] {
96 set l [$my_latency_table_ get $src]
97 set bw [$bandwidth_table_ get $src]
98 if {$l != "" && $bw != ""} {
99 set src_list [format "$src_list $src %.3f %.1f" $l $bw]
100 }
101 }
102 set l [$my_latency_table_ get $client_addr]
103 set bw [$bandwidth_table_ get $client_addr]
104 if {$l != "" && $bw != ""} {
105 set src_list [format "$src_list $client_addr %.2f %.2f" $l $bw]
106 }
107 return $src_list
108 }
109
110 AGLP/Gateway private send_serve { client_addr pid } {
111 $self instvar addr_ node_ service_ just_started_
112
113 if {![$self is_servicing $client_addr $pid]} {
114 return
115 }
116
117 # We do not construct the source list for the first
118 # config_(NO_REPLACE_PERIOD). Because when the server
119 # first starts, the information collected are incomplete,
120 # and inaccurate. During the first NO_REPLACE_PERIOD,
121 # the variable just_started_ is set to 1.
122
123 if {$just_started_ == 1} {
124 set src_list ""
125 } else {
126 set src_list [$self construct_source_list $client_addr $pid]
127 }
128
129 set recv_group [$service_($client_addr,$pid) get_recv_session]
130 set send_group [$service_($client_addr,$pid) get_send_session]
131
132 set count [expr {[llength $src_list]/3}]
133 # XXX : WEITSANG FOR EXPERIMENT ONLY..
134 set count 0
135
136 if {$count == 0} {
137 $self send "serve $client_addr $pid $recv_group $send_group"
138 return
139 }
140
141 # If the serve message is too long, we split it into several small
142 # messages.
143
144 set start 0
145 while {$count > 3} {
146 set sub_list [lrange $src_list $start [expr $start + 8]]
147 $self debug "SERVE $sub_list"
148 $self send "serve $client_addr $pid $recv_group $send_group $sub_list"
149 incr start 9
150 set count [expr $count - 3]
151 }
152 set sub_list [lrange $src_list $start end]
153 if {$sub_list != ""} {
154 # $self debug "SERVE $sub_list"
155 $self send "serve $client_addr $pid $recv_group $send_group $sub_list"
156 }
157 }
158
159
160 AGLP/Gateway private send_handoff_complete { client_addr pid send_to gateway_addr} {
161 $self instvar client_ctrl_tcp_ replace_addr_
162 set ctrl_port [$self generate_unique_port]
163 set client_ctrl_tcp_($client_addr,$pid) [new TCP/Server/DegasControl $self ]
164 $client_ctrl_tcp_($client_addr,$pid) open $ctrl_port
165 after 500 "
166 $self send \"handoff_complete $client_addr $pid $send_to $gateway_addr $ctrl_port\"
167 "
168
169 set replace_addr_($client_addr,$pid) $gateway_addr
170 }
171
172
173 AGLP/Gateway private send_replace { client_addr pid gateway_addr score } {
174 $self send "replace $client_addr $pid $gateway_addr $score"
175 $self debug "-- send replace"
176 $self cancel_replace_timer $client_addr $pid $gateway_addr
177
178 # We do not want to send multiple replace message within a short period
179 # of time, so we remember the fact that we already sent a replace message.
180 # We forget that fact after a few seconds.
181 $self remember_sent_replace $client_addr $pid
182 }
183
184
185 #--------------------------------------------------------------------
186 # AGLP/Gateway remember_sent_replace
187 # AGLP/Gateway forget_sent_replace
188 #
189 # input : client_addr and gateway_addr : you know what these are !
190 # purpose : a convinient interface to a softstate to indicates whether
191 # we have sent a replace (replace) message recently.
192 # We remember that after we sent the message, and forget it
193 # after a period of time
194 #--------------------------------------------------------------------
195
196 AGLP/Gateway private remember_sent_replace { client_addr pid } {
197 $self instvar sent_replace_ config_
198 set sent_replace_($client_addr,$pid) [new MashSoftState \
199 1 $config_(REMEMBER_SENT_REPLACE_PERIOD) \
200 "$self forget_sent_replace $client_addr $pid"]
201 }
202
203 AGLP/Gateway private forget_sent_replace { client_addr pid } {
204 $self instvar sent_replace_
205 delete $sent_replace_($client_addr,$pid)
206 unset sent_replace_($client_addr,$pid)
207 }
208
209
210 AGLP/Gateway private remember_sent_handoff { client_addr pid } {
211 $self instvar sent_handoff_ config_
212 set sent_handoff_($client_addr,$pid) [new MashSoftState \
213 1 $config_(REMEMBER_SENT_HANDOFF_PERIOD) \
214 "$self forget_sent_handoff $client_addr $pid"]
215 }
216
217 AGLP/Gateway private forget_sent_handoff { client_addr pid } {
218 $self instvar sent_handoff_
219 delete $sent_handoff_($client_addr,$pid)
220 unset sent_handoff_($client_addr,$pid)
221 }
222
223
224
225 #--------------------------------------------------------------------
226 # AGLP/Gateway start_service
227 #
228 # input : client_addr : address of the client to start servicing.
229 # purpose : This is called by recv{} whenever a "client ack_service"
230 # message is received. It starts a periodic timer that
231 # annouce it's service for client client_addr, and notify
232 # the application to start servicing client.
233 #--------------------------------------------------------------------
234
235 AGLP/Gateway public start_service { client_addr pid out_session } {
236
237 #-------------------------------------------------------
238 # If I am already serving this client, ignore the call
239 #-------------------------------------------------------
240
241 if {[$self is_servicing $client_addr $pid]} {
242 $self debug "Duplicated service requested for $client_addr/$pid"
243 return
244 }
245
246 $self debug "** start servicing $client_addr $pid $out_session"
247 $self instvar config_ service_ service_id_ program_
248
249 # we set just_started_ to 1 initially. And set it to 0
250 # after NO_REPLACE_PERIOD
251
252 $self set just_started_ 1
253 after $config_(NO_REPLACE_PERIOD) "$self set just_started_ 0"
254
255 # If this is a result of a handoff,
256 # We can close the tcp port used to retriving deglet.
257
258 $self instvar get_deglet_tcp_
259 if {[info exists get_deglet_tcp_($client_addr,$pid)]} {
260 delete $get_deglet_tcp_($client_addr,$pid)
261 unset get_deglet_tcp_($client_addr,$pid)
262 }
263
264
265 incr service_id_
266
267 #-------------------------------------------------------
268 # Create a new DegletService object. This object
269 # maintain all information about a client that is being
270 # served.
271 #-------------------------------------------------------
272
273 set service_($client_addr,$pid) [new DegletService $client_addr $service_id_]
274 set service $service_($client_addr,$pid)
275 $service set_service_state [new MashSoftState 1 \
276 $config_(STOP_SERVICE_PERIOD) "$self stop_service $client_addr $pid"]
277
278 set program [$self get_program $client_addr $pid]
279 set agent [new DegasAgent $self $service $program $out_session]
280 set in_session [$agent get_program_value $program "input_session"]
281
282 $self instvar client_ctrl_tcp_
283 $client_ctrl_tcp_($client_addr,$pid) set agent_ $agent
284
285 $service set_input_session $in_session
286 $service set_output_session $out_session
287 $service set_agent $agent
288
289 # Starts a timer to periodically announce that I am serving this client
290 $self send_serve $client_addr $pid
291 $self start_service_announcement $client_addr $pid
292
293 #-------------------------------------------------------
294 # Check if we have any replace message pending, for client_addr.
295 # If we do, cancel these messages. If I am already serving this client,
296 # no need to try replacing others.
297 #-------------------------------------------------------
298
299 $self cancel_all_replace_timer $client_addr
300
301 #-------------------------------------------------------
302 # If this service is a result of a handoff, we delete
303 # our TCP channel that we used to retrieve deglet.
304 #-------------------------------------------------------
305
306 $self instvar get_deglet_tcp_
307 if [info exists get_deglet_tcp_($client_addr,$pid)] {
308 delete $get_deglet_tcp_($client_addr,$pid)
309 unset get_deglet_tcp_($client_addr,$pid)
310 }
311
312 }
313
314
315 AGLP/Gateway private stop_service { client_addr pid } {
316 $self instvar service_
317
318 $self debug "** stop servicing $client_addr"
319
320 $self stop_service_announcement $client_addr $pid
321 delete $service_($client_addr,$pid)
322 unset service_($client_addr,$pid)
323
324 $self instvar client_ctrl_tcp_
325 if {[info exists client_ctrl_tcp_($client_addr,$pid)]} {
326 puts "closing channel $client_addr,$pid"
327 delete $client_ctrl_tcp_($client_addr,$pid)
328 unset client_ctrl_tcp_($client_addr,$pid)
329 }
330 }
331
332
333 #-------------------------------------------------------------------------
334 # AGLP/Gateway recv
335 # input : msg - a message received by this agent.
336 # purpose : this is _the_ place where things happen. We parse the message
337 # and act according to the type of message we received.
338 #-------------------------------------------------------------------------
339
340 AGLP/Gateway private recv {addr port msg length} {
341 set magic_word [lindex $msg 0]
342 if {$magic_word != "DCP"} {
343 return
344 }
345
346 set from [lindex $msg 1]
347 set type [lindex $msg 2]
348 set rest [lrange $msg 3 end]
349
350 eval $self recv_$type $addr $port $rest
351 }
352
353
354 AGLP/Gateway private collect_latencies { session_spec } {
355 # start a small rtcp agent, to listen to rtcp messages
356 $self instvar node_ rtcp_agents_ application_ collecting_
357
358 set l [split $session_spec /]
359 set group [lindex $l 0]
360 set port [lindex $l 1]
361 set ttl [lindex $l 2]
362 # Convert it to control address by adding one to port number
363 set session_spec "$group/[expr $port + 1]/$ttl"
364
365 if {![info exists rtcp_agents_($group)]} {
366 set net [new Network]
367 $net open $group [expr $port + 1] $ttl
368 set rtcp_agents_($group) [new RTCPListener $self $net]
369 # $self debug "COLLECTING LATENCIES for $group : $rtcp_agents_($group)"
370 }
371 if {![info exists collecting_($group)] || $collecting_($group) == 0} {
372 # $group refers to the data session. add 1 to mcast addr
373 # to get control session
374 set collecting_($group) 1
375 $self schedule_stop_collection $group
376 }
377 }
378
379
380 AGLP/Gateway private schedule_stop_collection { group } {
381 $self instvar collecting_timers_ config_
382
383 set collecting_timers_($group) [new MashTimer "once" \
384 $config_(MAX_PING_PERIOD) "$self stop_collecting_latencies $group"]
385 }
386
387
388 AGLP/Gateway private stop_collecting_latencies { group } {
389 $self instvar collecting_timers_ rtcp_agents_ collecting_ node_
390
391 if {[info exists collecting_($group)] && $collecting_($group) == 1} {
392 delete $collecting_timers_($group)
393 # $group refers to the data session. add 1 to mcast addr
394 # to get control session
395 if {[info exists rtcp_agents_($group)]} {
396 delete $rtcp_agents_($group)
397 unset rtcp_agents_($group)
398 }
399 set collecting_($group) 0
400 }
401 }
402
403
404 #--------------------------------------------------------------------
405 # AGLP/Gateway handoff
406 #
407 # input : client_addr : the address of a client
408 # purpose : handoff the service for this client to a better gateway.
409 #-------------------------------------------------------------------
410
411 AGLP/Gateway private handoff { client_addr pid } {
412 $self instvar addr_ application_ bandwidth_table_
413 $self instvar better_counter_
414 $self instvar service_ handoff_tcp_
415
416 if {[$self is_servicing $client_addr $pid]} {
417 set service $service_($client_addr,$pid)
418 set best [$service get_best_gateway]
419 if {$best != ""} {
420 set best_score [$service get_best_score]
421 set recv_group [$service get_recv_session]
422 set bandwidth [$bandwidth_table_ get $client_addr]
423 set load 0.1
424
425 set ctrl_port [$self generate_unique_port]
426 set handoff_tcp_($best) [new TCP/Server/DegasControl $self ]
427 $handoff_tcp_($best) open $ctrl_port
428
429 set msg "handoff $client_addr $pid $best $recv_group $bandwidth $load $ctrl_port"
430 $self send $msg
431 $self debug "The best guy to serve $client_addr is $best : $best_score"
432
433 $self remember_sent_handoff $client_addr $pid
434
435 # global gstats_
436 # set gstats_(TOTAL_SCORE) [expr $gstats_(TOTAL_SCORE) + $best_score]
437
438 $service reset_best_gateway
439 $self cancel_pick_best_timer $client_addr $pid
440 }
441 } else {
442 $self debug "UH OH ! no best_gateway defined !"
443 }
444 }
445
446
447 #--------------------------------------------------------------------
448 # AGLP/Gateway calc_score
449 #
450 # input : client_addr : the address of a client
451 # gateway_addr : the address of a gateway
452 # purpose : calcuate our score with respect to replacing the gateway
453 # as the server for client.
454 #-------------------------------------------------------------------
455
456 AGLP/Gateway private calc_score { client_addr gateway_addr } {
457 $self instvar addr_ application_ latency_table_ my_latency_table_ bandwidth_table_
458 $self instvar score_
459 set U_self 0
460 set U_curr 0
461 set s ""
462 # LOG
463 foreach src [$self get_sources $gateway_addr $client_addr -1] {
464 set bw [$bandwidth_table_ get $src]
465 set ml [$my_latency_table_ get $src]
466 if {[info exists latency_table_($gateway_addr,$src)] && $bw != "" && $ml != ""} {
467 set s "$s $src"
468 set d_self $ml
469 set d_curr $latency_table_($gateway_addr,$src)
470 set U_self [expr {$U_self + $bw*$d_self}]
471 set U_curr [expr {$U_curr + $bw*$d_curr}]
472 $self debug "CALC SCORE $src : $d_curr \t $d_self \t $bw"
473 } else {
474 # Do not calc score if we do not have all the info.
475 return 0
476 }
477 }
478
479 set sum [expr {$U_curr - $U_self}]
480
481 # Smooth out the scores
482 if [info exists score_($client_addr,$gateway_addr)] {
483 set score_($client_addr,$gateway_addr) [expr 0.1*$sum + 0.9*$score_($client_addr,$gateway_addr)]
484 } else {
485 set score_($client_addr,$gateway_addr) $sum
486 }
487 # LOG
488 set log_str ""
489 foreach src {128.84.223.144 128.84.223.156 128.84.96.120} {
490 set bw [$bandwidth_table_ get $src]
491 set ml [$my_latency_table_ get $src]
492 if {[info exists latency_table_($gateway_addr,$src)] && $bw != "" && $ml != ""} {
493 set d_curr $latency_table_($gateway_addr,$src)
494 set log_str "$log_str $bw $ml $d_curr "
495 } else {
496 set log_str "$log_str - - - "
497 }
498 }
499 $self instvar log_
500 puts $log_ "[$self time] $log_str $sum $score_($client_addr,$gateway_addr)"
501
502 return $score_($client_addr,$gateway_addr)
503 }
504
505
506 #--------------------------------------------------------------------
507 # AGLP/Gateway evaluate
508 # input : client_addr : the address of a client
509 # gateway_addr : the address of a gateway
510 # purpose : evaluate ourself to see if we are better then the gateway
511 # for servicing client
512 #-------------------------------------------------------------------
513
514 AGLP/Gateway private evaluate { client_addr pid gateway_addr } {
515
516 $self instvar sent_replace_ config_
517
518 if [info exists sent_replace_($client_addr,$pid)] {
519
520 # haven't get any reply from previous replace message.
521
522 $self debug "no reply from prev replace yet."
523 return
524 }
525
526 if {[$self is_servicing $client_addr $pid]} {
527
528 # if already serving the client, no need to evaluate
529
530 return
531 }
532
533 $self instvar better_counters_
534
535 set score [$self calc_score $client_addr $gateway_addr]
536 $self debug "## my score is $score"
537
538 set better_count [$better_counters_ get "$client_addr,$pid,$gateway_addr"]
539 if {$better_count == ""} {
540 set better_count 0
541 }
542
543 if {$score > $config_(REPLACE_THRESHOLD)} {
544
545 # The threshold is set to a constant instead of 0, because
546 # we do not want to handoff if the current gateway is a
547 # only slightly better alternative.
548
549 # We only try to replace others if we are better than the
550 # current gateway $NUM_OF_EVALUATION times in a row, within
551 # a minute.
552
553 if {$better_count == 3} {
554
555 # Let the current gateway know that I am a better alternative
556 # for serving the client. We wait an amount of time that is
557 # inversely proportional to the measurement of "how much better"
558 # I am compare to the current gateway.
559
560 $self instvar replace_timer_
561 set waiting_time [expr {int($config_(REPLACE_CONSTANT)/$score)}]
562 $self debug "## wait for $waiting_time ms"
563
564 if {[info exists replace_timer_($client_addr,$pid,$gateway_addr)]} {
565 return
566 }
567 set replace_timer_($client_addr,$pid,$gateway_addr) [new MashTimer \
568 "once" $waiting_time \
569 "$self send_replace $client_addr $pid $gateway_addr $score"]
570 $self instvar my_score_
571 set my_score_($client_addr,$gateway_addr) $score
572
573 $better_counters_ set "$client_addr,$pid,$gateway_addr" 0
574
575 } else {
576
577 incr better_count
578 $better_counters_ set "$client_addr,$pid,$gateway_addr" $better_count
579
580 }
581
582 } else {
583
584 $better_counters_ set "$client_addr,$pid,$gateway_addr" 0
585 }
586 }
587
588
589 #--------------------------------------------------------------------
590 # AGLP/Gateway is_servicing
591 # input : client_addr : a client
592 # purpose : return true if we are servicing this client, return false
593 # otherwise.
594 #-------------------------------------------------------------------
595
596 AGLP/Gateway public is_servicing { client_addr port } {
597 $self instvar service_
598 return [info exists service_($client_addr,$port)]
599 }
600
601
602 #--------------------------------------------------------------------
603 # AGLP/Gateway cancel_replace_timer
604 #
605 # input : client_addr : address of the client we are trying to offer
606 # our service to.
607 # current_gateway : address of the gateway we are trying to
608 # replace.
609 # purpose : cancel the timer to offer our service replace the gateway
610 # for serving the client. This is called when the timer
611 # timeout, or when another replace message with better score
612 # is received, or when another gateway has take over the
613 # service (handoff_complete), or when this gateway has
614 # started serving the client (called through
615 # cancel_all_replace_timer).
616 #-------------------------------------------------------------------
617
618 AGLP/Gateway private cancel_replace_timer { client_addr pid current_gateway } {
619 $self instvar replace_timer_
620 if {[info exists replace_timer_($client_addr,$pid,$current_gateway)]} {
621 delete $replace_timer_($client_addr,$pid,$current_gateway)
622 unset replace_timer_($client_addr,$pid,$current_gateway)
623 }
624 }
625
626
627 #--------------------------------------------------------------------
628 # AGLP/Gateway cancel_all_replace_timer
629 #
630 # input : client_addr : address of the client we are trying to offer
631 # our service to.
632 # purpose : cancel all pending replace timer related to client_addr.
633 # This is called when we start servicing the client.
634 #--------------------------------------------------------------------
635
636 AGLP/Gateway private cancel_all_replace_timer { client_addr } {
637 $self instvar replace_timer_
638 if {[array exists replace_timer_]} {
639 foreach {key value} [array get replace_timer_ $client_addr,*] {
640 delete $value
641 unset replace_timer_($key)
642 }
643 }
644 }
645
646
647 #--------------------------------------------------------------------
648 # AGLP/Gateway cancel_offer_service_timer
649 #
650 # input : client_addr : address of the client we are try to offer
651 # our service to.
652 # purpose : cancel the timer to offer our service to the client. This
653 # is called when the timer timeout, or when another offer
654 # from another gateway is received.
655 #--------------------------------------------------------------------
656
657 AGLP/Gateway private cancel_offer_service_timer { client_addr pid } {
658 $self instvar accept_timer_
659 if {[info exists accept_timer_($client_addr,$pid)]} {
660 delete $accept_timer_($client_addr,$pid)
661 unset accept_timer_($client_addr,$pid)
662 }
663 }
664
665
666 #--------------------------------------------------------------------
667 # AGLP/Gateway cancel_pick_best_timer
668 #
669 # input : client_addr : address of the client we are try to find
670 # a replacement gateway
671 # purpose : cancel the timer to select a replacement gateway. This
672 # is called when the timer timeout, when we stop servicing
673 # the client, and when we are notified that we have been
674 # replaced (by handoff_complete) message.
675 #--------------------------------------------------------------------
676
677 AGLP/Gateway private cancel_pick_best_timer { client_addr pid } {
678 $self instvar pick_best_timer_
679 if {[info exists pick_best_timer_($client_addr,$pid)]} {
680 delete $pick_best_timer_($client_addr,$pid)
681 unset pick_best_timer_($client_addr,$pid)
682 } else {
683 $self debug "no pick best timer $client_addr $pid !!"
684 }
685 }
686
687
688 #--------------------------------------------------------------------
689 # AGLP/Gateway start_service_announcement
690 #
691 # input : client_addr : address of the client to start servicing.
692 # purpose : Starts the periodic "serve" announcement. This is done
693 # when we start servicing $client_addr.
694 #--------------------------------------------------------------------
695
696 AGLP/Gateway private start_service_announcement { client_addr port } {
697 $self instvar announce_service_timer_ config_
698 set announce_service_timer_($client_addr,$port) [new MashTimer \
699 "random_periodic" \
700 $config_(ANNOUNCE_SERVICE_PERIOD) \
701 "$self send_serve $client_addr $port "]
702 }
703
704
705 #--------------------------------------------------------------------
706 # AGLP/Gateway stop_service_announcement
707 #
708 # input : client_addr : address of the client to stop servicing.
709 # purpose : Stop the periodic "serve" announcement. This is done
710 # when we stop servicing $client_addr.
711 #--------------------------------------------------------------------
712
713 AGLP/Gateway private stop_service_announcement { client_addr port } {
714 $self instvar announce_service_timer_
715 if [info exists announce_service_timer_($client_addr,$port)] {
716 delete $announce_service_timer_($client_addr,$port)
717 unset announce_service_timer_
718 }
719 }
720
721
722 proc update_better_counter {n} {
723 }
724
725
726 AGLP/Gateway public recv_request_service { client_addr port args } {
727
728 $self instvar application_ accept_timer_
729 scan $args "%d %d" pid send_time
730
731 set cl [$self get_option serveOnly]
732 set hostname [lindex [split [lookup_host_name $client_addr] .] 0]
733 if {[lsearch $cl $hostname] == -1 && $cl != ""} {
734 return
735 }
736
737 if {![info exists accept_timer_($client_addr,$pid)]} {
738 # No timer pending for this client yet, start a timer
739 # that will send an offer.
740 set waiting_time [$self generate_wait_time $send_time]
741 set accept_timer_($client_addr,$pid) [new MashTimer \
742 "once" $waiting_time "$self send_offer_service $client_addr $pid"]
743 }
744 }
745
746
747 AGLP/Gateway public recv_served_by { client_addr port args } {
748
749 $self instvar addr_ service_
750
751 scan $args "%s %s %f" pid gateway_addr timestamp
752
753 if {$gateway_addr == $addr_} {
754 if {[$self is_servicing $client_addr $pid]} {
755 #$self debug "-- refresh servicing $client_addr $pid"
756 $service_($client_addr,$pid) refresh_service_state
757 }
758 } else {
759 $self instvar client_ctrl_tcp_ replace_addr_
760 if {[info exists replace_addr_($client_addr,$pid)]} {
761 if {$gateway_addr != $replace_addr_($client_addr,$pid)} {
762 # We are sure that the client is being served by some guy
763 # other than me and the one I am trying to replace. We can
764 # now close the channel
765 if {[info exists client_ctrl_tcp_($client_addr,$pid)]} {
766 puts "closing channel $client_addr,$pid"
767 delete $client_ctrl_tcp_($client_addr,$pid)
768 unset client_ctrl_tcp_($client_addr,$pid)
769 }
770 }
771 } else {
772 if {[info exists client_ctrl_tcp_($client_addr,$pid)]} {
773 puts "closing channel $client_addr,$pid"
774 delete $client_ctrl_tcp_($client_addr,$pid)
775 unset client_ctrl_tcp_($client_addr,$pid)
776 }
777 }
778 }
779 set now [gettimeofday]
780 $self set_latency $client_addr [expr $now - $timestamp]
781 }
782
783
784 AGLP/Gateway public recv_offer_service { gateway_addr port args } {
785
786 scan $args "%s %s" client_addr pid
787 $self cancel_offer_service_timer $client_addr $pid
788
789 }
790
791 AGLP/Gateway public recv_serve { gateway_addr port args } {
792
793 $self instvar application_ latency_table_ bandwidth_table_ config_ addr_
794
795 if {$gateway_addr == $addr_} {
796 return
797 }
798
799 set client_addr [lindex $args 0]
800 set pid [lindex $args 1]
801 set recv_spec [lindex $args 2]
802 set send_spec [lindex $args 3]
803 set src_list [lrange $args 4 end]
804
805 foreach {src latency bandwidth} $src_list {
806 $self add_source $gateway_addr $client_addr -1 $src
807 #$self add_rtcp_source $src
808 set latency_table_($gateway_addr,$src) $latency
809 $bandwidth_table_ set $src $bandwidth
810 }
811 if {$src_list != ""} {
812 foreach g $recv_spec {
813 $self collect_latencies $g
814 }
815 after $config_(MAX_PING_PERIOD) "$self evaluate $client_addr $pid $gateway_addr"
816 }
817 }
818
819
820 AGLP/Gateway public recv_replace { gateway_addr port args } {
821
822 $self instvar better_counter_
823 $self instvar my_score_ config_ service_ addr_
824
825 #--------------------------------------------------------
826 # The other gateway thinks that it is better than the
827 # current gateway. If we are the current gateway, we
828 # wait for a period of time, before picking gateway that
829 # is the best so far. If we are not the current gateway,
830 # and we are also better than the current gateway, we compare
831 # the score of the other gateway with our score. If our
832 # score is lower, we keep quiet and do not send out our
833 # "i am better" message. If our score is higher, we do not
834 # do anything. (i.e. we proceed with sending our "i am
835 # better" message).
836 #--------------------------------------------------------
837
838 $self debug "-- recv replace $args"
839 scan $args "%s %s %s %d" client_addr pid current_gateway score
840
841 if {$current_gateway == $addr_ && [$self is_servicing $client_addr $pid]} {
842
843 $self instvar pick_best_timer_
844 if {![info exists pick_best_timer_($client_addr,$pid)]} {
845 set better_counter_ 1
846 set pick_best_timer_($client_addr,$pid) [new MashTimer \
847 "once" $config_(PICK_BEST_PERIOD) \
848 "$self handoff $client_addr $pid"]
849 $self debug "sched pick best .. $client_addr $pid"
850 } else {
851 incr better_counter_
852 $self debug "$better_counter_ replace received."
853 }
854
855 $service_($client_addr,$pid) update_best_gateway $score $gateway_addr
856
857 } elseif {[info exists my_score_($client_addr,$current_gateway)]} {
858 if {$my_score_($client_addr,$current_gateway) < $score} {
859 # Doh ! We lost. Let's cancel our "i am better"
860 # message before it is too late !
861 $self cancel_replace_timer $client_addr $pid $current_gateway
862 }
863 }
864 }
865
866 AGLP/Gateway public recv_handoff { gateway_addr port args } {
867
868 $self instvar application_ addr_
869
870 if {$gateway_addr == $addr_} {
871 return
872 }
873
874 scan $args "%s %d %s %s %s %s %s" client_addr pid better_gateway recv_group bandwidth load ctrl_port
875 $self debug "## recv handoff $args"
876
877 if {$better_gateway == $addr_} {
878 # I am chosen to replace the current gateway.
879 if {![$self is_servicing $client_addr $pid]} {
880 $self prepare_handoff $client_addr $pid $gateway_addr $ctrl_port
881 }
882 } else {
883 # Added 10 SEP 2000
884 $self cancel_replace_timer $client_addr $pid $gateway_addr
885 }
886 }
887
888
889 AGLP/Gateway public prepare_handoff { client_addr pid gateway_addr port } {
890 $self debug "## prepare to handoff.."
891 $self instvar get_deglet_tcp_
892 set get_deglet_tcp_($client_addr,$pid) [new TCP/Client/DegasControl]
893 set tcp $get_deglet_tcp_($client_addr,$pid)
894 $tcp set_aglp $self
895 puts "tcp: $tcp aglp: [$tcp set aglp_]"
896
897 # XXX : Should check for error here..
898 $tcp open $gateway_addr $port
899 $tcp send "get $client_addr $pid\n"
900 }
901
902
903 AGLP/Gateway public recv_handoff_complete { gateway_addr port args } {
904
905 $self instvar addr_
906
907 set client_addr [lindex $args 0]
908 set pid [lindex $args 1]
909 set curr_gateway [lindex $args 3]
910 if {$curr_gateway == $addr_} {
911 #$self debug "recv $args"
912 # If I am done handing off to another.
913 $self cancel_pick_best_timer $client_addr $pid
914 $self instvar handoff_tcp_
915 delete $handoff_tcp_($gateway_addr)
916 } else {
917 # if we have sent a replace message for this client,
918 # we cancel the timer.
919 $self cancel_replace_timer $client_addr $pid $curr_gateway
920 }
921 }
922
923
924 #----------------------------------------------------------------
925 # AGLP/Gateway add_sources
926 #
927 # Add source to the list of sources for subdeglet (client,id)
928 #----------------------------------------------------------------
929
930 AGLP/Gateway instproc add_source { gateway client id source } {
931 $self instvar source_table_ source_state_ config_
932 if [info exists source_table_($gateway,$client,$id)] {
933 if {[lsearch $source_table_($gateway,$client,$id) $source] == -1} {
934 lappend source_table_($gateway,$client,$id) $source
935 set source_state_($gateway,$client,$id,$source) [new MashSoftState $source \
936 $config_(KEEP_SOURCE_PERIOD) "$self del_source $gateway $client $id $source"]
937 } else {
938 $source_state_($gateway,$client,$id,$source) refresh
939 }
940 } else {