1 import TCP/Server/DegasControl
2 import AnnounceListenManager
3 import DegasAgent
4
5 #------------------------------------------------------------------
6 # Class AnnounceListenManager/DegasServer
7 #
8 # purpose :
9 # This is the heart and soul of the Degas server.
10 # It processes the messages from other servers and other
11 # clients. It also sends "alive" message to other
12 # server.
13 #
14 # members :
15 #
16 # programs_
17 # An array index by addr, port and key. The value of the
18 # array contains the value of key submitted by client at
19 # addr/port.
20 #
21 # alive_
22 # An array index by addr and port. alive_($addr,$port) exists
23 # iff the server at addr:port is alive. (The value does not
24 # really matters).
25 #
26 # alive_timeout_id_
27 # This is an array index by addr and port, the id of the
28 # "after" timeout command.
29 #
30 # alive_timeout_
31 # Timeout of an alive message, after which the server is
32 # consider dead.
33 #
34 # alive_interval_id_
35 # This is the "after" id of the after event for announcing
36 # my own alive message.
37 #
38 # alive_interval_
39 # This is the interval between announcement of alive message.
40 #
41 # num_of_server_
42 # Number of other servers on this control session. This is
43 # incremented everytime an "alive" message is received, and
44 # decremented everytime a "bye" is received, or timeout of
45 # an alive occurs.
46 #
47 # servable_
48 # Servable value for server A for client B will be stored in
49 # servable_(A-addr, A-port, B-addr, B-port)
50 #
51 # ignore_servable_(addr,port)
52 # Servable message for client addr/port will be ignored this is
53 # not_defined. This happens because my servable value is larger
54 # than at least one of the other server, so it is not possible
55 # for me to service the client.
56 #
57 # self_servable_(addr,port)
58 # My servable value for client addr/port
59 #
60 # servable_count_(addr,port)
61 # How many servable value for client addr/port have been received.
62 #
63 # servable_id_
64 # This is an array index by addr and port, the id of the
65 # "after" timeout command.
66 #
67 # servable_timeout_
68 # Timeout of an servable message, after which the server with current
69 # minimum start providing service even if not all servable value has
70 # been received.
71 #
72 # out_session_(addr,port)
73 # Output session for client addr/port.
74 #
75 # unique_port_
76 # A unique port number. It is initialized to $min_port_, and increments
77 # by 4 everytime, wrap around at $max_port_
78 #
79 # port_in_use_(port)
80 # A array indexed by all ports in used.
81 #
82 # max_port_
83 # Maximum port number to use.
84 #
85 # min_port_
86 # Minimum port number to use.
87 #
88 # addr_in_use_(addr1,addr2)
89 # This is set iff multicast address 224.3.$addr1.$addr2 is
90 # already in used. This is to avoid collision between two
91 # randomly generated multicast address.
92 #
93 #------------------------------------------------------------------
94
95 Class AnnounceListenManager/DegasServer -superclass {AnnounceListenManager}
96
97 #------------------------------------------------------------------
98 # AnnouceListenManager/DegasServer::init
99 #
100 # purpose : Constructor for a AnnouceListenManager/DegasServer obj
101 # input : spec - Specifies where the ALM should announce and listen.
102 # Format is <ipaddr>/<port> or <port>.
103 # mtu - Maximum size of a message. A buffer of this size
104 # will be allocated for storing message.
105 # output : none
106 #------------------------------------------------------------------
107
108 AnnounceListenManager/DegasServer public init {spec mtu} {
109 $self instvar alive_timeout_
110 $self instvar alive_interval_
111 $self instvar servable_timeout_
112 $self instvar num_of_server_
113 $self instvar min_port_
114 $self instvar max_port_
115 $self instvar unique_port_
116
117 set alive_timeout_ [$self get_option aliveTimeout]
118 set servable_timeout_ [$self get_option servableTimeout]
119 set alive_interval_ [$self get_option aliveInterval]
120 set min_port_ [$self get_option minPort]
121 set max_port_ [$self get_option maxPort]
122 set unique_port_ $min_port_
123 set num_of_server_ 0
124
125 $self next $spec $mtu
126 }
127
128
129 #------------------------------------------------------------------
130 # AnnouceListenManager/DegasServer::recv_announcement
131 #
132 # purpose : This is called whenever an announcement is received.
133 # Method handle_client_announcement and handle_server_announcemt
134 # is called to process the message respectively.
135 # input : addr - the ip address where this message is from.
136 # port - the port where this message is from
137 # m - the announcement message itself, which should
138 # have the following format.
139 # {
140 # DCP
141 # [ client | server ]
142 # <rest of messages>
143 # }
144 # l - length of the message.
145 # output : none
146 #------------------------------------------------------------------
147
148 AnnounceListenManager/DegasServer private recv_announcement {addr port m l} {
149 # Check magic word.
150 set magic_word [lindex $m 0]
151 if {$magic_word != "DCP"} {
152 # This message is not for me.
153 return
154 }
155
156 # Check message type (from server ? or client ?)
157 set type [lindex $m 1]
158 set m [lrange $m 2 end]
159 if {$type == "server"} {
160 $self handle_server_message $addr $port $m
161 } elseif {$type == "client"} {
162 $self handle_client_message $addr $port $m
163 }
164
165 }
166
167
168 #------------------------------------------------------------------
169 # AnnounceListenManager/DegasServer::handle_client_message
170 #
171 # purpose : The client annoucement is parsed, and the infomation
172 # is stored in an array indexed by address, port and
173 # key in the message. Then based on the action of the
174 # announcement, the following happens :
175 # - if a new program is submitted (action == new), then
176 # it calculate a "servable" value, and announce this
177 # value to all other servers the client.
178 # - update and cancel are not implemented yet.
179 # input : addr - the ip address where this message is from.
180 # port - the port where this message is from
181 # m - the announcement message itself, which should
182 # have the following format.
183 # {
184 # action [new|update|cancel]
185 # input_session_spec <addr>/<port>/<ttl>
186 # control_channel_port <port>
187 # sources <list of sources>
188 # max_num_of_sources <int> or "*"
189 # init_callback <program>
190 # recv_frame_callback <program>
191 # new_source_callback <program>
192 # del_source_callback <program>
193 # destroy_callback <program>
194 # }
195 # output : none
196 #------------------------------------------------------------------
197
198 AnnounceListenManager/DegasServer private handle_client_message {addr port m} {
199
200 # Determine action
201 if {[lindex $m 0] != "action"} {
202 puts stderr "error parsing announcement. expect \"action\"
203 keyword but found [lindex $m 0]"
204 return
205
206 }
207 set action [lindex $m 1]
208 set $m [lrange $m 2 end]
209
210 switch -exact $action {
211 "request" {
212 puts "request received"
213 }
214 "new" {
215 # A new service request is received. Calculate the
216 # servable value for this client. And announce this
217 # value to all client.
218
219 $self instvar programs_
220 foreach {key value} $m {
221 set programs_($addr,$port,$key) $value
222 }
223
224 $self instvar self_servable_
225 $self instvar ignore_servable_
226 set servable [$self calc_servable $addr $port]
227 set self_servable_($addr,$port) $servable
228
229 # this is a new request, so we must pay attention
230 # to this client again.
231 if {[info exists ignore_servable_($addr,$port)]} {
232 unset ignore_servable_($addr,$port)
233 }
234
235 # announce my servable value to all other server.
236 $self announce_servable $servable $addr $port
237
238 # timeout - start service if still the minimum even if
239 # not all servable value is received.
240 $self instvar servable_id_
241 $self instvar servable_timeout_
242 set servable_id_ [after $servable_timeout_ "
243 if {![info exists ignore_servable_($addr,$port)]} {
244 $self begin_service $addr $port
245 }" ]
246 }
247 "update" {
248 puts stderr "update is not supported yet"
249 }
250 "cancel" {
251 puts stderr "cancel is not supported yet"
252 }
253 default {
254 puts stderr "error parsing announcement. expect
255 \"new\", \"update\", or \"cancel\" but got $action"
256 return
257 }
258 }
259 }
260
261
262 #------------------------------------------------------------------
263 # AnnounceListenManager/DegasServer::handle_server_message
264 #
265 # purpose : This is called by recv_annoucnement to process
266 # a server message.
267 # input : addr - the server which sent the message.
268 # port - the port of the server which sent the message.
269 # m - the message itself, with the following format :
270 # {
271 # [ alive |
272 # bye |
273 # servable <servable value> <client_addr> <client_port> |
274 # service <client_addr> <client_port>
275 # ]
276 # }
277 # output : none
278 #------------------------------------------------------------------
279
280 AnnounceListenManager/DegasServer private handle_server_message {addr port m} {
281 $self instvar alive_
282 $self instvar num_of_server_
283 $self instvar alive_timeout_id_
284 $self instvar alive_timeout_
285
286 set action [lindex $m 0]
287 switch $action {
288 "alive" {
289 if {![info exists alive_($addr,$port)]} {
290 # new server detected.
291 incr num_of_server_
292 }
293 set alive_($addr,$port) 1
294 # cancel the last timeout and add a new one.
295 # if after alive_timeout_, this timeout is not cancel,
296 # then the server is considered dead.
297 if {[info exists alive_timeout_id_]} {
298 after cancel $alive_timeout_id_
299 }
300 set alive_timeout_id_ [after $alive_timeout_ "$self bye_bye_server $addr $port"]
301
302 }
303 "bye" {
304 $self bye_bye_server $addr $port
305 }
306 "service" {
307 puts stderr "service message is not processed currently"
308 }
309 "servable" {
310 # We received a servable message from another
311 # server. We check if we are the minimum. If
312 # we are not, we ignore further servable message
313 # for this client. If we are the minimum after
314 # all messages are received, (we break tie based
315 # on ip address.) we are the lucky one to provide
316 # service to the client. We announce the good new
317 # to all other servers.
318
319 $self instvar ignore_servable_
320 $self instvar self_servable_
321 $self instvar servable_count_
322 $self instvar servable_
323 $self instvar num_of_server_
324 $self instvar servable_id_
325
326 set servable [lindex $m 0]
327 set client_addr [lindex $m 1]
328 set client_port [lindex $m 2]
329 if {$servable < $self_servable_($client_addr,$client_port)} {
330 set ignore_servable_($client_addr,$client_port) 1
331 }
332 set servable_($client_addr,$client_port,$addr,$port) $servable
333 incr servable_count_($client_addr,$client_port)
334 if {$servable_count_($client_addr,$client_port) >= $num_of_server_
335 && ![info exists ignore_servable_($client_addr,$client_port)] } {
336 after cancel servable_id_
337 $self begin_service $client_addr $client_port
338 }
339 }
340 default {
341 puts "unknown token $action in server message"
342 }
343 }
344 }
345
346
347 #------------------------------------------------------------------
348 # AnnouceListenManager/DegasServer::create_annoucement
349 #
350 # purpose : Create a new message. This message can then be send
351 # by calling "announce".
352 # input :
353 # return : A new message.
354 #------------------------------------------------------------------
355
356 AnnounceListenManager/DegasServer private create_announcement {} {
357 return "Dummy Message"
358 }
359
360
361
362 #------------------------------------------------------------------
363 # AnnounceListenManager/DegasServer::calc_servable
364 #
365 # purpose : Calculate a value (0 .. 1) to indicate if it is good
366 # to provide a service to this client. 1 is good and
367 # 0 is bad.
368 # input : addr, port - indicate which client are we trying to
369 # serve.
370 # return : the servable value as float.
371 #------------------------------------------------------------------
372
373 AnnounceListenManager/DegasServer private calc_servable {addr port} {
374 # Not implemented yet.
375 return 1.0
376 }
377
378
379 #------------------------------------------------------------------
380 # AnnounceListenManager/DegasServer::bye_bye_server
381 #
382 # purpose : A server is down. Remove all records of that server,
383 # and possibly take over it's job, if any.
384 # input : addr,port - the address and port of the server we are
385 # saying bye-bye to.
386 # output : none.
387 #------------------------------------------------------------------
388
389 AnnounceListenManager/DegasServer private bye_bye_server {addr port} {
390 $self instvar alive_
391 $self instvar num_of_server_
392
393 unset alive_($addr,$port)
394 incr num_of_server_ -1
395 }
396
397
398 #------------------------------------------------------------------
399 # AnnounceListenManager/DegasServer::announce_servable
400 #
401 # purpose : Create a servable message and announce it to other
402 # servers.
403 # input : s - a servable value (float)
404 # output : none
405 #------------------------------------------------------------------
406
407 AnnounceListenManager/DegasServer private announce_servable {s addr port} {
408 $self log "announcing servable $s $addr $port"
409 $self announce "DCP\nserver\nservable $s $addr $port\n"
410 }
411
412
413
414 #------------------------------------------------------------------
415 # AnnounceListenManager/DegasServer::annouce_alive
416 #
417 # purpose : Create an alive announcement and annount to other servers.
418 # input : none
419 # output : none
420 #------------------------------------------------------------------
421
422 AnnounceListenManager/DegasServer private announce_alive {} {
423 #$self log "announcing alive"
424 $self announce "DCP\nserver\nalive\n"
425 }
426
427
428
429 #------------------------------------------------------------------
430 # AnnounceListenManager/DegasServer::announce_bye
431 #
432 # purpose : Create a bye announcement and announce to other servers.
433 # input : none
434 # output : none
435 #------------------------------------------------------------------
436
437 AnnounceListenManager/DegasServer private announce_bye {} {
438 $self log "announcing bye"
439 $self announce "DCP\nserver\nbye\n"
440 }
441
442
443
444 #------------------------------------------------------------------
445 # AnnonceListenManager/DegasServer::announce_service
446 #
447 # purpose : Annouce to other servers that I am providing service to
448 # to some particular client.
449 # input : addr,port - the client I am going to service.
450 # session - output multicast session.
451 # control_port - port where client should connect for control
452 # channel.
453 # output : none.
454 #------------------------------------------------------------------
455
456 AnnounceListenManager/DegasServer private announce_service {addr port session control_port} {
457 $self announce "DCP\nserver\nservice $addr $port $session $control_port"
458 }
459
460
461 #------------------------------------------------------------------
462 # AnnouceListenManager/DegasServer::begin_service
463 #
464 # purpose : Start the service for a client
465 # input : addr,port - the client I am going to service.
466 # output : none
467 # note : this should be private, but put as public to make debugging
468 # simple.
469 #------------------------------------------------------------------
470
471 AnnounceListenManager/DegasServer public begin_service {addr port} {
472 $self instvar programs_
473 $self instvar output_session_
474 $self instvar in_service_
475
476 if {[info exists in_service_($addr,$port)]} {
477 $self log "Duplicated service requested for $addr/$port"
478 return
479 }
480 $self log "Starting service for $addr/$port"
481 set in_service_($addr,$port) 1
482
483 set output_session_($addr,$port) [$self generate_new_session_spec]
484 set ctrl_port [$self generate_unique_port]
485 puts "Output session : $output_session_($addr,$port)"
486 set agent [new DegasAgent \
487 [$self get_program $addr $port] $output_session_($addr,$port)]
488 set ctrl_tcp_ [new TCP/Server/DegasControl $agent]
489 $ctrl_tcp_ open $ctrl_port
490 $self announce_service $addr $port $output_session_($addr,$port) $ctrl_port
491 }
492
493
494
495 #------------------------------------------------------------------
496 # AnnouceListenManager/DegasServer::run
497 #
498 # purpose : Start the annouce listen manager. This will cause the
499 # alive message to be multicast periodically.
500 # input : none
501 # output : none
502 #------------------------------------------------------------------
503
504 AnnounceListenManager/DegasServer public run {} {
505 $self instvar send_alive_id_
506 $self instvar alive_interval_
507
508 $self announce_alive
509 set send_alive_id_ [after $alive_interval_ "$self run"]
510 }
511
512
513 #------------------------------------------------------------------
514 # AnnounceListenManager/DegasServer::log
515 #
516 # purpose : create a log message of the server. This can be either
517 # to the screen or to a file.
518 # input : msg - a message to log
519 # output : none
520 #------------------------------------------------------------------
521
522 AnnounceListenManager/DegasServer private log {msg} {
523 puts "$msg"
524 }
525
526
527
528 #------------------------------------------------------------------
529 # AnnounceListenManager/DegasServer::get_program
530 #
531 # purpose : This is a util to convert the program submitted from a
532 # client to a list, so that we can pass it to DegasAgent.
533 # input : addr,port - the client which submitted the program
534 # we are getting.
535 # output : none
536 #------------------------------------------------------------------
537
538 AnnounceListenManager/DegasServer private get_program {addr port} {
539 $self instvar programs_
540 return [array get programs_ $addr,$port,*]
541 }
542
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more
information.