1 /* 2 * Copyright 2012-2014, Rene Gollent, rene@gollent.com. 3 * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de. 4 * Distributed under the terms of the MIT License. 5 */ 6 7 #include "Worker.h" 8 9 #include <AutoDeleter.h> 10 #include <AutoLocker.h> 11 12 13 // pragma mark - JobKey 14 15 16 JobKey::~JobKey() 17 { 18 } 19 20 21 // pragma mark - SimpleJobKey 22 23 24 SimpleJobKey::SimpleJobKey(const void* object, uint32 type) 25 : 26 object(object), 27 type(type) 28 { 29 } 30 31 32 SimpleJobKey::SimpleJobKey(const SimpleJobKey& other) 33 : 34 object(other.object), 35 type(other.type) 36 { 37 } 38 39 40 size_t 41 SimpleJobKey::HashValue() const 42 { 43 return (size_t)(addr_t)object ^ (size_t)type; 44 } 45 46 47 bool 48 SimpleJobKey::operator==(const JobKey& other) const 49 { 50 const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other); 51 return otherKey != NULL && object == otherKey->object 52 && type == otherKey->type; 53 } 54 55 56 SimpleJobKey& 57 SimpleJobKey::operator=(const SimpleJobKey& other) 58 { 59 object = other.object; 60 type = other.type; 61 return *this; 62 } 63 64 65 // #pragma mark - JobListener 66 67 68 JobListener::~JobListener() 69 { 70 } 71 72 73 void 74 JobListener::JobStarted(Job* job) 75 { 76 } 77 78 79 void 80 JobListener::JobDone(Job* job) 81 { 82 } 83 84 85 void 86 JobListener::JobWaitingForInput(Job* job) 87 { 88 } 89 90 91 void 92 JobListener::JobFailed(Job* job) 93 { 94 } 95 96 97 void 98 JobListener::JobAborted(Job* job) 99 { 100 } 101 102 103 // #pragma mark - Job 104 105 106 Job::Job() 107 : 108 fWorker(NULL), 109 fState(JOB_STATE_UNSCHEDULED), 110 fDependency(NULL), 111 fWaitStatus(JOB_DEPENDENCY_NOT_FOUND), 112 fListeners(10) 113 { 114 } 115 116 117 Job::~Job() 118 { 119 } 120 121 122 job_wait_status 123 Job::WaitFor(const JobKey& key) 124 { 125 return fWorker->WaitForJob(this, key); 126 } 127 128 129 status_t 130 Job::WaitForUserInput() 131 { 132 return fWorker->WaitForUserInput(this); 133 } 134 135 136 void 137 Job::SetDescription(const char* format, ...) 138 { 139 va_list args; 140 va_start(args, format); 141 fDescription.SetToFormatVarArgs(format, args); 142 va_end(args); 143 } 144 145 146 void 147 Job::SetWorker(Worker* worker) 148 { 149 fWorker = worker; 150 } 151 152 153 void 154 Job::SetState(job_state state) 155 { 156 fState = state; 157 } 158 159 160 void 161 Job::SetDependency(Job* job) 162 { 163 fDependency = job; 164 } 165 166 167 void 168 Job::SetWaitStatus(job_wait_status status) 169 { 170 fWaitStatus = status; 171 switch (fWaitStatus) { 172 case JOB_DEPENDENCY_ACTIVE: 173 case JOB_USER_INPUT_WAITING: 174 fState = JOB_STATE_WAITING; 175 break; 176 default: 177 fState = JOB_STATE_ACTIVE; 178 break; 179 } 180 } 181 182 183 status_t 184 Job::AddListener(JobListener* listener) 185 { 186 return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY; 187 } 188 189 190 void 191 Job::RemoveListener(JobListener* listener) 192 { 193 fListeners.RemoveItem(listener); 194 } 195 196 197 void 198 Job::NotifyListeners() 199 { 200 int32 count = fListeners.CountItems(); 201 for (int32 i = count - 1; i >= 0; i--) { 202 JobListener* listener = fListeners.ItemAt(i); 203 switch (fState) { 204 case JOB_STATE_ACTIVE: 205 listener->JobStarted(this); 206 break; 207 case JOB_STATE_WAITING: 208 if (fWaitStatus == JOB_USER_INPUT_WAITING) 209 listener->JobWaitingForInput(this); 210 break; 211 case JOB_STATE_SUCCEEDED: 212 listener->JobDone(this); 213 break; 214 case JOB_STATE_FAILED: 215 listener->JobFailed(this); 216 break; 217 case JOB_STATE_ABORTED: 218 default: 219 listener->JobAborted(this); 220 break; 221 } 222 } 223 } 224 225 226 // #pragma mark - Worker 227 228 229 Worker::Worker() 230 : 231 fLock("worker"), 232 fWorkerThread(-1), 233 fTerminating(false) 234 { 235 } 236 237 238 Worker::~Worker() 239 { 240 ShutDown(); 241 242 if (fWorkerThread >= 0) 243 wait_for_thread(fWorkerThread, NULL); 244 } 245 246 247 status_t 248 Worker::Init() 249 { 250 // check lock 251 status_t error = fLock.InitCheck(); 252 if (error != B_OK) 253 return error; 254 255 // init jobs table 256 error = fJobs.Init(); 257 if (error != B_OK) 258 return error; 259 260 // create semaphore for the worker 261 fWorkToDoSem = create_sem(0, "work to do"); 262 if (fWorkToDoSem < 0) 263 return fWorkToDoSem; 264 265 // spawn worker thread 266 fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY, 267 this); 268 if (fWorkerThread < 0) 269 return fWorkerThread; 270 271 resume_thread(fWorkerThread); 272 273 return B_OK; 274 } 275 276 277 void 278 Worker::ShutDown() 279 { 280 AutoLocker<Worker> locker(this); 281 282 if (fTerminating) 283 return; 284 285 fTerminating = true; 286 287 // abort all jobs 288 Job* job = fJobs.Clear(true); 289 while (job != NULL) { 290 Job* nextJob = job->fNext; 291 _AbortJob(job, false); 292 job = nextJob; 293 294 } 295 296 // let the work thread terminate 297 delete_sem(fWorkToDoSem); 298 fWorkToDoSem = -1; 299 } 300 301 302 status_t 303 Worker::ScheduleJob(Job* job, JobListener* listener) 304 { 305 if (job == NULL) 306 return B_NO_MEMORY; 307 308 BReference<Job> jobReference(job, true); 309 AutoLocker<Worker> locker(this); 310 311 if (fTerminating) 312 return B_ERROR; 313 314 if (listener != NULL) { 315 status_t error = job->AddListener(listener); 316 if (error != B_OK) 317 return error; 318 } 319 320 bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty(); 321 322 job->SetWorker(this); 323 job->SetState(JOB_STATE_UNSCHEDULED); 324 fJobs.Insert(job); 325 fUnscheduledJobs.Add(jobReference.Detach()); 326 327 if (notify) 328 release_sem(fWorkToDoSem); 329 330 return B_OK; 331 } 332 333 334 void 335 Worker::AbortJob(const JobKey& key) 336 { 337 AutoLocker<Worker> locker(this); 338 339 Job* job = fJobs.Lookup(key); 340 if (job == NULL) 341 return; 342 343 _AbortJob(job, true); 344 } 345 346 347 Job* 348 Worker::GetJob(const JobKey& key) 349 { 350 AutoLocker<Worker> locker(this); 351 return fJobs.Lookup(key); 352 } 353 354 355 status_t 356 Worker::ResumeJob(Job* job) 357 { 358 AutoLocker<Worker> locker(this); 359 360 for (JobList::Iterator it = fSuspendedJobs.GetIterator(); it.Next();) { 361 if (it.Current() == job) { 362 it.Remove(); 363 job->SetState(JOB_STATE_UNSCHEDULED); 364 fUnscheduledJobs.Add(job); 365 release_sem(fWorkToDoSem); 366 return B_OK; 367 } 368 } 369 370 return B_ENTRY_NOT_FOUND; 371 } 372 373 374 bool 375 Worker::HasPendingJobs() 376 { 377 AutoLocker<Worker> locker(this); 378 return !fJobs.IsEmpty(); 379 } 380 381 382 status_t 383 Worker::AddListener(const JobKey& key, JobListener* listener) 384 { 385 AutoLocker<Worker> locker(this); 386 387 Job* job = fJobs.Lookup(key); 388 if (job == NULL) 389 return B_ENTRY_NOT_FOUND; 390 391 return job->AddListener(listener); 392 } 393 394 395 void 396 Worker::RemoveListener(const JobKey& key, JobListener* listener) 397 { 398 AutoLocker<Worker> locker(this); 399 400 if (Job* job = fJobs.Lookup(key)) 401 job->RemoveListener(listener); 402 } 403 404 405 job_wait_status 406 Worker::WaitForJob(Job* waitingJob, const JobKey& key) 407 { 408 AutoLocker<Worker> locker(this); 409 410 // don't wait when the game is over anyway 411 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED) 412 return JOB_DEPENDENCY_ABORTED; 413 414 Job* job = fJobs.Lookup(key); 415 if (job == NULL) 416 return JOB_DEPENDENCY_NOT_FOUND; 417 418 waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE); 419 waitingJob->SetDependency(job); 420 job->DependentJobs().Add(waitingJob); 421 422 return waitingJob->WaitStatus(); 423 } 424 425 426 status_t 427 Worker::WaitForUserInput(Job* waitingJob) 428 { 429 AutoLocker<Worker> locker(this); 430 431 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED) 432 return B_INTERRUPTED; 433 434 waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING); 435 waitingJob->NotifyListeners(); 436 fSuspendedJobs.Add(waitingJob); 437 438 return B_OK; 439 } 440 441 442 /*static*/ status_t 443 Worker::_WorkerLoopEntry(void* data) 444 { 445 return ((Worker*)data)->_WorkerLoop(); 446 } 447 448 449 status_t 450 Worker::_WorkerLoop() 451 { 452 _ProcessJobs(); 453 454 // clean up aborted jobs 455 AutoLocker<Worker> locker(this); 456 while (Job* job = fAbortedJobs.RemoveHead()) 457 _FinishJob(job); 458 459 return B_OK; 460 } 461 462 463 void 464 Worker::_ProcessJobs() 465 { 466 while (true) { 467 AutoLocker<Worker> locker(this); 468 469 // wait for next job 470 if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) { 471 locker.Unlock(); 472 473 status_t error = acquire_sem(fWorkToDoSem); 474 if (error != B_OK) { 475 if (error == B_INTERRUPTED) { 476 locker.Lock(); 477 continue; 478 } 479 break; 480 } 481 482 locker.Lock(); 483 } 484 485 // clean up aborted jobs 486 while (Job* job = fAbortedJobs.RemoveHead()) 487 _FinishJob(job); 488 489 // process the next job 490 if (Job* job = fUnscheduledJobs.RemoveHead()) { 491 job->SetState(JOB_STATE_ACTIVE); 492 job->NotifyListeners(); 493 494 locker.Unlock(); 495 status_t error = job->Do(); 496 locker.Lock(); 497 498 if (job->State() == JOB_STATE_ACTIVE) { 499 job->SetState( 500 error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED); 501 } else if (job->State() == JOB_STATE_WAITING) 502 continue; 503 504 _FinishJob(job); 505 } 506 } 507 } 508 509 510 void 511 Worker::_AbortJob(Job* job, bool removeFromTable) 512 { 513 switch (job->State()) { 514 case JOB_STATE_ABORTED: 515 return; 516 517 case JOB_STATE_UNSCHEDULED: 518 fUnscheduledJobs.Remove(job); 519 fAbortedJobs.Add(job); 520 break; 521 522 case JOB_STATE_WAITING: 523 { 524 Job* dependency = job->Dependency(); 525 if (dependency != NULL) 526 dependency->DependentJobs().Remove(job); 527 job->SetDependency(NULL); 528 break; 529 } 530 case JOB_STATE_ACTIVE: 531 case JOB_STATE_FAILED: 532 case JOB_STATE_SUCCEEDED: 533 default: 534 break; 535 } 536 537 job->SetState(JOB_STATE_ABORTED); 538 if (removeFromTable) 539 fJobs.Remove(job); 540 } 541 542 543 void 544 Worker::_FinishJob(Job* job) 545 { 546 // wake up dependent jobs 547 if (!job->DependentJobs().IsEmpty()) { 548 job_wait_status waitStatus; 549 switch (job->State()) { 550 case JOB_STATE_ABORTED: 551 waitStatus = JOB_DEPENDENCY_ABORTED; 552 break; 553 case JOB_STATE_FAILED: 554 waitStatus = JOB_DEPENDENCY_FAILED; 555 break; 556 case JOB_STATE_SUCCEEDED: 557 waitStatus = JOB_DEPENDENCY_SUCCEEDED; 558 break; 559 560 case JOB_STATE_UNSCHEDULED: 561 case JOB_STATE_WAITING: 562 case JOB_STATE_ACTIVE: 563 default: 564 // should never happen 565 waitStatus = JOB_DEPENDENCY_NOT_FOUND; 566 break; 567 } 568 569 while (Job* dependentJob = job->DependentJobs().RemoveHead()) { 570 dependentJob->SetDependency(NULL); 571 dependentJob->SetWaitStatus(waitStatus); 572 fUnscheduledJobs.Add(dependentJob); 573 } 574 575 release_sem(fWorkToDoSem); 576 } 577 578 if (job->State() != JOB_STATE_ABORTED) 579 fJobs.Remove(job); 580 job->NotifyListeners(); 581 job->ReleaseReference(); 582 } 583