1 /*
2 * mb-play.cc --
3 *
4 * MediaBoard Archive Playback
5 *
6 * Copyright (c) 1997-2002 The Regents of the University of California.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *
12 * A. Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 * B. Redistributions in binary form must reproduce the above copyright notice,
15 * this list of conditions and the following disclaimer in the documentation
16 * and/or other materials provided with the distribution.
17 * C. Neither the names of the copyright holders nor the names of its
18 * contributors may be used to endorse or promote products derived from this
19 * software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS
22 * IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
23 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
25 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 * POSSIBILITY OF SUCH DAMAGE.
32 *
33 * @(#) $Header: /usr/mash/src/repository/mash/mash-1/archive/mb-play.cc,v 1.23 2003/08/20 20:37:27 aswan Exp $
34 */
35
36
37 #include "archive/mb-play.h"
38 #include "mb/mb.h"
39 #include "mb/mb-nethost.h"
40 #include "mb/mb-cmd.h"
41 #include "tclcl-mappings.h"
42
43
44 DEFINE_OTCL_CLASS(MBPlaybackMgr, "MB_Manager/Play") {
45 INSTPROC(create_srm_source);
46 }
47
48
49 DEFINE_OTCL_CLASS(MBPlaybackStream, "ArchiveStream/Play/Mediaboard") {
50 }
51
52
53 int
54 MBPlaybackMgr::create_srm_source(int argc, const char * const *argv)
55 {
56 TclObject *streamObj;
57 const char *uid, *addr, *userName;
58
59 BEGIN_PARSE_ARGS(argc, argv);
60 ARG(streamObj); // TclObject *
61 ARG(uid); // char *
62 ARG(addr); // char *
63 ARG(userName); // char *
64 END_PARSE_ARGS;
65
66 if (sources_==NULL) {
67 sources_ = session()->source_manager()->local_sources();
68 }
69 // reset the indices
70 nextADUIdx_ = fillSAIdx_ = NULL;
71
72
73 MBPlaybackStream *stream = (MBPlaybackStream *) streamObj;
74 Tcl &tcl = Tcl::instance();
75 SRM_Source *source;
76 MBPlaybackRcvr *rcvr;
77
78 // create the SRM source for this stream
79 if (session()->Invoke("create-local", uid, addr, userName,
80 NULL)==TCL_ERROR) {
81 tcl.resultf("error creating SRM source for new stream: %s",
82 tcl.result());
83 goto error;
84 }
85
86 // attach the stream and SRM rcvr objects together;
87 source = (SRM_Source*)tcl.lookup(tcl.result());
88 if (source!=NULL && (rcvr = (MBPlaybackRcvr*)source->handler())!=NULL){
89 stream->attach(rcvr);
90 rcvr->attach(stream);
91 }
92 else {
93 tcl.resultf("invalid SRM source or packet handler object");
94 goto error;
95 }
96
97 return TCL_OK;
98
99 error:
100 tcl.add_errorf("\ninvoked from within MBPlaybackMgr::"
101 "NewStream()");
102 return TCL_ERROR;
103 }
104
105
106 int
107 MBPlaybackMgr::next_ADU(u_char *pb, int len, srm_src &id, int &pkt_type,
108 int &next)
109 {
110 next=0;
111 pkt_type = APP_DATA;
112 // TODO: should poll all receivers or preq as well
113
114 if (sources_==NULL) {
115 MTrace(trcArchive, ("Haven't created any sources"));
116 return 0;
117 }
118
119 if (nextADUIdx_==NULL) {
120 nextADUIdx_ = sources_->getFirst();
121 }
122 else {
123 nextADUIdx_ = sources_->getNext(nextADUIdx_);
124 if (nextADUIdx_==NULL) nextADUIdx_ = sources_->getFirst();
125 }
126
127 int outlen=0;
128 MBPlaybackRcvr *rcvr;
129 while ((outlen==0 && next==0)
130 && sources_->IsDone(nextADUIdx_)==FALSE) {
131 rcvr = (MBPlaybackRcvr*) (sources_->getData(nextADUIdx_)->
132 handler());
133 outlen = rcvr->NextADU(pb, len, next, NULL);
134 if (outlen==0 && next==0) {
135 nextADUIdx_ = sources_->getNext(nextADUIdx_);
136 } else {
137 // set the id of this guy
138 id = rcvr->getSrcId();
139 }
140 }
141 return outlen;
142 }
143
144
145
146 int
147 MBPlaybackMgr::periodic_update(Byte *pb)
148 {
149 if (sources_==NULL) {
150 MTrace(trcArchive, ("Haven't created any sources"));
151 return 0;
152 }
153
154 if (fillSAIdx_==NULL) {
155 fillSAIdx_ = sources_->getFirst();
156 }
157 else {
158 fillSAIdx_ = sources_->getNext(fillSAIdx_);
159 if (fillSAIdx_==NULL) fillSAIdx_ = sources_->getFirst();
160 }
161
162 int outlen=0;
163 MBPlaybackRcvr *rcvr;
164 while (outlen==0 && sources_->IsDone(fillSAIdx_)==FALSE) {
165 rcvr = (MBPlaybackRcvr*) (sources_->getData(fillSAIdx_)->
166 handler());
167 outlen = rcvr->FillSA(pb, getMTU());
168 if (outlen==0) {
169 fillSAIdx_ = sources_->getNext(fillSAIdx_);
170 }
171 }
172
173 if (outlen==0) {
174 // I don't have anything to send; I should reset the SA timer
175 // FIXME: ignoring the return code from this method
176 session()->sa_timer()->Invoke("reset", NULL);
177 }
178 return outlen;
179 }
180
181
182 int
183 MBPlaybackRcvr::handleCmd(MBCmd * /*pCmd*/, MBPageObject * /*pPage*/,
184 const MBTime& /*oldTime*/, const MBTime& /*newTime*/)
185 {
186 // ignore any packets we get from the network
187 return TRUE;
188 }
189
190
191 Bool
192 MBPlaybackRcvr::Dispatch(MBCmd *pCmd, MBPageObject *pPage)
193 {
194 char foo1[100], foo2[100];
195 strcpy(foo1, (char*)pPage->getId().sid);
196 strcpy(foo2, (char*)getSrcId());
197 MTrace(trcArchive|trcExcessive, ("##^^ Page %s:%d (%p) for receiver "
198 "(%s, %p) has maxSeqno %d "
199 "before Dispatch",
200 foo1, pPage->getId().uid, pPage,
201 foo2, this,
202 pPage->getMaxSeqno()));
203 /* Note: right now handlecmd just calls a dummy executecmd
204 * that does nothing, so the time paramaters does not matter,
205 * we should change it to some sensible value if that is not
206 * the case */
207 if (MBBaseRcvr::handleCmd(pCmd, pPage, cMBTimeAny, cMBTimeAny)
208 != MB_EXE_OK)
209 return FALSE;
210 MTrace(trcArchive|trcExcessive, ("##^^ Page %s:%d (%p) for receiver "
211 "(%s, %p) has maxSeqno %d "
212 "before Dispatch for command %d",
213 foo1, pPage->getId().uid, pPage,
214 foo2, this, pPage->getMaxSeqno(),
215 pCmd->getSeqno()));
216 int dataSize = pPage->RequestSendAmount()+sizeof(Pkt_DataHdr);
217 getMgr()->RequestSend(dataSize);
218
219 // notify all observers
220 bytesSent_ += dataSize;
221 stream_->Invokef("notify_observers bytes_sent %lu",
222 (unsigned long) bytesSent_);
223 return TRUE;
224 }
225
226
227
228 void
229 MBPlaybackStream::Clip(timeval /*start*/, timeval end)
230 {
231 endTS_ = logical2mb(end.tv_sec, end.tv_usec);
232 }
233
234
235 void
236 MBPlaybackStream::LTS_Speed()
237 {
238 PlaybackStream::LTS_Speed();
239 }
240
241
242 void
243 MBPlaybackStream::LTS_Reference()
244 {
245 PlaybackStream::LTS_Reference();
246 }
247
248
249 int
250 MBPlaybackStream::ReadRecord()
251 {
252 u_int16_t len;
253 DataFile *file;
254 PageId pageId;
255 u_int32_t sn;
256 Tcl &tcl = Tcl::instance();
257
258 file = DataFile_();
259 if (firstTime_==TRUE) {
260 if (file->Seek(file->getHeaderSize(), SEEK_SET)==TCL_ERROR)
261 goto error;
262 firstTime_ = FALSE;
263 }
264
265 state.bufferLen = sizeof(Pkt_PageHdr);
266 if (state.buffer.alloc(state.bufferLen)==FALSE) {
267 tcl.resultf("buffer allocation error");
268 goto error;
269 }
270
271 if (file->Read(state.buffer.pb, sizeof(Pkt_PageHdr)) <= 0) {
272 tcl.resultf("cannot read data file: %s",
273 strerror(Tcl_GetErrno()));
274 goto error;
275 }
276
277 Pkt_PageHdr *pHdr;
278 pHdr = (Pkt_PageHdr*) (state.buffer.pb);
279 net2host(pHdr->pd_page, pageId);
280
281 // modify the srcId field in the PageId so that it doesn't interfere
282 // with a normal receiver
283 pHdr->pd_page.sid = rcvr_->getSrcId();
284
285 state.sseq = net2host(pHdr->pd_sseq);
286 state.eseq = net2host(pHdr->pd_eseq);
287 MTrace(trcArchive, ("Reading record with seqnos %d to %d (%d)",
288 state.sseq, state.eseq, MYNUM(this)));
289
290 for (sn=state.sseq; sn <= state.eseq; sn++) {
291 if (state.buffer.alloc(state.bufferLen +
292 sizeof(Pkt_CmdHdr))==FALSE) {
293 tcl.resultf("cannot allocate memory");
294 goto error;
295 }
296 if (file->Read(state.buffer.pb + state.bufferLen,
297 sizeof(Pkt_CmdHdr)) < 0) {
298 tcl.resultf("cannot read data file: %s",
299 strerror(Tcl_GetErrno()));
300 goto error;
301 }
302
303 len = net2host(((Pkt_CmdHdr*)(state.buffer.pb +
304 state.bufferLen))->dh_len);
305 if (state.buffer.alloc(state.bufferLen + len)==FALSE) {
306 tcl.resultf("cannot allocate memory");
307 goto error;
308 }
309 if (file->Read((state.buffer.pb + state.bufferLen +
310 sizeof(Pkt_CmdHdr)),
311 len - sizeof(Pkt_CmdHdr)) < 0) {
312 tcl.resultf("cannot read data file: %s",
313 strerror(Tcl_GetErrno()));
314 goto error;
315 }
316 state.bufferLen += len;
317 }
318
319 state.pPage = rcvr_->DefinePage(pageId);
320 state.seqno = state.sseq;
321 state.pb = state.buffer.pb + sizeof(Pkt_PageHdr);
322 MTrace(trcArchive, ("Successfully read the data (%d)", MYNUM(this)));
323 return TCL_OK;
324
325 error:
326 tcl.add_errorf("\ninvoked from within MBPlaybackStream::ReadRecord()");
327 return TCL_ERROR;
328 }
329
330
331 int
332 MBPlaybackStream::NextEvent(timeval &logical)
333 {
334 timeval now;
335 /*, diff_tv;
336 u_int32_t nowTS;*/
337 Pkt_CmdHdr *pHdr;
338 u_int32_t timestamp;
339
340 if (state.seqno > state.eseq) {
341 // we are done with the previous record; fetch the next one!
342 if (DataFile_()->Eof()==TRUE) {
343 logical.tv_sec = logical.tv_usec = 0;
344 return TCL_OK;
345 }
346
347 if (ReadRecord()==TCL_ERROR) {
348 if (DataFile_()->Eof()==TRUE) {
349 logical.tv_sec = logical.tv_usec = 0;
350 return TCL_OK;
351 }
352 else goto error;
353 }
354 }
355
356 pHdr = (Pkt_CmdHdr*) state.pb;
357 timestamp = net2host(pHdr->dh_ts);
358 if (endTS_ > 0 && timestamp > endTS_) {
359 // we are done
360 MTrace(trcArchive, ("Exceeded ending ts (%lu): %lu (%d)",
361 (unsigned long) endTS_,
362 (unsigned long) timestamp, MYNUM(this)));
363 logical.tv_sec = logical.tv_usec = 0;
364 return TCL_OK;
365 }
366
367 now = LTS_()->NowLogical();
368 logical = mb2logical(timestamp, now);
369 return TCL_OK;
370
371 error:
372 Tcl::instance().add_errorf("\ninvoked from within "
373 "MBPlaybackStream::NextEvent()");
374 return TCL_ERROR;
375 }
376
377
378 void
379 MBPlaybackStream::DoEvent()
380 {
381 MBCmd* pCmd=MBCmd::Create(state.seqno, state.pb);
382 state.seqno++;
383 state.pb += net2host(((Pkt_CmdHdr*)state.pb)->dh_len);
384
385 MTrace(trcArchive, ("Doing an event for page %ld, cmd %u, type %u "
386 "(%d)", state.pPage->getId().uid, state.seqno-1,
387 pCmd->getType(), MYNUM(this)));
388
389 if (!pCmd) {
390 // should treat as an error packet and ask for resend?
391 SignalError(("Received corrupted packet: corrupted command"));
392 return;
393 }
394
395 // modify the packet of this command to the current system time!
396 timeval now = LTS_()->NowSystem();
397 pCmd->SetTimeStamp(tod2mb(now));
398
399 if (!rcvr_->Dispatch(pCmd, state.pPage)) {
400 SignalError(("Received corrupted or invalid command"));
401 return;
402 }
403 }
404
This page was automatically generated by the
LXR engine.
Visit the LXR main site for more
information.