1 /*********************************************************************** 2 * AUTHOR: Marcus Overhagen 3 * FILE: TimeSource.cpp 4 * DESCR: 5 ***********************************************************************/ 6 #include <TimeSource.h> 7 #include <Autolock.h> 8 #include <string.h> 9 #include "debug.h" 10 #include "DataExchange.h" 11 #include "ServerInterface.h" 12 #include "TimeSourceObject.h" 13 14 #define DEBUG_TIMESOURCE 0 15 16 #if DEBUG_TIMESOURCE 17 #define TRACE_TIMESOURCE printf 18 #else 19 #define TRACE_TIMESOURCE if (1) {} else printf 20 #endif 21 22 namespace BPrivate { namespace media { 23 24 #define _atomic_read(p) atomic_or((p), 0) 25 26 #define TS_AREA_SIZE B_PAGE_SIZE // must be multiple of page size 27 #define TS_INDEX_COUNT 128 // must be power of two 28 29 struct TimeSourceTransmit // sizeof(TimeSourceTransmit) must be <= TS_AREA_SIZE 30 { 31 int32 readindex; 32 int32 writeindex; 33 int32 isrunning; 34 bigtime_t realtime[TS_INDEX_COUNT]; 35 bigtime_t perftime[TS_INDEX_COUNT]; 36 float drift[TS_INDEX_COUNT]; 37 }; 38 39 struct SlaveNodes 40 { 41 #define SLAVE_NODES_COUNT 300 42 BLocker locker; 43 int32 count; 44 media_node_id node_id[SLAVE_NODES_COUNT]; 45 port_id node_port[SLAVE_NODES_COUNT]; 46 }; 47 48 } } 49 50 51 /************************************************************* 52 * protected BTimeSource 53 *************************************************************/ 54 55 BTimeSource::~BTimeSource() 56 { 57 CALLED(); 58 if (fArea > 0) 59 delete_area(fArea); 60 if (fSlaveNodes) 61 free(fSlaveNodes); 62 } 63 64 /************************************************************* 65 * public BTimeSource 66 *************************************************************/ 67 68 status_t 69 BTimeSource::SnoozeUntil(bigtime_t performance_time, 70 bigtime_t with_latency, 71 bool retry_signals) 72 { 73 CALLED(); 74 bigtime_t time; 75 status_t err; 76 do { 77 time = RealTimeFor(performance_time, with_latency); 78 err = snooze_until(time, B_SYSTEM_TIMEBASE); 79 } while (err == B_INTERRUPTED && retry_signals); 80 return err; 81 } 82 83 84 bigtime_t 85 BTimeSource::Now() 86 { 87 PRINT(8, "CALLED BTimeSource::Now()\n"); 88 return PerformanceTimeFor(RealTime()); 89 } 90 91 92 bigtime_t 93 BTimeSource::PerformanceTimeFor(bigtime_t real_time) 94 { 95 PRINT(8, "CALLED BTimeSource::PerformanceTimeFor()\n"); 96 bigtime_t last_perf_time; 97 bigtime_t last_real_time; 98 float last_drift; 99 100 while (GetTime(&last_perf_time, &last_real_time, &last_drift) != B_OK) 101 snooze(1); 102 103 return last_perf_time + (bigtime_t)((real_time - last_real_time) * last_drift); 104 } 105 106 107 bigtime_t 108 BTimeSource::RealTimeFor(bigtime_t performance_time, 109 bigtime_t with_latency) 110 { 111 PRINT(8, "CALLED BTimeSource::RealTimeFor()\n"); 112 113 if (fIsRealtime) { 114 return performance_time - with_latency; 115 } 116 117 bigtime_t last_perf_time; 118 bigtime_t last_real_time; 119 float last_drift; 120 121 while (GetTime(&last_perf_time, &last_real_time, &last_drift) != B_OK) 122 snooze(1); 123 124 return last_real_time - with_latency + (bigtime_t)((performance_time - last_perf_time) / last_drift); 125 } 126 127 128 bool 129 BTimeSource::IsRunning() 130 { 131 PRINT(8, "CALLED BTimeSource::IsRunning()\n"); 132 133 bool isrunning; 134 135 if (fIsRealtime) 136 isrunning = true; // The system time source is always running :) 137 else 138 isrunning = fBuf ? atomic_add(&fBuf->isrunning, 0) : fStarted; 139 140 TRACE_TIMESOURCE("BTimeSource::IsRunning() node %ld, port %ld, %s\n", fNodeID, fControlPort, isrunning ? "yes" : "no"); 141 return isrunning; 142 } 143 144 145 status_t 146 BTimeSource::GetTime(bigtime_t *performance_time, 147 bigtime_t *real_time, 148 float *drift) 149 { 150 PRINT(8, "CALLED BTimeSource::GetTime()\n"); 151 152 if (fIsRealtime) { 153 *performance_time = *real_time = system_time(); 154 *drift = 1.0f; 155 return B_OK; 156 } 157 // if (fBuf == 0) { 158 // PRINT(1, "BTimeSource::GetTime: fBuf == 0, name %s, id %ld\n",Name(),ID()); 159 // *performance_time = *real_time = system_time(); 160 // *drift = 1.0f; 161 // return B_OK; 162 // } 163 164 int32 index; 165 index = _atomic_read(&fBuf->readindex); 166 index &= (TS_INDEX_COUNT - 1); 167 *real_time = fBuf->realtime[index]; 168 *performance_time = fBuf->perftime[index]; 169 *drift = fBuf->drift[index]; 170 171 // if (*real_time == 0) { 172 // *performance_time = *real_time = system_time(); 173 // *drift = 1.0f; 174 // return B_OK; 175 // } 176 // printf("BTimeSource::GetTime timesource %ld, index %ld, perf %16Ld, real %16Ld, drift %2.2f\n", ID(), index, *performance_time, *real_time, *drift); 177 178 TRACE_TIMESOURCE("BTimeSource::GetTime timesource %ld, perf %16Ld, real %16Ld, drift %2.2f\n", ID(), *performance_time, *real_time, *drift); 179 return B_OK; 180 } 181 182 183 bigtime_t 184 BTimeSource::RealTime() 185 { 186 PRINT(8, "CALLED BTimeSource::RealTime()\n"); 187 return system_time(); 188 } 189 190 191 status_t 192 BTimeSource::GetStartLatency(bigtime_t *out_latency) 193 { 194 CALLED(); 195 *out_latency = 0; 196 return B_OK; 197 } 198 199 /************************************************************* 200 * protected BTimeSource 201 *************************************************************/ 202 203 204 BTimeSource::BTimeSource() : 205 BMediaNode("This one is never called"), 206 fStarted(false), 207 fArea(-1), 208 fBuf(NULL), 209 fSlaveNodes((BPrivate::media::SlaveNodes*)malloc(sizeof(BPrivate::media::SlaveNodes))), 210 fIsRealtime(false) 211 { 212 CALLED(); 213 AddNodeKind(B_TIME_SOURCE); 214 // printf("##### BTimeSource::BTimeSource() name %s, id %ld\n", Name(), ID()); 215 216 if (fSlaveNodes == NULL) { 217 ERROR("BTimeSource::BTimeSource() fSlaveNodes == NULL\n"); 218 return; 219 } 220 221 // initialize the slave node storage 222 fSlaveNodes->count = 0; 223 memset(&fSlaveNodes->node_id, 0, SLAVE_NODES_COUNT * sizeof(media_node_id)); 224 memset(&fSlaveNodes->node_port, 0, SLAVE_NODES_COUNT * sizeof(port_id)); 225 226 // This constructor is only called by real time sources that inherit 227 // BTimeSource. We create the communication area in FinishCreate(), 228 // since we don't have a correct ID() until this node is registered. 229 } 230 231 232 status_t 233 BTimeSource::HandleMessage(int32 message, 234 const void *rawdata, 235 size_t size) 236 { 237 PRINT(4, "BTimeSource::HandleMessage %#lx, node %ld\n", message, fNodeID); 238 239 switch (message) { 240 case TIMESOURCE_OP: 241 { 242 const time_source_op_info *data = static_cast<const time_source_op_info *>(rawdata); 243 switch (data->op) { 244 case B_TIMESOURCE_START: 245 DirectStart(data->real_time); 246 break; 247 case B_TIMESOURCE_STOP: 248 DirectStop(data->real_time, false); 249 break; 250 case B_TIMESOURCE_STOP_IMMEDIATELY: 251 DirectStop(data->real_time, true); 252 break; 253 case B_TIMESOURCE_SEEK: 254 DirectSeek(data->performance_time, data->real_time); 255 break; 256 } 257 status_t result; 258 result = TimeSourceOp(*data, NULL); 259 if (result != B_OK) { 260 ERROR("BTimeSource::HandleMessage: TimeSourceOp failed\n"); 261 } 262 return B_OK; 263 } 264 265 case TIMESOURCE_ADD_SLAVE_NODE: 266 { 267 const timesource_add_slave_node_command *data = static_cast<const timesource_add_slave_node_command *>(rawdata); 268 DirectAddMe(data->node); 269 return B_OK; 270 } 271 272 case TIMESOURCE_REMOVE_SLAVE_NODE: 273 { 274 const timesource_remove_slave_node_command *data = static_cast<const timesource_remove_slave_node_command *>(rawdata); 275 DirectRemoveMe(data->node); 276 return B_OK; 277 } 278 } 279 return B_ERROR; 280 } 281 282 283 void 284 BTimeSource::PublishTime(bigtime_t performance_time, 285 bigtime_t real_time, 286 float drift) 287 { 288 TRACE_TIMESOURCE("BTimeSource::PublishTime timesource %ld, perf %16Ld, real %16Ld, drift %2.2f\n", ID(), performance_time, real_time, drift); 289 if (0 == fBuf) { 290 ERROR("BTimeSource::PublishTime timesource %ld, fBuf = NULL\n", ID()); 291 fStarted = true; 292 return; 293 } 294 295 int32 index; 296 index = atomic_add(&fBuf->writeindex, 1); 297 index &= (TS_INDEX_COUNT - 1); 298 fBuf->realtime[index] = real_time; 299 fBuf->perftime[index] = performance_time; 300 fBuf->drift[index] = drift; 301 atomic_add(&fBuf->readindex, 1); 302 303 // printf("BTimeSource::PublishTime timesource %ld, write index %ld, perf %16Ld, real %16Ld, drift %2.2f\n", ID(), index, performance_time, real_time, drift); 304 } 305 306 307 void 308 BTimeSource::BroadcastTimeWarp(bigtime_t at_real_time, 309 bigtime_t new_performance_time) 310 { 311 CALLED(); 312 // calls BMediaNode::TimeWarp() of all slaved nodes 313 314 TRACE("BTimeSource::BroadcastTimeWarp: at_real_time %Ld, new_performance_time %Ld\n", at_real_time, new_performance_time); 315 316 BAutolock lock(fSlaveNodes->locker); 317 318 for (int i = 0, n = 0; i < SLAVE_NODES_COUNT && n != fSlaveNodes->count; i++) { 319 if (fSlaveNodes->node_id[i] != 0) { 320 node_time_warp_command cmd; 321 cmd.at_real_time = at_real_time; 322 cmd.to_performance_time = new_performance_time; 323 SendToPort(fSlaveNodes->node_port[i], NODE_TIME_WARP, &cmd, sizeof(cmd)); 324 n++; 325 } 326 } 327 } 328 329 330 void 331 BTimeSource::SendRunMode(run_mode mode) 332 { 333 CALLED(); 334 // send the run mode change to all slaved nodes 335 336 BAutolock lock(fSlaveNodes->locker); 337 338 for (int i = 0, n = 0; i < SLAVE_NODES_COUNT && n != fSlaveNodes->count; i++) { 339 if (fSlaveNodes->node_id[i] != 0) { 340 node_set_run_mode_command cmd; 341 cmd.mode = mode; 342 SendToPort(fSlaveNodes->node_port[i], NODE_SET_RUN_MODE, &cmd, sizeof(cmd)); 343 n++; 344 } 345 } 346 } 347 348 349 void 350 BTimeSource::SetRunMode(run_mode mode) 351 { 352 CALLED(); 353 BMediaNode::SetRunMode(mode); 354 SendRunMode(mode); 355 } 356 /************************************************************* 357 * private BTimeSource 358 *************************************************************/ 359 360 /* 361 //unimplemented 362 BTimeSource::BTimeSource(const BTimeSource &clone) 363 BTimeSource &BTimeSource::operator=(const BTimeSource &clone) 364 */ 365 366 status_t BTimeSource::_Reserved_TimeSource_0(void *) { return B_ERROR; } 367 status_t BTimeSource::_Reserved_TimeSource_1(void *) { return B_ERROR; } 368 status_t BTimeSource::_Reserved_TimeSource_2(void *) { return B_ERROR; } 369 status_t BTimeSource::_Reserved_TimeSource_3(void *) { return B_ERROR; } 370 status_t BTimeSource::_Reserved_TimeSource_4(void *) { return B_ERROR; } 371 status_t BTimeSource::_Reserved_TimeSource_5(void *) { return B_ERROR; } 372 373 /* explicit */ 374 BTimeSource::BTimeSource(media_node_id id) : 375 BMediaNode("This one is never called"), 376 fStarted(false), 377 fArea(-1), 378 fBuf(NULL), 379 fSlaveNodes(NULL), 380 fIsRealtime(false) 381 { 382 CALLED(); 383 AddNodeKind(B_TIME_SOURCE); 384 ASSERT(id > 0); 385 // printf("###### explicit BTimeSource::BTimeSource() id %ld, name %s\n", id, Name()); 386 387 // This constructor is only called by the derived BPrivate::media::TimeSourceObject objects 388 // We create a clone of the communication area 389 char name[32]; 390 area_id area; 391 sprintf(name, "__timesource_buf_%ld", id); 392 area = find_area(name); 393 if (area <= 0) { 394 ERROR("BTimeSource::BTimeSource couldn't find area, node %ld\n", id); 395 return; 396 } 397 sprintf(name, "__cloned_timesource_buf_%ld", id); 398 fArea = clone_area(name, reinterpret_cast<void **>(const_cast<BPrivate::media::TimeSourceTransmit **>(&fBuf)), B_ANY_ADDRESS, B_READ_AREA | B_WRITE_AREA, area); 399 if (fArea <= 0) { 400 ERROR("BTimeSource::BTimeSource couldn't clone area, node %ld\n", id); 401 return; 402 } 403 } 404 405 406 void 407 BTimeSource::FinishCreate() 408 { 409 CALLED(); 410 //printf("BTimeSource::FinishCreate(), id %ld\n", ID()); 411 412 char name[32]; 413 sprintf(name, "__timesource_buf_%ld", ID()); 414 fArea = create_area(name, reinterpret_cast<void **>(const_cast<BPrivate::media::TimeSourceTransmit **>(&fBuf)), B_ANY_ADDRESS, TS_AREA_SIZE, B_FULL_LOCK, B_READ_AREA | B_WRITE_AREA); 415 if (fArea <= 0) { 416 ERROR("BTimeSource::BTimeSource couldn't create area, node %ld\n", ID()); 417 fBuf = NULL; 418 return; 419 } 420 fBuf->readindex = 0; 421 fBuf->writeindex = 1; 422 fBuf->realtime[0] = 0; 423 fBuf->perftime[0] = 0; 424 fBuf->drift[0] = 1.0f; 425 fBuf->isrunning = fStarted; 426 } 427 428 429 status_t 430 BTimeSource::RemoveMe(BMediaNode *node) 431 { 432 CALLED(); 433 if (fKinds & NODE_KIND_SHADOW_TIMESOURCE) { 434 timesource_remove_slave_node_command cmd; 435 cmd.node = node->Node(); 436 SendToPort(fControlPort, TIMESOURCE_REMOVE_SLAVE_NODE, &cmd, sizeof(cmd)); 437 } else { 438 DirectRemoveMe(node->Node()); 439 } 440 return B_OK; 441 } 442 443 444 status_t 445 BTimeSource::AddMe(BMediaNode *node) 446 { 447 CALLED(); 448 if (fKinds & NODE_KIND_SHADOW_TIMESOURCE) { 449 timesource_add_slave_node_command cmd; 450 cmd.node = node->Node(); 451 SendToPort(fControlPort, TIMESOURCE_ADD_SLAVE_NODE, &cmd, sizeof(cmd)); 452 } else { 453 DirectAddMe(node->Node()); 454 } 455 return B_OK; 456 } 457 458 459 void 460 BTimeSource::DirectAddMe(const media_node &node) 461 { 462 CALLED(); 463 // XXX this code has race conditions and is pretty dumb, and it 464 // XXX won't detect nodes that crash and don't remove themself. 465 BAutolock lock(fSlaveNodes->locker); 466 467 if (fSlaveNodes->count == SLAVE_NODES_COUNT) { 468 ERROR("BTimeSource::DirectAddMe out of slave node slots\n"); 469 return; 470 } 471 for (int i = 0; i < SLAVE_NODES_COUNT; i++) { 472 if (fSlaveNodes->node_id[i] == 0) { 473 fSlaveNodes->node_id[i] = node.node; 474 fSlaveNodes->node_port[i] = node.port; 475 fSlaveNodes->count += 1; 476 if (fSlaveNodes->count == 1) { 477 // start the time source 478 time_source_op_info msg; 479 msg.op = B_TIMESOURCE_START; 480 msg.real_time = RealTime(); 481 TRACE_TIMESOURCE("starting time source %ld\n", ID()); 482 write_port(fControlPort, TIMESOURCE_OP, &msg, sizeof(msg)); 483 } 484 return; 485 } 486 } 487 ERROR("BTimeSource::DirectAddMe failed\n"); 488 } 489 490 void 491 BTimeSource::DirectRemoveMe(const media_node &node) 492 { 493 // XXX this code has race conditions and is pretty dumb, and it 494 // XXX won't detect nodes that crash and don't remove themself. 495 CALLED(); 496 BAutolock lock(fSlaveNodes->locker); 497 498 if (fSlaveNodes->count == 0) { 499 ERROR("BTimeSource::DirectRemoveMe no slots used\n"); 500 return; 501 } 502 for (int i = 0; i < SLAVE_NODES_COUNT; i++) { 503 if (fSlaveNodes->node_id[i] == node.node && fSlaveNodes->node_port[i] == node.port) { 504 fSlaveNodes->node_id[i] = 0; 505 fSlaveNodes->node_port[i] = 0; 506 fSlaveNodes->count -= 1; 507 if (fSlaveNodes->count == 0) { 508 // stop the time source 509 time_source_op_info msg; 510 msg.op = B_TIMESOURCE_STOP_IMMEDIATELY; 511 msg.real_time = RealTime(); 512 TRACE_TIMESOURCE("stopping time source %ld\n", ID()); 513 write_port(fControlPort, TIMESOURCE_OP, &msg, sizeof(msg)); 514 } 515 return; 516 } 517 } 518 ERROR("BTimeSource::DirectRemoveMe failed\n"); 519 } 520 521 void 522 BTimeSource::DirectStart(bigtime_t at) 523 { 524 UNIMPLEMENTED(); 525 if (fBuf) 526 atomic_or(&fBuf->isrunning, 1); 527 else 528 fStarted = true; 529 } 530 531 532 void 533 BTimeSource::DirectStop(bigtime_t at, 534 bool immediate) 535 { 536 UNIMPLEMENTED(); 537 if (fBuf) 538 atomic_and(&fBuf->isrunning, 0); 539 else 540 fStarted = false; 541 } 542 543 544 void 545 BTimeSource::DirectSeek(bigtime_t to, 546 bigtime_t at) 547 { 548 UNIMPLEMENTED(); 549 } 550 551 552 void 553 BTimeSource::DirectSetRunMode(run_mode mode) 554 { 555 UNIMPLEMENTED(); 556 } 557