1 /* 2 * Copyright 2012-2014, Rene Gollent, rene@gollent.com. 3 * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de. 4 * Distributed under the terms of the MIT License. 5 */ 6 7 #include "Worker.h" 8 9 #include <AutoDeleter.h> 10 #include <AutoLocker.h> 11 12 13 // pragma mark - JobKey 14 15 16 JobKey::~JobKey() 17 { 18 } 19 20 21 // pragma mark - SimpleJobKey 22 23 24 SimpleJobKey::SimpleJobKey(const void* object, uint32 type) 25 : 26 object(object), 27 type(type) 28 { 29 } 30 31 32 SimpleJobKey::SimpleJobKey(const SimpleJobKey& other) 33 : 34 object(other.object), 35 type(other.type) 36 { 37 } 38 39 40 size_t 41 SimpleJobKey::HashValue() const 42 { 43 return (size_t)(addr_t)object ^ (size_t)type; 44 } 45 46 47 bool 48 SimpleJobKey::operator==(const JobKey& other) const 49 { 50 const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other); 51 return otherKey != NULL && object == otherKey->object 52 && type == otherKey->type; 53 } 54 55 56 SimpleJobKey& 57 SimpleJobKey::operator=(const SimpleJobKey& other) 58 { 59 object = other.object; 60 type = other.type; 61 return *this; 62 } 63 64 65 // #pragma mark - JobListener 66 67 68 JobListener::~JobListener() 69 { 70 } 71 72 73 void 74 JobListener::JobStarted(Job* job) 75 { 76 } 77 78 79 void 80 JobListener::JobDone(Job* job) 81 { 82 } 83 84 85 void 86 JobListener::JobWaitingForInput(Job* job) 87 { 88 } 89 90 91 void 92 JobListener::JobFailed(Job* job) 93 { 94 } 95 96 97 void 98 JobListener::JobAborted(Job* job) 99 { 100 } 101 102 103 // #pragma mark - Job 104 105 106 Job::Job() 107 : 108 fWorker(NULL), 109 fState(JOB_STATE_UNSCHEDULED), 110 fDependency(NULL), 111 fWaitStatus(JOB_DEPENDENCY_NOT_FOUND), 112 fListeners(10) 113 { 114 } 115 116 117 Job::~Job() 118 { 119 } 120 121 122 job_wait_status 123 Job::WaitFor(const JobKey& key) 124 { 125 return fWorker->WaitForJob(this, key); 126 } 127 128 129 status_t 130 Job::WaitForUserInput() 131 { 132 return fWorker->WaitForUserInput(this); 133 } 134 135 136 void 137 Job::SetDescription(const char* format, ...) 138 { 139 va_list args; 140 va_start(args, format); 141 fDescription.SetToFormatVarArgs(format, args); 142 } 143 144 145 void 146 Job::SetWorker(Worker* worker) 147 { 148 fWorker = worker; 149 } 150 151 152 void 153 Job::SetState(job_state state) 154 { 155 fState = state; 156 } 157 158 159 void 160 Job::SetDependency(Job* job) 161 { 162 fDependency = job; 163 } 164 165 166 void 167 Job::SetWaitStatus(job_wait_status status) 168 { 169 fWaitStatus = status; 170 switch (fWaitStatus) { 171 case JOB_DEPENDENCY_ACTIVE: 172 case JOB_USER_INPUT_WAITING: 173 fState = JOB_STATE_WAITING; 174 break; 175 default: 176 fState = JOB_STATE_ACTIVE; 177 break; 178 } 179 } 180 181 182 status_t 183 Job::AddListener(JobListener* listener) 184 { 185 return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY; 186 } 187 188 189 void 190 Job::RemoveListener(JobListener* listener) 191 { 192 fListeners.RemoveItem(listener); 193 } 194 195 196 void 197 Job::NotifyListeners() 198 { 199 int32 count = fListeners.CountItems(); 200 for (int32 i = count - 1; i >= 0; i--) { 201 JobListener* listener = fListeners.ItemAt(i); 202 switch (fState) { 203 case JOB_STATE_ACTIVE: 204 listener->JobStarted(this); 205 break; 206 case JOB_STATE_WAITING: 207 if (fWaitStatus == JOB_USER_INPUT_WAITING) 208 listener->JobWaitingForInput(this); 209 break; 210 case JOB_STATE_SUCCEEDED: 211 listener->JobDone(this); 212 break; 213 case JOB_STATE_FAILED: 214 listener->JobFailed(this); 215 break; 216 case JOB_STATE_ABORTED: 217 default: 218 listener->JobAborted(this); 219 break; 220 } 221 } 222 } 223 224 225 // #pragma mark - Worker 226 227 228 Worker::Worker() 229 : 230 fLock("worker"), 231 fWorkerThread(-1), 232 fTerminating(false) 233 { 234 } 235 236 237 Worker::~Worker() 238 { 239 ShutDown(); 240 241 if (fWorkerThread >= 0) 242 wait_for_thread(fWorkerThread, NULL); 243 } 244 245 246 status_t 247 Worker::Init() 248 { 249 // check lock 250 status_t error = fLock.InitCheck(); 251 if (error != B_OK) 252 return error; 253 254 // init jobs table 255 error = fJobs.Init(); 256 if (error != B_OK) 257 return error; 258 259 // create semaphore for the worker 260 fWorkToDoSem = create_sem(0, "work to do"); 261 if (fWorkToDoSem < 0) 262 return fWorkToDoSem; 263 264 // spawn worker thread 265 fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY, 266 this); 267 if (fWorkerThread < 0) 268 return fWorkerThread; 269 270 resume_thread(fWorkerThread); 271 272 return B_OK; 273 } 274 275 276 void 277 Worker::ShutDown() 278 { 279 AutoLocker<Worker> locker(this); 280 281 if (fTerminating) 282 return; 283 284 fTerminating = true; 285 286 // abort all jobs 287 Job* job = fJobs.Clear(true); 288 while (job != NULL) { 289 Job* nextJob = job->fNext; 290 _AbortJob(job, false); 291 job = nextJob; 292 293 } 294 295 // let the work thread terminate 296 delete_sem(fWorkToDoSem); 297 fWorkToDoSem = -1; 298 } 299 300 301 status_t 302 Worker::ScheduleJob(Job* job, JobListener* listener) 303 { 304 if (job == NULL) 305 return B_NO_MEMORY; 306 307 BReference<Job> jobReference(job, true); 308 AutoLocker<Worker> locker(this); 309 310 if (fTerminating) 311 return B_ERROR; 312 313 if (listener != NULL) { 314 status_t error = job->AddListener(listener); 315 if (error != B_OK) 316 return error; 317 } 318 319 bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty(); 320 321 job->SetWorker(this); 322 job->SetState(JOB_STATE_UNSCHEDULED); 323 fJobs.Insert(job); 324 fUnscheduledJobs.Add(jobReference.Detach()); 325 326 if (notify) 327 release_sem(fWorkToDoSem); 328 329 return B_OK; 330 } 331 332 333 void 334 Worker::AbortJob(const JobKey& key) 335 { 336 AutoLocker<Worker> locker(this); 337 338 Job* job = fJobs.Lookup(key); 339 if (job == NULL) 340 return; 341 342 _AbortJob(job, true); 343 } 344 345 346 Job* 347 Worker::GetJob(const JobKey& key) 348 { 349 AutoLocker<Worker> locker(this); 350 return fJobs.Lookup(key); 351 } 352 353 354 status_t 355 Worker::ResumeJob(Job* job) 356 { 357 AutoLocker<Worker> locker(this); 358 359 for (JobList::Iterator it = fSuspendedJobs.GetIterator(); it.Next();) { 360 if (it.Current() == job) { 361 it.Remove(); 362 job->SetState(JOB_STATE_UNSCHEDULED); 363 fUnscheduledJobs.Add(job); 364 release_sem(fWorkToDoSem); 365 return B_OK; 366 } 367 } 368 369 return B_ENTRY_NOT_FOUND; 370 } 371 372 373 bool 374 Worker::HasPendingJobs() 375 { 376 AutoLocker<Worker> locker(this); 377 return !fJobs.IsEmpty(); 378 } 379 380 381 status_t 382 Worker::AddListener(const JobKey& key, JobListener* listener) 383 { 384 AutoLocker<Worker> locker(this); 385 386 Job* job = fJobs.Lookup(key); 387 if (job == NULL) 388 return B_ENTRY_NOT_FOUND; 389 390 return job->AddListener(listener); 391 } 392 393 394 void 395 Worker::RemoveListener(const JobKey& key, JobListener* listener) 396 { 397 AutoLocker<Worker> locker(this); 398 399 if (Job* job = fJobs.Lookup(key)) 400 job->RemoveListener(listener); 401 } 402 403 404 job_wait_status 405 Worker::WaitForJob(Job* waitingJob, const JobKey& key) 406 { 407 AutoLocker<Worker> locker(this); 408 409 // don't wait when the game is over anyway 410 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED) 411 return JOB_DEPENDENCY_ABORTED; 412 413 Job* job = fJobs.Lookup(key); 414 if (job == NULL) 415 return JOB_DEPENDENCY_NOT_FOUND; 416 417 waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE); 418 waitingJob->SetDependency(job); 419 job->DependentJobs().Add(waitingJob); 420 421 return waitingJob->WaitStatus(); 422 } 423 424 425 status_t 426 Worker::WaitForUserInput(Job* waitingJob) 427 { 428 AutoLocker<Worker> locker(this); 429 430 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED) 431 return B_INTERRUPTED; 432 433 waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING); 434 waitingJob->NotifyListeners(); 435 fSuspendedJobs.Add(waitingJob); 436 437 return B_OK; 438 } 439 440 441 /*static*/ status_t 442 Worker::_WorkerLoopEntry(void* data) 443 { 444 return ((Worker*)data)->_WorkerLoop(); 445 } 446 447 448 status_t 449 Worker::_WorkerLoop() 450 { 451 _ProcessJobs(); 452 453 // clean up aborted jobs 454 AutoLocker<Worker> locker(this); 455 while (Job* job = fAbortedJobs.RemoveHead()) 456 _FinishJob(job); 457 458 return B_OK; 459 } 460 461 462 void 463 Worker::_ProcessJobs() 464 { 465 while (true) { 466 AutoLocker<Worker> locker(this); 467 468 // wait for next job 469 if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) { 470 locker.Unlock(); 471 472 status_t error = acquire_sem(fWorkToDoSem); 473 if (error != B_OK) { 474 if (error == B_INTERRUPTED) { 475 locker.Lock(); 476 continue; 477 } 478 break; 479 } 480 481 locker.Lock(); 482 } 483 484 // clean up aborted jobs 485 while (Job* job = fAbortedJobs.RemoveHead()) 486 _FinishJob(job); 487 488 // process the next job 489 if (Job* job = fUnscheduledJobs.RemoveHead()) { 490 job->SetState(JOB_STATE_ACTIVE); 491 job->NotifyListeners(); 492 493 locker.Unlock(); 494 status_t error = job->Do(); 495 locker.Lock(); 496 497 if (job->State() == JOB_STATE_ACTIVE) { 498 job->SetState( 499 error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED); 500 } else if (job->State() == JOB_STATE_WAITING) 501 continue; 502 503 _FinishJob(job); 504 } 505 } 506 } 507 508 509 void 510 Worker::_AbortJob(Job* job, bool removeFromTable) 511 { 512 switch (job->State()) { 513 case JOB_STATE_ABORTED: 514 return; 515 516 case JOB_STATE_UNSCHEDULED: 517 fUnscheduledJobs.Remove(job); 518 fAbortedJobs.Add(job); 519 break; 520 521 case JOB_STATE_WAITING: 522 { 523 Job* dependency = job->Dependency(); 524 if (dependency != NULL) 525 dependency->DependentJobs().Remove(job); 526 job->SetDependency(NULL); 527 break; 528 } 529 case JOB_STATE_ACTIVE: 530 case JOB_STATE_FAILED: 531 case JOB_STATE_SUCCEEDED: 532 default: 533 break; 534 } 535 536 job->SetState(JOB_STATE_ABORTED); 537 if (removeFromTable) 538 fJobs.Remove(job); 539 } 540 541 542 void 543 Worker::_FinishJob(Job* job) 544 { 545 // wake up dependent jobs 546 if (!job->DependentJobs().IsEmpty()) { 547 job_wait_status waitStatus; 548 switch (job->State()) { 549 case JOB_STATE_ABORTED: 550 waitStatus = JOB_DEPENDENCY_ABORTED; 551 break; 552 case JOB_STATE_FAILED: 553 waitStatus = JOB_DEPENDENCY_FAILED; 554 break; 555 case JOB_STATE_SUCCEEDED: 556 waitStatus = JOB_DEPENDENCY_SUCCEEDED; 557 break; 558 559 case JOB_STATE_UNSCHEDULED: 560 case JOB_STATE_WAITING: 561 case JOB_STATE_ACTIVE: 562 default: 563 // should never happen 564 waitStatus = JOB_DEPENDENCY_NOT_FOUND; 565 break; 566 } 567 568 while (Job* dependentJob = job->DependentJobs().RemoveHead()) { 569 dependentJob->SetDependency(NULL); 570 dependentJob->SetWaitStatus(waitStatus); 571 fUnscheduledJobs.Add(dependentJob); 572 } 573 574 release_sem(fWorkToDoSem); 575 } 576 577 if (job->State() != JOB_STATE_ABORTED) 578 fJobs.Remove(job); 579 job->NotifyListeners(); 580 job->ReleaseReference(); 581 } 582