~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Open Mash Cross Reference
mash/tcl/degas/degasserver/alm-degas-server.tcl

Component: ~ [ mash ] ~ [ apps ] ~ [ gsm ] ~ [ lib ] ~ [ otcl ] ~ [ srm ] ~ [ tcl8.3 ] ~ [ tclcl ] ~ [ tk8.3 ] ~ [ tutorials ] ~

  1 import TCP/Server/DegasControl
  2 import AnnounceListenManager
  3 import DegasAgent
  4 
  5 #------------------------------------------------------------------

  6 # Class AnnounceListenManager/DegasServer

  7 #

  8 # purpose :

  9 #   This is the heart and soul of the Degas server.  

 10 #   It processes the messages from other servers and other 

 11 #       clients.   It also sends "alive" message to other 

 12 #       server.

 13 #

 14 # members :

 15 #

 16 # programs_

 17 #   An array index by addr, port and key. The value of the

 18 #   array contains the value of key submitted by client at

 19 #   addr/port.

 20 #

 21 # alive_ 

 22 #   An array index by addr and port.  alive_($addr,$port) exists

 23 #   iff the server at addr:port is alive.  (The value does not

 24 #   really matters).

 25 #

 26 # alive_timeout_id_

 27 #   This is an array index by addr and port, the id of the 

 28 #   "after" timeout command.  

 29 #

 30 # alive_timeout_

 31 #   Timeout of an alive message, after which the server is

 32 #   consider dead.

 33 #

 34 # alive_interval_id_

 35 #   This is the "after" id of the after event for announcing

 36 #   my own alive message.

 37 #

 38 # alive_interval_

 39 #   This is the interval between announcement of alive message.

 40 #

 41 # num_of_server_

 42 #   Number of other servers on this control session.  This is 

 43 #   incremented everytime an "alive" message is received, and

 44 #   decremented everytime a "bye" is received, or timeout of

 45 #   an alive occurs.

 46 #

 47 # servable_

 48 #   Servable value for server A for client B will be stored in 

 49 #   servable_(A-addr, A-port, B-addr, B-port)

 50 #

 51 # ignore_servable_(addr,port)

 52 #   Servable message for client addr/port will be ignored this is

 53 #   not_defined.  This happens because my servable value is larger 

 54 #   than at least one of the other server, so it is not possible 

 55 #   for me to service the client.

 56 # 

 57 # self_servable_(addr,port)

 58 #   My servable value for client addr/port

 59 #

 60 # servable_count_(addr,port)

 61 #   How many servable value for client addr/port have been received.

 62 #

 63 # servable_id_

 64 #   This is an array index by addr and port, the id of the 

 65 #   "after" timeout command.  

 66 #

 67 # servable_timeout_

 68 #   Timeout of an servable message, after which the server with current

 69 #   minimum start providing service even if not all servable value has

 70 #   been received.

 71 # 

 72 # out_session_(addr,port)

 73 #   Output session for client addr/port.

 74 #

 75 # unique_port_

 76 #   A unique port number. It is initialized to $min_port_, and increments

 77 #   by 4 everytime, wrap around at $max_port_

 78 # 

 79 # port_in_use_(port)

 80 #   A array indexed by all ports in used.

 81 #

 82 # max_port_

 83 #   Maximum port number to use.

 84 #

 85 # min_port_

 86 #   Minimum port number to use.

 87 #

 88 # addr_in_use_(addr1,addr2)

 89 #   This is set iff multicast address 224.3.$addr1.$addr2 is 

 90 #   already in used.  This is to avoid collision between two

 91 #   randomly generated multicast address.

 92 #

 93 #------------------------------------------------------------------

 94 
 95 Class AnnounceListenManager/DegasServer -superclass {AnnounceListenManager}
 96 
 97 #------------------------------------------------------------------

 98 # AnnouceListenManager/DegasServer::init

 99 #

100 # purpose : Constructor for a AnnouceListenManager/DegasServer obj

101 # input   : spec - Specifies where the ALM should announce and listen.

102 #                  Format is <ipaddr>/<port> or <port>.

103 #           mtu  - Maximum size of a message.  A buffer of this size

104 #                  will be allocated for storing message.

105 # output  : none

106 #------------------------------------------------------------------

107 
108 AnnounceListenManager/DegasServer public init {spec mtu} {
109     $self instvar alive_timeout_
110     $self instvar alive_interval_
111     $self instvar servable_timeout_
112     $self instvar num_of_server_
113     $self instvar min_port_
114     $self instvar max_port_
115     $self instvar unique_port_
116 
117     set alive_timeout_ [$self get_option aliveTimeout]
118     set servable_timeout_ [$self get_option servableTimeout]
119     set alive_interval_ [$self get_option aliveInterval]
120     set min_port_ [$self get_option minPort]
121     set max_port_ [$self get_option maxPort]
122     set unique_port_ $min_port_
123     set num_of_server_ 0
124 
125     $self next $spec $mtu
126 }
127 
128 
129 #------------------------------------------------------------------

130 # AnnouceListenManager/DegasServer::recv_announcement

131 #

132 # purpose : This is called whenever an announcement is received.

133 #           Method handle_client_announcement and handle_server_announcemt

134 #           is called to process the message respectively.

135 # input   : addr - the ip address where this message is from.

136 #           port - the port where this message is from

137 #           m    - the announcement message itself, which should

138 #                  have the following format.

139 #           {

140 #               DCP

141 #               [ client | server ]

142 #               <rest of messages>

143 #           }

144 #           l    - length of the message.

145 # output  : none

146 #------------------------------------------------------------------

147 
148 AnnounceListenManager/DegasServer private recv_announcement {addr port m l} {
149     # Check magic word.

150     set magic_word [lindex $m 0]
151     if {$magic_word != "DCP"} {
152         # This message is not for me.

153         return
154     }
155 
156     # Check message type (from server ? or client ?)

157     set type [lindex $m 1]
158     set m [lrange $m 2 end]
159     if {$type == "server"} {
160         $self handle_server_message $addr $port $m
161     } elseif {$type == "client"} {
162         $self handle_client_message $addr $port $m
163     }
164 
165 }
166 
167 
168 #------------------------------------------------------------------

169 # AnnounceListenManager/DegasServer::handle_client_message

170 #

171 # purpose : The client annoucement is parsed, and the infomation

172 #           is stored in an array indexed by address, port and 

173 #           key in the message.  Then based on the action of the

174 #           announcement, the following happens :

175 #           - if a new program is submitted (action == new), then

176 #             it calculate a "servable" value, and announce this

177 #             value to all other servers the client.   

178 #           - update and cancel are not implemented yet. 

179 # input   : addr - the ip address where this message is from.

180 #           port - the port where this message is from

181 #           m    - the announcement message itself, which should

182 #                  have the following format.

183 #           {

184 #               action               [new|update|cancel]

185 #               input_session_spec   <addr>/<port>/<ttl>

186 #               control_channel_port <port>

187 #               sources              <list of sources>

188 #               max_num_of_sources   <int> or "*"

189 #               init_callback        <program>

190 #               recv_frame_callback  <program>

191 #               new_source_callback  <program>

192 #               del_source_callback  <program>

193 #               destroy_callback     <program>

194 #               }

195 # output  : none

196 #------------------------------------------------------------------

197 
198 AnnounceListenManager/DegasServer private handle_client_message {addr port m} {
199 
200     # Determine action

201     if {[lindex $m 0] != "action"} {
202         puts stderr "error parsing announcement. expect \"action\"
203             keyword but found [lindex $m 0]"
204         return
205 
206     }
207     set action [lindex $m 1]
208     set $m [lrange $m 2 end]
209 
210     switch -exact $action {
211         "request" {
212                         puts "request received"
213                 }
214         "new" {
215             # A new service request is received.  Calculate the

216             # servable value for this client.  And announce this

217             # value to all client.

218 
219             $self instvar programs_
220             foreach {key value} $m {
221                 set programs_($addr,$port,$key) $value
222             }
223 
224             $self instvar self_servable_
225             $self instvar ignore_servable_
226             set servable [$self calc_servable $addr $port]
227             set self_servable_($addr,$port) $servable
228 
229             # this is a new request, so we must pay attention

230             # to this client again.

231             if {[info exists ignore_servable_($addr,$port)]} {
232                 unset ignore_servable_($addr,$port)
233             }
234 
235             # announce my servable value to all other server.

236             $self announce_servable $servable $addr $port
237 
238             # timeout - start service if still the minimum even if

239             # not all servable value is received.

240             $self instvar servable_id_
241             $self instvar servable_timeout_
242             set servable_id_ [after $servable_timeout_ "
243                 if {![info exists ignore_servable_($addr,$port)]} {
244                     $self begin_service $addr $port
245             }" ]
246         }
247         "update" {
248             puts stderr "update is not supported yet"
249         }
250         "cancel" {
251             puts stderr "cancel is not supported yet"
252         }
253         default {
254             puts stderr "error parsing announcement. expect
255             \"new\", \"update\", or \"cancel\" but got $action"
256             return
257         }
258     }
259 }
260 
261 
262 #------------------------------------------------------------------

263 # AnnounceListenManager/DegasServer::handle_server_message

264 #

265 # purpose : This is called by recv_annoucnement to process

266 #           a server message.

267 # input   : addr - the server which sent the message.

268 #           port - the port of the server which sent the message.

269 #           m    - the message itself, with the following format :

270 #           {

271 #               [ alive |

272 #                 bye | 

273 #                 servable <servable value> <client_addr> <client_port> |

274 #                 service <client_addr> <client_port> 

275 #               ]

276 #           }

277 # output  : none

278 #------------------------------------------------------------------

279 
280 AnnounceListenManager/DegasServer private handle_server_message {addr port m} {
281     $self instvar alive_
282     $self instvar num_of_server_
283     $self instvar alive_timeout_id_
284     $self instvar alive_timeout_
285 
286     set action [lindex $m 0]
287     switch $action {
288     "alive" {
289         if {![info exists alive_($addr,$port)]} {
290             # new server detected.

291             incr num_of_server_ 
292         }
293         set alive_($addr,$port) 1
294         # cancel the last timeout and add a new one.

295         # if after alive_timeout_, this timeout is not cancel,

296         # then the server is considered dead.               

297         if {[info exists alive_timeout_id_]} {
298             after cancel $alive_timeout_id_
299         }
300         set alive_timeout_id_ [after $alive_timeout_ "$self bye_bye_server $addr $port"]
301 
302     }
303     "bye" {
304         $self bye_bye_server $addr $port
305     }
306     "service" {
307         puts stderr "service message is not processed currently"
308     }
309     "servable" {
310         # We received a servable message from another 

311         # server.  We check if we are the minimum.  If

312         # we are not, we ignore further servable message

313         # for this client.  If we are the minimum after

314         # all messages are received, (we break tie based

315         # on ip address.) we are the lucky one to provide

316         # service to the client.  We announce the good new

317         # to all other servers.

318 
319         $self instvar ignore_servable_
320         $self instvar self_servable_
321         $self instvar servable_count_
322         $self instvar servable_
323         $self instvar num_of_server_
324         $self instvar servable_id_
325 
326         set servable [lindex $m 0]
327         set client_addr [lindex $m 1]
328         set client_port [lindex $m 2]
329         if {$servable < $self_servable_($client_addr,$client_port)} {
330             set ignore_servable_($client_addr,$client_port) 1
331         }
332         set servable_($client_addr,$client_port,$addr,$port) $servable
333         incr servable_count_($client_addr,$client_port) 
334         if {$servable_count_($client_addr,$client_port) >= $num_of_server_ 
335             && ![info exists ignore_servable_($client_addr,$client_port)] } {
336             after cancel servable_id_
337             $self begin_service $client_addr $client_port
338         }
339     }
340     default {
341         puts "unknown token $action in server message"
342     }
343     }
344 }
345 
346 
347 #------------------------------------------------------------------

348 # AnnouceListenManager/DegasServer::create_annoucement

349 #

350 # purpose : Create a new message.  This message can then be send

351 #           by calling "announce".

352 # input   : 

353 # return  : A new message.

354 #------------------------------------------------------------------

355 
356 AnnounceListenManager/DegasServer private create_announcement {} {
357     return "Dummy Message"
358 }
359 
360 
361 
362 #------------------------------------------------------------------

363 # AnnounceListenManager/DegasServer::calc_servable

364 #

365 # purpose : Calculate a value (0 .. 1) to indicate if it is good

366 #           to provide a service to this client.  1 is good and 

367 #           0 is bad.

368 # input   : addr, port - indicate which client are we trying to

369 #                        serve.

370 # return  : the servable value as float.

371 #------------------------------------------------------------------

372 
373 AnnounceListenManager/DegasServer private calc_servable {addr port} {
374     # Not implemented yet.

375     return 1.0
376 }
377 
378 
379 #------------------------------------------------------------------

380 # AnnounceListenManager/DegasServer::bye_bye_server

381 #

382 # purpose : A server is down.  Remove all records of that server,

383 #           and possibly take over it's job, if any.

384 # input   : addr,port - the address and port of the server we are

385 #                       saying bye-bye to.

386 # output  : none.

387 #------------------------------------------------------------------

388 
389 AnnounceListenManager/DegasServer private bye_bye_server {addr port} {
390     $self instvar alive_
391     $self instvar num_of_server_
392 
393     unset alive_($addr,$port)
394     incr num_of_server_ -1
395 }
396 
397 
398 #------------------------------------------------------------------

399 # AnnounceListenManager/DegasServer::announce_servable

400 #

401 # purpose : Create a servable message and announce it to other

402 #       servers.

403 # input   : s - a servable value (float)

404 # output  : none

405 #------------------------------------------------------------------

406 
407 AnnounceListenManager/DegasServer private announce_servable {s addr port} {
408     $self log "announcing servable $s $addr $port"
409     $self announce "DCP\nserver\nservable $s $addr $port\n"
410 }
411 
412 
413 
414 #------------------------------------------------------------------

415 # AnnounceListenManager/DegasServer::annouce_alive

416 #

417 # purpose : Create an alive announcement and annount to other servers.

418 # input   : none

419 # output  : none

420 #------------------------------------------------------------------

421 
422 AnnounceListenManager/DegasServer private announce_alive {} {
423 #$self log "announcing alive"

424     $self announce "DCP\nserver\nalive\n"
425 }
426 
427 
428 
429 #------------------------------------------------------------------

430 # AnnounceListenManager/DegasServer::announce_bye

431 #

432 # purpose : Create a bye announcement and announce to other servers.

433 # input   : none

434 # output  : none

435 #------------------------------------------------------------------

436 
437 AnnounceListenManager/DegasServer private announce_bye {} {
438     $self log "announcing bye"
439     $self announce "DCP\nserver\nbye\n"
440 }
441 
442 
443 
444 #------------------------------------------------------------------

445 # AnnonceListenManager/DegasServer::announce_service

446 #

447 # purpose : Annouce to other servers that I am providing service to

448 #           to some particular client.

449 # input   : addr,port - the client I am going to service.

450 #           session - output multicast session.

451 #           control_port - port where client should connect for control

452 #               channel.

453 # output  : none.

454 #------------------------------------------------------------------

455 
456 AnnounceListenManager/DegasServer private announce_service {addr port session control_port} {
457     $self announce "DCP\nserver\nservice $addr $port $session $control_port"
458 }
459 
460 
461 #------------------------------------------------------------------

462 # AnnouceListenManager/DegasServer::begin_service

463 #

464 # purpose : Start the service for a client

465 # input   : addr,port - the client I am going to service.

466 # output  : none

467 # note    : this should be private, but put as public to make debugging

468 #           simple.

469 #------------------------------------------------------------------

470 
471 AnnounceListenManager/DegasServer public begin_service {addr port} {
472     $self instvar programs_
473     $self instvar output_session_
474     $self instvar in_service_
475 
476     if {[info exists in_service_($addr,$port)]} {
477         $self log "Duplicated service requested for $addr/$port"
478         return
479     }
480     $self log "Starting service for $addr/$port"
481     set in_service_($addr,$port) 1
482 
483     set output_session_($addr,$port) [$self generate_new_session_spec]
484     set ctrl_port [$self generate_unique_port]
485     puts "Output session : $output_session_($addr,$port)"
486     set agent [new DegasAgent \
487         [$self get_program $addr $port] $output_session_($addr,$port)]
488     set ctrl_tcp_ [new TCP/Server/DegasControl $agent]
489     $ctrl_tcp_ open $ctrl_port
490     $self announce_service $addr $port $output_session_($addr,$port) $ctrl_port
491 }
492 
493 
494 
495 #------------------------------------------------------------------

496 # AnnouceListenManager/DegasServer::run

497 #

498 # purpose : Start the annouce listen manager.  This will cause the

499 #           alive message to be multicast periodically.

500 # input   : none

501 # output  : none

502 #------------------------------------------------------------------

503 
504 AnnounceListenManager/DegasServer public run {} {
505     $self instvar send_alive_id_ 
506     $self instvar alive_interval_
507 
508     $self announce_alive
509     set send_alive_id_ [after $alive_interval_ "$self run"]
510 }
511 
512 
513 #------------------------------------------------------------------

514 # AnnounceListenManager/DegasServer::log

515 #

516 # purpose : create a log message of the server.  This can be either

517 #           to the screen or to a file.

518 # input   : msg - a message to log

519 # output  : none

520 #------------------------------------------------------------------

521 
522 AnnounceListenManager/DegasServer private log {msg} {
523     puts "$msg"
524 }
525 
526 
527 
528 #------------------------------------------------------------------

529 # AnnounceListenManager/DegasServer::get_program

530 #

531 # purpose : This is a util to convert the program submitted from a 

532 #           client to a list, so that we can pass it to DegasAgent.

533 # input   : addr,port - the client which submitted the program

534 #           we are getting.

535 # output  : none

536 #------------------------------------------------------------------

537 
538 AnnounceListenManager/DegasServer private get_program {addr port} {
539     $self instvar programs_
540     return [array get programs_ $addr,$port,*]
541 }
542 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.