1 Class DegasAgent
2 import RTP/Video
3 import AddressBlock
4 import DegasSession
5 import Session/RTP/RTPGW
6 import MediaAgent/RTPGW
7
8 #------------------------------------------------------------------
9 # Class DegasAgent
10 #
11 # purpose : This is the class that encapsulate the underlying
12 # network session and provide an interface for querying
13 # underlying sessions, plus a set of callbacks for events
14 # such as joining and leaving sessions. This is analogous
15 # to RTPGWAgent (for MeGa)
16 #
17 # members
18 #
19 # dali_interp_
20 # A C++ DaliInterp object that handles the execution of Dali program.
21 #
22 # out_session_
23 # A Session/RTP/RTPGW/Video object that encapsulates the output
24 # multicast session.
25 #
26 # encoder_
27 # The encoder for the output video stream.
28 #
29 # decoders_(src)
30 # The decoder for the incoming video stream from $src. Note that
31 # since different sources can have different formats, we may have
32 # more than one decoders. I need to find out how to share decoders
33 # between sources, if it is possible.
34 #
35 # encoder_buffer_
36 # The buffers for encoder. This get created with the encoder, and
37 # is not used. We need this as a member so that we can free it
38 # later.
39 #
40 # decoder_buffers_(src)
41 # The buffers for decoding. This get created for each decoders.
42 #
43 # max_num_of_sources_
44 # Maximum number of sources we are suppose to serve.
45 #
46 # source_masks_
47 # The list of masks, a source will be serve iff it's name or ipaddr match
48 # one of the mask. The syntax follows tcl "string match" syntax.
49 #
50 # num_of_sources_
51 # Number of sources currently being served.
52 #
53 # sources_
54 # The list of all active sources.
55 #
56 # rtp_
57 # RTP object that helps with various RTP functions, such as mapping
58 # source type to decoder.
59 #------------------------------------------------------------------
60
61 DegasAgent public init {aglp service program_spec out_spec} {
62
63 # Creates input and output session
64
65 $self instvar out_session_ in_session_ aglp_ service_
66
67 set aglp_ $aglp
68 set service_ $service
69 # XXX : Need to fix this.
70
71 set in_spec [$self get_program_value $program_spec "input_session"]
72
73 set out_session_ [new DegasSession/Video $self $out_spec]
74 set out_srcid [$out_session_ random-srcid [localaddr]]
75 puts "Out session created : $out_session_ $out_spec"
76 set out_src_mgr [new MediaAgent/RTPGW $self]
77 $out_session_ sm $out_src_mgr
78 # [$out_session_ set netmgr_] set-subscription-level -1
79 $out_src_mgr create-local $out_srcid [localaddr]
80
81 $self instvar local_src_
82 set local_src_ [$out_src_mgr local]
83
84 global bps_
85 set client_addr [$service get_client_addr]
86 rate_variable bps_($client_addr) 0.1
87 $self instvar start_nb_
88 set start_nb_($client_addr) 0
89 set bps_($client_addr) 0
90 $self instvar report_rate_timer_
91 set report_rate_timer_($local_src_) [new MashTimer "periodic" 5000 \
92 "$self report_rate $local_src_ $client_addr"]
93
94 set in_session_ [new DegasSession/Video $self $in_spec]
95 set in_srcid [$in_session_ random-srcid [localaddr]]
96 puts "In session created : $in_session_ $in_spec"
97 set in_src_mgr [new MediaAgent/RTPGW $self]
98 $in_session_ sm $in_src_mgr
99 # [$in_session_ set netmgr_] set-subscription-level 0
100 $in_src_mgr create-local $in_srcid [localaddr]
101
102 #set out_session_ [new Session/RTP/RTPGW/Video $self]
103 #set out_buffer_ [new BufferPool]
104 #$out_session_ buffer-pool $out_buffer_
105 #$out_session_ loopback-layer -1
106 #set address_block [new AddressBlock $out_spec/1]
107 #$out_session_ reset $address_block
108 #delete $address_block
109
110 #set in_session_ [new Session/RTP/RTPGW/Video $self]
111 #set in_buffer_ [new BufferPool]
112 #$in_session_ buffer-pool $in_buffer_
113 #$in_session_ loopback-layer -1
114 #set address_block [new AddressBlock $in_spec/1]
115 #$in_session_ reset $address_block
116 #delete $address_block
117
118
119 # Setup the pipline dali_interp->encoder->session
120
121 $self instvar dali_interp_
122 $self instvar encoder_
123 $self instvar encoder_buffer_
124
125 set dali_interp_ [new DaliInterp]
126
127 set res_mgr [DegasResourceManager info instances]
128 $dali_interp_ set_resource_manager $res_mgr
129
130 $self log_cpu_usage $res_mgr
131
132 $self init_interp $program_spec
133 set outfmt [$self get_program_value $program_spec "output_format"]
134 set encoder_ [new Module/VideoEncoder/UncompressedTo${outfmt}]
135 if {$encoder_ == ""} {
136 puts stderr "unable to create encoder for format $outfmt"
137 puts stderr "creating default encoder for H261"
138 set encoder_ [new Module/VideoEncoder/UncompressedToH261]
139 } else {
140 puts stderr "encoder for $outfmt created"
141 }
142 $dali_interp_ target $encoder_
143 $encoder_ target [$out_session_ get dih]
144 set encoder_buffer_ [new BufferPool/RTP]
145 $encoder_ buffer-pool $encoder_buffer_
146
147 $encoder_buffer_ srcid $out_srcid
148
149 $self instvar max_num_of_sources_
150 $self instvar source_masks_
151 $self instvar num_of_sources_
152 $self instvar sources_
153 $self instvar rtp_
154
155 set max_num_of_sources_ [$self get_program_value $program_spec \
156 "max_num_of_sources"]
157 set source_masks_ [$self get_program_value $program_spec \
158 "sources"]
159 set num_of_sources_ 0
160 set sources_ {}
161 set rtp_ [new RTP/Video]
162
163 puts "max number of sources $max_num_of_sources_"
164 }
165
166
167 #------------------------------------------------------------------
168 # DegasAgent::destroy {}
169 #
170 # purpose : This is the destructor for DegasAgent. Agent is deleted
171 # when there are no more client listening.
172 # input : none
173 # output : none
174 #------------------------------------------------------------------
175
176 DegasAgent public destroy {} {
177
178 $self instvar sources_
179 foreach src $sources_ {
180 $self deactivate $src
181 }
182
183 $self instvar rtp_
184 delete $rtp_
185
186 $self instvar out_session_
187 delete [$out_session_ sm]
188 delete $out_session_
189
190 #$self instvar out_buffer_
191 #delete $out_buffer_
192
193 $self instvar in_session_
194 delete [$in_session_ sm]
195 delete $in_session_
196
197 #$self instvar in_buffer_
198 #delete $in_buffer_
199
200 $self instvar dali_interp_
201 delete $dali_interp_
202
203 $self instvar encoder_
204 delete $encoder_
205
206 $self instvar encoder_buffer_
207 delete $encoder_buffer_
208
209 $self instvar report_rate_timer_ local_src_ service_
210 global bps_
211 unset bps_([$service_ get_client_addr])
212 delete $report_rate_timer_($local_src_)
213 }
214
215 #------------------------------------------------------------------
216 # DegasAgent::activate {src}
217 #
218 # purpose : This callback get called everytime a new source joins
219 # the session.
220 # input : src - Source object that just joined the session.
221 # output : none
222 #------------------------------------------------------------------
223
224 DegasAgent public activate {src} {
225 $self instvar decoders_ decoder_buffers_ sources_
226 $self instvar num_of_sources_ dali_interp_ rtp_ service_
227
228 $service_ add_source [$src addr]
229
230 if {![$self accept_source $src]} {
231 $self log "does not accept source $src"
232 $src data-handler [new Module/VideoDecoder/Null]
233 return
234 }
235
236 set ifmt [$rtp_ rtp_type [$src format]]
237 set decoders_($src) [$self create_decoder $ifmt]
238 if {$decoders_($src) == ""} {
239 return
240 }
241 set decoder_buffers_($src) [new VidRep/Uncompressed]
242 $decoders_($src) set_frame_buffer $decoder_buffers_($src)
243 $decoder_buffers_($src) set_source_id $decoders_($src)
244
245 lappend $sources_ $src
246 incr num_of_sources_
247
248 $decoders_($src) set src_ $src
249 $src data-handler $decoders_($src)
250 $decoders_($src) target $dali_interp_
251 $dali_interp_ add_source $decoders_($src)
252 }
253
254
255 DegasAgent public report_rate {src addr} {
256 global bps_
257 $self instvar aglp_
258
259 set bps_($addr) [$src layer-stat nb_]
260 #puts "REPORT RATE $addr $bps_($addr)"
261 $aglp_ set_bandwidth $addr $bps_($addr)
262 }
263
264
265 #------------------------------------------------------------------
266 # DegasAgent::deactivate {src}
267 #
268 # purpose : This is called when a src leaves the session.
269 # input : src - the Source object that leaves the session.
270 # output : none.
271 #------------------------------------------------------------------
272
273 DegasAgent public deactivate {src} {
274 $self instvar dali_interp_
275 $self instvar sources_
276 $self instvar decoders_
277 $self instvar service_
278
279 puts "Deactivating source $src [$src addr]"
280 $service_ del_source [$src addr]
281
282 # Remove the source
283 set src_pos [lsearch -exact $sources_ $src]
284 if {$src_pos == -1} {
285 puts stderr "trying to delete a non-existance source $src"
286 return
287 }
288 set sources_ [lreplace $sources_ $src_pos $src_pos]
289
290 $dali_interp_ delete_source $decoders_($src)
291 unset decoders_($src)
292 }
293
294
295 #------------------------------------------------------------------
296 # DegasAgent::init_interp
297 #
298 # purpose : initialize the interpreter using the program_spec.
299 # input : program_spec : this is a list of strings, representing
300 # the program to execute.
301 # output : none
302 #------------------------------------------------------------------
303
304 DegasAgent public init_interp {program_spec} {
305 $self instvar dali_interp_
306 foreach {key value} $program_spec {
307 switch -glob $key {
308 init_callback {
309 $dali_interp_ set_callback init $value
310 }
311
312 new_source_callback {
313 $dali_interp_ set_callback new_source $value
314 }
315
316 del_source_callback {
317 $dali_interp_ set_callback del_source $value
318 }
319
320 mouse_click_callback {
321 $dali_interp_ set_callback mouse_click $value
322 }
323
324 recv_frame_callback {
325 $dali_interp_ set_callback recv_frame $value
326 }
327
328 destroy_callback {
329 $dali_interp_ set_callback destroy $value
330 }
331
332 input_resize_callback {
333 $dali_interp_ set_callback input_resize $value
334 }
335
336 output_resize_callback {
337 $dali_interp_ set_callback output_resize $value
338 }
339
340 output_grayed_callback {
341 $dali_interp_ set_callback output_grayed $value
342 }
343
344 input_frame {
345 $dali_interp_ set_input_frame [lindex $value 0]
346 $dali_interp_ set_input_width [lindex $value 1]
347 $dali_interp_ set_input_height [lindex $value 2]
348 }
349
350 output_format {
351 $dali_interp_ set_output_format $value
352 }
353
354 output_fps {
355 $dali_interp_ set_output_fps $value
356 }
357
358 output_size {
359 $dali_interp_ set_output_size $value
360 }
361
362 output_frame {
363 $dali_interp_ set_output_frame [lindex $value 0]
364 $dali_interp_ set_output_width [lindex $value 1]
365 $dali_interp_ set_output_height [lindex $value 2]
366 }
367
368 packages {
369 foreach package $value {
370 $dali_interp_ load_package $package
371 }
372 }
373
374 description {
375 $dali_interp_ set_description $value
376 }
377
378 source_id {
379 $dali_interp_ set_source_id $value
380 }
381
382 mouse_coord {
383 $dali_interp_ set_mouse_x_coord [lindex $value 0]
384 $dali_interp_ set_mouse_y_coord [lindex $value 1]
385 }
386
387 resize_output_after {
388 after [lindex $value 0] "$dali_interp_ resize_output_frame"
389 }
390 }
391 }
392 }
393
394
395 #------------------------------------------------------------------
396 # DegasAgent::set_maxchannel
397 # DegasAgent::unregister
398 #
399 # purpose : This is defined because Session/RTP/RTPGW calls them.
400 # I am not sure if we need these callbacks.
401 # input : don't care
402 # output : none
403 #------------------------------------------------------------------
404
405 DegasAgent public set_maxchannel {n} {
406
407 }
408
409 DegasAgent public unregister {src} {
410
411 }
412
413 DegasAgent public check_for_sources {} {
414
415 }
416
417 #------------------------------------------------------------------
418 # DegasAgent::register
419 #
420 # purpose : a new source is detected. creates a new control handler
421 # for this source.
422 # input : src
423 # output : none
424 #------------------------------------------------------------------
425
426 DegasAgent public register {src} {
427
428 $self instvar out_session_
429
430 set ctrl_handler [new Replicator/Packet/Copy]
431 $src ctrl-handler $ctrl_handler
432 $ctrl_handler add-target [$out_session_ get cih]
433 }
434
435
436
437 #------------------------------------------------------------------
438 # DegasAgent::get_program_value
439 #
440 # purpose : Given the program specification as a list, find the list
441 # element that corresponds to the key and return the value.
442 # input : program_spec - the program as a list.
443 # key - a string corresponds to the key we are interested
444 # in. (e.g. "sources", "recv_frame_callback" ..)
445 # output : the value corresponds to the key
446 #------------------------------------------------------------------
447
448 DegasAgent private get_program_value { program_spec key } {
449 set index [lsearch -glob $program_spec "$key"]
450 if {$index == -1} {
451 set value ""
452 } else {
453 set value [lindex $program_spec [expr $index + 1]]
454 }
455 return $value
456 }
457
458
459 #------------------------------------------------------------------
460 # DegasAgent::create_decoder
461 #
462 # purpose : Creates a new decoder for the specified format
463 # input : format - the format for which we needs to creates decoder
464 # This is a string defined in tcl/net/rtp.tcl. Only H261
465 # and JPEG is supported right now.
466 # output : the decoder
467 #------------------------------------------------------------------
468
469 DegasAgent private create_decoder { format } {
470 switch $format {
471 jpeg {
472 return [new Module/VideoDecoder/JPEGToUncompressed]
473 }
474 h261 {
475 return [new Module/VideoDecoder/H261ToUncompressed]
476 }
477 default {
478 puts stderr "unknown input format $format. cannot create decoder."
479 return ""
480 }
481 }
482 }
483
484
485 #------------------------------------------------------------------
486 # DegasAgent::accept_source {src}
487 #
488 # purpose : Check if we should accept this source, there are three
489 # conditions we should check :
490 # - duplicate source ?
491 # - have we exceed the maximum number of source ?
492 # - does this src match the addresses given in mask ?
493 # input : src - the source to check.
494 # output : 1 if source is accepted, 0 otherwise.
495 #------------------------------------------------------------------
496
497 DegasAgent private accept_source {src} {
498 $self instvar num_of_sources_
499 $self instvar max_num_of_sources_
500 $self instvar sources_
501 $self instvar source_masks_
502
503 if {$max_num_of_sources_ != "*" &&
504 [regexp {[0-9]+} $max_num_of_sources_]} {
505 if {$num_of_sources_ >= $max_num_of_sources_} {
506 return 0
507 }
508 }
509
510 if {[lsearch -exact $sources_ $src] != -1} {
511 # duplicate source
512 return 0
513 }
514
515 set src_ip_addr [$src addr]
516 foreach mask $source_masks_ {
517 # check ip address
518 if {[string match $mask $src_ip_addr]} {
519 return 1
520 }
521 # XXX : Need to figure out the hostname.
522 }
523 return 0
524 }
525
526
527 #------------------------------------------------------------------
528 # DegasAgent::mouse_click_callback
529 #
530 # purpose : process mouse click at the client side.
531 # input : x, y - mouse coordinates (0 .. 1)
532 # output : none
533 #------------------------------------------------------------------
534
535 DegasAgent public mouse_click_callback {x y} {
536 $self instvar dali_interp_
537 $dali_interp_ mouse_click_callback $x $y
538 }
539
540 DegasAgent instproc print_mem_usage { res_mgr name } {
541 puts "Memory used by $name is [$res_mgr get_mem_used $name] bytes"
542 after 5000 "$self print_mem_usage $res_mgr $name"
543 }
544
545 DegasAgent instproc log_cpu_usage { res_mgr } {
546 $res_mgr log_cpu_usage
547 after 1000 "$self log_cpu_usage $res_mgr"
548 }
549
550 DegasAgent instproc log { msg } {
551 puts stderr "DegasAgent $self : $msg"
552 }
553
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more
information.