~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

Open Mash Cross Reference
mash/archive/mb-play.cc

Component: ~ [ mash ] ~ [ apps ] ~ [ gsm ] ~ [ lib ] ~ [ otcl ] ~ [ srm ] ~ [ tcl8.3 ] ~ [ tclcl ] ~ [ tk8.3 ] ~ [ tutorials ] ~

  1 /*
  2  * mb-play.cc --
  3  *
  4  *      MediaBoard Archive Playback
  5  *
  6  * Copyright (c) 1997-2002 The Regents of the University of California.
  7  * All rights reserved.
  8  *
  9  * Redistribution and use in source and binary forms, with or without
 10  * modification, are permitted provided that the following conditions are met:
 11  *
 12  * A. Redistributions of source code must retain the above copyright notice,
 13  *    this list of conditions and the following disclaimer.
 14  * B. Redistributions in binary form must reproduce the above copyright notice,
 15  *    this list of conditions and the following disclaimer in the documentation
 16  *    and/or other materials provided with the distribution.
 17  * C. Neither the names of the copyright holders nor the names of its
 18  *    contributors may be used to endorse or promote products derived from this
 19  *    software without specific prior written permission.
 20  *
 21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS
 22  * IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 23  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 24  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE
 25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 31  * POSSIBILITY OF SUCH DAMAGE.
 32  *
 33  * @(#) $Header: /usr/mash/src/repository/mash/mash-1/archive/mb-play.cc,v 1.23 2003/08/20 20:37:27 aswan Exp $
 34  */
 35 
 36 
 37 #include "archive/mb-play.h"
 38 #include "mb/mb.h"
 39 #include "mb/mb-nethost.h"
 40 #include "mb/mb-cmd.h"
 41 #include "tclcl-mappings.h"
 42 
 43 
 44 DEFINE_OTCL_CLASS(MBPlaybackMgr, "MB_Manager/Play") {
 45         INSTPROC(create_srm_source);
 46 }
 47 
 48 
 49 DEFINE_OTCL_CLASS(MBPlaybackStream, "ArchiveStream/Play/Mediaboard") {
 50 }
 51 
 52 
 53 int
 54 MBPlaybackMgr::create_srm_source(int argc, const char * const *argv)
 55 {
 56         TclObject *streamObj;
 57         const char *uid, *addr, *userName;
 58 
 59         BEGIN_PARSE_ARGS(argc, argv);
 60         ARG(streamObj); // TclObject *
 61         ARG(uid);       // char *
 62         ARG(addr);      // char *
 63         ARG(userName);  // char *
 64         END_PARSE_ARGS;
 65 
 66         if (sources_==NULL) {
 67                 sources_ = session()->source_manager()->local_sources();
 68         }
 69         // reset the indices
 70         nextADUIdx_ = fillSAIdx_ = NULL;
 71 
 72 
 73         MBPlaybackStream *stream = (MBPlaybackStream *) streamObj;
 74         Tcl &tcl = Tcl::instance();
 75         SRM_Source *source;
 76         MBPlaybackRcvr *rcvr;
 77 
 78         // create the SRM source for this stream
 79         if (session()->Invoke("create-local", uid, addr, userName,
 80                                      NULL)==TCL_ERROR) {
 81                 tcl.resultf("error creating SRM source for new stream: %s",
 82                             tcl.result());
 83                 goto error;
 84         }
 85 
 86         // attach the stream and SRM rcvr objects together;
 87         source = (SRM_Source*)tcl.lookup(tcl.result());
 88         if (source!=NULL && (rcvr = (MBPlaybackRcvr*)source->handler())!=NULL){
 89                 stream->attach(rcvr);
 90                 rcvr->attach(stream);
 91         }
 92         else {
 93                 tcl.resultf("invalid SRM source or packet handler object");
 94                 goto error;
 95         }
 96 
 97         return TCL_OK;
 98 
 99 error:
100         tcl.add_errorf("\ninvoked from within MBPlaybackMgr::"
101                        "NewStream()");
102         return TCL_ERROR;
103 }
104 
105 
106 int
107 MBPlaybackMgr::next_ADU(u_char *pb, int len, srm_src &id, int &pkt_type,
108                         int &next)
109 {
110         next=0;
111         pkt_type = APP_DATA;
112         // TODO: should poll all receivers or preq as well
113 
114         if (sources_==NULL) {
115                 MTrace(trcArchive, ("Haven't created any sources"));
116                 return 0;
117         }
118 
119         if (nextADUIdx_==NULL) {
120                 nextADUIdx_ = sources_->getFirst();
121         }
122         else {
123                 nextADUIdx_ = sources_->getNext(nextADUIdx_);
124                 if (nextADUIdx_==NULL) nextADUIdx_ = sources_->getFirst();
125         }
126 
127         int outlen=0;
128         MBPlaybackRcvr *rcvr;
129         while ((outlen==0 && next==0)
130                && sources_->IsDone(nextADUIdx_)==FALSE) {
131                 rcvr = (MBPlaybackRcvr*) (sources_->getData(nextADUIdx_)->
132                                           handler());
133                 outlen = rcvr->NextADU(pb, len, next, NULL);
134                 if (outlen==0 && next==0) {
135                         nextADUIdx_ = sources_->getNext(nextADUIdx_);
136                 } else {
137                         // set the id of this guy
138                         id = rcvr->getSrcId();
139                 }
140         }
141         return outlen;
142 }
143 
144 
145 
146 int
147 MBPlaybackMgr::periodic_update(Byte *pb)
148 {
149         if (sources_==NULL) {
150                 MTrace(trcArchive, ("Haven't created any sources"));
151                 return 0;
152         }
153 
154         if (fillSAIdx_==NULL) {
155                 fillSAIdx_ = sources_->getFirst();
156         }
157         else {
158                 fillSAIdx_ = sources_->getNext(fillSAIdx_);
159                 if (fillSAIdx_==NULL) fillSAIdx_ = sources_->getFirst();
160         }
161 
162         int outlen=0;
163         MBPlaybackRcvr *rcvr;
164         while (outlen==0 && sources_->IsDone(fillSAIdx_)==FALSE) {
165                 rcvr = (MBPlaybackRcvr*) (sources_->getData(fillSAIdx_)->
166                                           handler());
167                 outlen = rcvr->FillSA(pb, getMTU());
168                 if (outlen==0) {
169                         fillSAIdx_ = sources_->getNext(fillSAIdx_);
170                 }
171         }
172 
173         if (outlen==0) {
174                 // I don't have anything to send; I should reset the SA timer
175                 // FIXME: ignoring the return code from this method
176                 session()->sa_timer()->Invoke("reset", NULL);
177         }
178         return outlen;
179 }
180 
181 
182 int
183 MBPlaybackRcvr::handleCmd(MBCmd * /*pCmd*/, MBPageObject * /*pPage*/,
184                           const MBTime& /*oldTime*/, const MBTime& /*newTime*/)
185 {
186         // ignore any packets we get from the network
187         return TRUE;
188 }
189 
190 
191 Bool
192 MBPlaybackRcvr::Dispatch(MBCmd *pCmd, MBPageObject *pPage)
193 {
194         char foo1[100], foo2[100];
195         strcpy(foo1, (char*)pPage->getId().sid);
196         strcpy(foo2, (char*)getSrcId());
197         MTrace(trcArchive|trcExcessive, ("##^^ Page %s:%d (%p) for receiver "
198                                          "(%s, %p) has maxSeqno %d "
199                                          "before Dispatch",
200                                          foo1, pPage->getId().uid, pPage,
201                                          foo2, this,
202                                          pPage->getMaxSeqno()));
203         /* Note: right now handlecmd just calls a dummy executecmd
204          * that does nothing, so the time paramaters does not matter,
205          * we should change it to some sensible value if that is not
206          * the case */
207         if (MBBaseRcvr::handleCmd(pCmd, pPage, cMBTimeAny, cMBTimeAny)
208             != MB_EXE_OK)
209                 return FALSE;
210         MTrace(trcArchive|trcExcessive, ("##^^ Page %s:%d (%p) for receiver "
211                                          "(%s, %p) has maxSeqno %d "
212                                          "before Dispatch for command %d",
213                                          foo1, pPage->getId().uid, pPage,
214                                          foo2, this, pPage->getMaxSeqno(),
215                                          pCmd->getSeqno()));
216         int dataSize = pPage->RequestSendAmount()+sizeof(Pkt_DataHdr);
217         getMgr()->RequestSend(dataSize);
218 
219         // notify all observers
220         bytesSent_ += dataSize;
221         stream_->Invokef("notify_observers bytes_sent %lu",
222                          (unsigned long) bytesSent_);
223         return TRUE;
224 }
225 
226 
227 
228 void
229 MBPlaybackStream::Clip(timeval /*start*/, timeval end)
230 {
231         endTS_ = logical2mb(end.tv_sec, end.tv_usec);
232 }
233 
234 
235 void
236 MBPlaybackStream::LTS_Speed()
237 {
238         PlaybackStream::LTS_Speed();
239 }
240 
241 
242 void
243 MBPlaybackStream::LTS_Reference()
244 {
245         PlaybackStream::LTS_Reference();
246 }
247 
248 
249 int
250 MBPlaybackStream::ReadRecord()
251 {
252         u_int16_t len;
253         DataFile *file;
254         PageId pageId;
255         u_int32_t sn;
256         Tcl &tcl = Tcl::instance();
257 
258         file = DataFile_();
259         if (firstTime_==TRUE) {
260                 if (file->Seek(file->getHeaderSize(), SEEK_SET)==TCL_ERROR)
261                         goto error;
262                 firstTime_ = FALSE;
263         }
264 
265         state.bufferLen = sizeof(Pkt_PageHdr);
266         if (state.buffer.alloc(state.bufferLen)==FALSE) {
267                 tcl.resultf("buffer allocation error");
268                 goto error;
269         }
270 
271         if (file->Read(state.buffer.pb, sizeof(Pkt_PageHdr)) <= 0) {
272                 tcl.resultf("cannot read data file: %s",
273                             strerror(Tcl_GetErrno()));
274                 goto error;
275         }
276 
277         Pkt_PageHdr *pHdr;
278         pHdr = (Pkt_PageHdr*) (state.buffer.pb);
279         net2host(pHdr->pd_page, pageId);
280 
281         // modify the srcId field in the PageId so that it doesn't interfere
282         // with a normal receiver
283         pHdr->pd_page.sid = rcvr_->getSrcId();
284 
285         state.sseq   = net2host(pHdr->pd_sseq);
286         state.eseq   = net2host(pHdr->pd_eseq);
287         MTrace(trcArchive, ("Reading record with seqnos %d to %d (%d)",
288                 state.sseq, state.eseq, MYNUM(this)));
289 
290         for (sn=state.sseq; sn <= state.eseq; sn++) {
291                 if (state.buffer.alloc(state.bufferLen +
292                                        sizeof(Pkt_CmdHdr))==FALSE) {
293                         tcl.resultf("cannot allocate memory");
294                         goto error;
295                 }
296                 if (file->Read(state.buffer.pb + state.bufferLen,
297                                sizeof(Pkt_CmdHdr)) < 0) {
298                         tcl.resultf("cannot read data file: %s",
299                             strerror(Tcl_GetErrno()));
300                         goto error;
301                 }
302 
303                 len = net2host(((Pkt_CmdHdr*)(state.buffer.pb +
304                                               state.bufferLen))->dh_len);
305                 if (state.buffer.alloc(state.bufferLen + len)==FALSE) {
306                         tcl.resultf("cannot allocate memory");
307                         goto error;
308                 }
309                 if (file->Read((state.buffer.pb + state.bufferLen +
310                                 sizeof(Pkt_CmdHdr)),
311                                len - sizeof(Pkt_CmdHdr)) < 0) {
312                         tcl.resultf("cannot read data file: %s",
313                             strerror(Tcl_GetErrno()));
314                         goto error;
315                 }
316                 state.bufferLen += len;
317         }
318 
319         state.pPage = rcvr_->DefinePage(pageId);
320         state.seqno = state.sseq;
321         state.pb    = state.buffer.pb + sizeof(Pkt_PageHdr);
322         MTrace(trcArchive, ("Successfully read the data (%d)", MYNUM(this)));
323         return TCL_OK;
324 
325 error:
326         tcl.add_errorf("\ninvoked from within MBPlaybackStream::ReadRecord()");
327         return TCL_ERROR;
328 }
329 
330 
331 int
332 MBPlaybackStream::NextEvent(timeval &logical)
333 {
334         timeval now;
335         /*, diff_tv;
336         u_int32_t nowTS;*/
337         Pkt_CmdHdr *pHdr;
338         u_int32_t timestamp;
339 
340         if (state.seqno > state.eseq) {
341                 // we are done with the previous record; fetch the next one!
342                 if (DataFile_()->Eof()==TRUE) {
343                         logical.tv_sec = logical.tv_usec = 0;
344                         return TCL_OK;
345                 }
346 
347                 if (ReadRecord()==TCL_ERROR) {
348                         if (DataFile_()->Eof()==TRUE) {
349                                 logical.tv_sec = logical.tv_usec = 0;
350                                 return TCL_OK;
351                         }
352                         else goto error;
353                 }
354         }
355 
356         pHdr = (Pkt_CmdHdr*) state.pb;
357         timestamp = net2host(pHdr->dh_ts);
358         if (endTS_ > 0 && timestamp > endTS_) {
359                 // we are done
360                 MTrace(trcArchive, ("Exceeded ending ts (%lu): %lu (%d)",
361                                     (unsigned long) endTS_,
362                                     (unsigned long) timestamp, MYNUM(this)));
363                 logical.tv_sec = logical.tv_usec = 0;
364                 return TCL_OK;
365         }
366 
367         now = LTS_()->NowLogical();
368         logical = mb2logical(timestamp, now);
369         return TCL_OK;
370 
371 error:
372         Tcl::instance().add_errorf("\ninvoked from within "
373                                    "MBPlaybackStream::NextEvent()");
374         return TCL_ERROR;
375 }
376 
377 
378 void
379 MBPlaybackStream::DoEvent()
380 {
381         MBCmd* pCmd=MBCmd::Create(state.seqno, state.pb);
382         state.seqno++;
383         state.pb += net2host(((Pkt_CmdHdr*)state.pb)->dh_len);
384 
385         MTrace(trcArchive, ("Doing an event for page %ld, cmd %u, type %u "
386                             "(%d)", state.pPage->getId().uid, state.seqno-1,
387                             pCmd->getType(), MYNUM(this)));
388 
389         if (!pCmd) {
390                 // should treat as an error packet and ask for resend?
391                 SignalError(("Received corrupted packet: corrupted command"));
392                 return;
393         }
394 
395         // modify the packet of this command to the current system time!
396         timeval now = LTS_()->NowSystem();
397         pCmd->SetTimeStamp(tod2mb(now));
398 
399         if (!rcvr_->Dispatch(pCmd, state.pPage)) {
400                 SignalError(("Received corrupted or invalid command"));
401                 return;
402         }
403 }
404 

~ [ source navigation ] ~ [ diff markup ] ~ [ identifier search ] ~ [ freetext search ] ~ [ file search ] ~

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.