1 /*
2 * Copyright 2012-2014, Rene Gollent, rene@gollent.com.
3 * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de.
4 * Distributed under the terms of the MIT License.
5 */
6
7 #include "Worker.h"
8
9 #include <AutoDeleter.h>
10 #include <AutoLocker.h>
11
12
13 // pragma mark - JobKey
14
15
~JobKey()16 JobKey::~JobKey()
17 {
18 }
19
20
21 // pragma mark - SimpleJobKey
22
23
SimpleJobKey(const void * object,uint32 type)24 SimpleJobKey::SimpleJobKey(const void* object, uint32 type)
25 :
26 object(object),
27 type(type)
28 {
29 }
30
31
SimpleJobKey(const SimpleJobKey & other)32 SimpleJobKey::SimpleJobKey(const SimpleJobKey& other)
33 :
34 object(other.object),
35 type(other.type)
36 {
37 }
38
39
40 size_t
HashValue() const41 SimpleJobKey::HashValue() const
42 {
43 return (size_t)(addr_t)object ^ (size_t)type;
44 }
45
46
47 bool
operator ==(const JobKey & other) const48 SimpleJobKey::operator==(const JobKey& other) const
49 {
50 const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other);
51 return otherKey != NULL && object == otherKey->object
52 && type == otherKey->type;
53 }
54
55
56 SimpleJobKey&
operator =(const SimpleJobKey & other)57 SimpleJobKey::operator=(const SimpleJobKey& other)
58 {
59 object = other.object;
60 type = other.type;
61 return *this;
62 }
63
64
65 // #pragma mark - JobListener
66
67
~JobListener()68 JobListener::~JobListener()
69 {
70 }
71
72
73 void
JobStarted(Job * job)74 JobListener::JobStarted(Job* job)
75 {
76 }
77
78
79 void
JobDone(Job * job)80 JobListener::JobDone(Job* job)
81 {
82 }
83
84
85 void
JobWaitingForInput(Job * job)86 JobListener::JobWaitingForInput(Job* job)
87 {
88 }
89
90
91 void
JobFailed(Job * job)92 JobListener::JobFailed(Job* job)
93 {
94 }
95
96
97 void
JobAborted(Job * job)98 JobListener::JobAborted(Job* job)
99 {
100 }
101
102
103 // #pragma mark - Job
104
105
Job()106 Job::Job()
107 :
108 fWorker(NULL),
109 fState(JOB_STATE_UNSCHEDULED),
110 fDependency(NULL),
111 fWaitStatus(JOB_DEPENDENCY_NOT_FOUND),
112 fListeners(10)
113 {
114 }
115
116
~Job()117 Job::~Job()
118 {
119 }
120
121
122 job_wait_status
WaitFor(const JobKey & key)123 Job::WaitFor(const JobKey& key)
124 {
125 return fWorker->WaitForJob(this, key);
126 }
127
128
129 status_t
WaitForUserInput()130 Job::WaitForUserInput()
131 {
132 return fWorker->WaitForUserInput(this);
133 }
134
135
136 void
SetDescription(const char * format,...)137 Job::SetDescription(const char* format, ...)
138 {
139 va_list args;
140 va_start(args, format);
141 fDescription.SetToFormatVarArgs(format, args);
142 va_end(args);
143 }
144
145
146 void
SetWorker(Worker * worker)147 Job::SetWorker(Worker* worker)
148 {
149 fWorker = worker;
150 }
151
152
153 void
SetState(job_state state)154 Job::SetState(job_state state)
155 {
156 fState = state;
157 }
158
159
160 void
SetDependency(Job * job)161 Job::SetDependency(Job* job)
162 {
163 fDependency = job;
164 }
165
166
167 void
SetWaitStatus(job_wait_status status)168 Job::SetWaitStatus(job_wait_status status)
169 {
170 fWaitStatus = status;
171 switch (fWaitStatus) {
172 case JOB_DEPENDENCY_ACTIVE:
173 case JOB_USER_INPUT_WAITING:
174 fState = JOB_STATE_WAITING;
175 break;
176 default:
177 fState = JOB_STATE_ACTIVE;
178 break;
179 }
180 }
181
182
183 status_t
AddListener(JobListener * listener)184 Job::AddListener(JobListener* listener)
185 {
186 return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY;
187 }
188
189
190 void
RemoveListener(JobListener * listener)191 Job::RemoveListener(JobListener* listener)
192 {
193 fListeners.RemoveItem(listener);
194 }
195
196
197 void
NotifyListeners()198 Job::NotifyListeners()
199 {
200 int32 count = fListeners.CountItems();
201 for (int32 i = count - 1; i >= 0; i--) {
202 JobListener* listener = fListeners.ItemAt(i);
203 switch (fState) {
204 case JOB_STATE_ACTIVE:
205 listener->JobStarted(this);
206 break;
207 case JOB_STATE_WAITING:
208 if (fWaitStatus == JOB_USER_INPUT_WAITING)
209 listener->JobWaitingForInput(this);
210 break;
211 case JOB_STATE_SUCCEEDED:
212 listener->JobDone(this);
213 break;
214 case JOB_STATE_FAILED:
215 listener->JobFailed(this);
216 break;
217 case JOB_STATE_ABORTED:
218 default:
219 listener->JobAborted(this);
220 break;
221 }
222 }
223 }
224
225
226 // #pragma mark - Worker
227
228
Worker()229 Worker::Worker()
230 :
231 fLock("worker"),
232 fWorkerThread(-1),
233 fTerminating(false)
234 {
235 }
236
237
~Worker()238 Worker::~Worker()
239 {
240 ShutDown();
241
242 if (fWorkerThread >= 0)
243 wait_for_thread(fWorkerThread, NULL);
244 }
245
246
247 status_t
Init()248 Worker::Init()
249 {
250 // check lock
251 status_t error = fLock.InitCheck();
252 if (error != B_OK)
253 return error;
254
255 // init jobs table
256 error = fJobs.Init();
257 if (error != B_OK)
258 return error;
259
260 // create semaphore for the worker
261 fWorkToDoSem = create_sem(0, "work to do");
262 if (fWorkToDoSem < 0)
263 return fWorkToDoSem;
264
265 // spawn worker thread
266 fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY,
267 this);
268 if (fWorkerThread < 0)
269 return fWorkerThread;
270
271 resume_thread(fWorkerThread);
272
273 return B_OK;
274 }
275
276
277 void
ShutDown()278 Worker::ShutDown()
279 {
280 AutoLocker<Worker> locker(this);
281
282 if (fTerminating)
283 return;
284
285 fTerminating = true;
286
287 // abort all jobs
288 Job* job = fJobs.Clear(true);
289 while (job != NULL) {
290 Job* nextJob = job->fNext;
291 _AbortJob(job, false);
292 job = nextJob;
293
294 }
295
296 // let the work thread terminate
297 delete_sem(fWorkToDoSem);
298 fWorkToDoSem = -1;
299 }
300
301
302 status_t
ScheduleJob(Job * job,JobListener * listener)303 Worker::ScheduleJob(Job* job, JobListener* listener)
304 {
305 if (job == NULL)
306 return B_NO_MEMORY;
307
308 BReference<Job> jobReference(job, true);
309 AutoLocker<Worker> locker(this);
310
311 if (fTerminating)
312 return B_ERROR;
313
314 if (listener != NULL) {
315 status_t error = job->AddListener(listener);
316 if (error != B_OK)
317 return error;
318 }
319
320 bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty();
321
322 job->SetWorker(this);
323 job->SetState(JOB_STATE_UNSCHEDULED);
324 fJobs.Insert(job);
325 fUnscheduledJobs.Add(jobReference.Detach());
326
327 if (notify)
328 release_sem(fWorkToDoSem);
329
330 return B_OK;
331 }
332
333
334 void
AbortJob(const JobKey & key)335 Worker::AbortJob(const JobKey& key)
336 {
337 AutoLocker<Worker> locker(this);
338
339 Job* job = fJobs.Lookup(key);
340 if (job == NULL)
341 return;
342
343 _AbortJob(job, true);
344 }
345
346
347 Job*
GetJob(const JobKey & key)348 Worker::GetJob(const JobKey& key)
349 {
350 AutoLocker<Worker> locker(this);
351 return fJobs.Lookup(key);
352 }
353
354
355 status_t
ResumeJob(Job * job)356 Worker::ResumeJob(Job* job)
357 {
358 AutoLocker<Worker> locker(this);
359
360 for (JobList::Iterator it = fSuspendedJobs.GetIterator(); it.Next();) {
361 if (it.Current() == job) {
362 it.Remove();
363 job->SetState(JOB_STATE_UNSCHEDULED);
364 fUnscheduledJobs.Add(job);
365 release_sem(fWorkToDoSem);
366 return B_OK;
367 }
368 }
369
370 return B_ENTRY_NOT_FOUND;
371 }
372
373
374 bool
HasPendingJobs()375 Worker::HasPendingJobs()
376 {
377 AutoLocker<Worker> locker(this);
378 return !fJobs.IsEmpty();
379 }
380
381
382 status_t
AddListener(const JobKey & key,JobListener * listener)383 Worker::AddListener(const JobKey& key, JobListener* listener)
384 {
385 AutoLocker<Worker> locker(this);
386
387 Job* job = fJobs.Lookup(key);
388 if (job == NULL)
389 return B_ENTRY_NOT_FOUND;
390
391 return job->AddListener(listener);
392 }
393
394
395 void
RemoveListener(const JobKey & key,JobListener * listener)396 Worker::RemoveListener(const JobKey& key, JobListener* listener)
397 {
398 AutoLocker<Worker> locker(this);
399
400 if (Job* job = fJobs.Lookup(key))
401 job->RemoveListener(listener);
402 }
403
404
405 job_wait_status
WaitForJob(Job * waitingJob,const JobKey & key)406 Worker::WaitForJob(Job* waitingJob, const JobKey& key)
407 {
408 AutoLocker<Worker> locker(this);
409
410 // don't wait when the game is over anyway
411 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
412 return JOB_DEPENDENCY_ABORTED;
413
414 Job* job = fJobs.Lookup(key);
415 if (job == NULL)
416 return JOB_DEPENDENCY_NOT_FOUND;
417
418 waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE);
419 waitingJob->SetDependency(job);
420 job->DependentJobs().Add(waitingJob);
421
422 return waitingJob->WaitStatus();
423 }
424
425
426 status_t
WaitForUserInput(Job * waitingJob)427 Worker::WaitForUserInput(Job* waitingJob)
428 {
429 AutoLocker<Worker> locker(this);
430
431 if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
432 return B_INTERRUPTED;
433
434 waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING);
435 waitingJob->NotifyListeners();
436 fSuspendedJobs.Add(waitingJob);
437
438 return B_OK;
439 }
440
441
442 /*static*/ status_t
_WorkerLoopEntry(void * data)443 Worker::_WorkerLoopEntry(void* data)
444 {
445 return ((Worker*)data)->_WorkerLoop();
446 }
447
448
449 status_t
_WorkerLoop()450 Worker::_WorkerLoop()
451 {
452 _ProcessJobs();
453
454 // clean up aborted jobs
455 AutoLocker<Worker> locker(this);
456 while (Job* job = fAbortedJobs.RemoveHead())
457 _FinishJob(job);
458
459 return B_OK;
460 }
461
462
463 void
_ProcessJobs()464 Worker::_ProcessJobs()
465 {
466 while (true) {
467 AutoLocker<Worker> locker(this);
468
469 // wait for next job
470 if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) {
471 locker.Unlock();
472
473 status_t error = acquire_sem(fWorkToDoSem);
474 if (error != B_OK) {
475 if (error == B_INTERRUPTED) {
476 locker.Lock();
477 continue;
478 }
479 break;
480 }
481
482 locker.Lock();
483 }
484
485 // clean up aborted jobs
486 while (Job* job = fAbortedJobs.RemoveHead())
487 _FinishJob(job);
488
489 // process the next job
490 if (Job* job = fUnscheduledJobs.RemoveHead()) {
491 job->SetState(JOB_STATE_ACTIVE);
492 job->NotifyListeners();
493
494 locker.Unlock();
495 status_t error = job->Do();
496 locker.Lock();
497
498 if (job->State() == JOB_STATE_ACTIVE) {
499 job->SetState(
500 error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED);
501 } else if (job->State() == JOB_STATE_WAITING)
502 continue;
503
504 _FinishJob(job);
505 }
506 }
507 }
508
509
510 void
_AbortJob(Job * job,bool removeFromTable)511 Worker::_AbortJob(Job* job, bool removeFromTable)
512 {
513 switch (job->State()) {
514 case JOB_STATE_ABORTED:
515 return;
516
517 case JOB_STATE_UNSCHEDULED:
518 fUnscheduledJobs.Remove(job);
519 fAbortedJobs.Add(job);
520 break;
521
522 case JOB_STATE_WAITING:
523 {
524 Job* dependency = job->Dependency();
525 if (dependency != NULL)
526 dependency->DependentJobs().Remove(job);
527 job->SetDependency(NULL);
528 break;
529 }
530 case JOB_STATE_ACTIVE:
531 case JOB_STATE_FAILED:
532 case JOB_STATE_SUCCEEDED:
533 default:
534 break;
535 }
536
537 job->SetState(JOB_STATE_ABORTED);
538 if (removeFromTable)
539 fJobs.Remove(job);
540 }
541
542
543 void
_FinishJob(Job * job)544 Worker::_FinishJob(Job* job)
545 {
546 // wake up dependent jobs
547 if (!job->DependentJobs().IsEmpty()) {
548 job_wait_status waitStatus;
549 switch (job->State()) {
550 case JOB_STATE_ABORTED:
551 waitStatus = JOB_DEPENDENCY_ABORTED;
552 break;
553 case JOB_STATE_FAILED:
554 waitStatus = JOB_DEPENDENCY_FAILED;
555 break;
556 case JOB_STATE_SUCCEEDED:
557 waitStatus = JOB_DEPENDENCY_SUCCEEDED;
558 break;
559
560 case JOB_STATE_UNSCHEDULED:
561 case JOB_STATE_WAITING:
562 case JOB_STATE_ACTIVE:
563 default:
564 // should never happen
565 waitStatus = JOB_DEPENDENCY_NOT_FOUND;
566 break;
567 }
568
569 while (Job* dependentJob = job->DependentJobs().RemoveHead()) {
570 dependentJob->SetDependency(NULL);
571 dependentJob->SetWaitStatus(waitStatus);
572 fUnscheduledJobs.Add(dependentJob);
573 }
574
575 release_sem(fWorkToDoSem);
576 }
577
578 if (job->State() != JOB_STATE_ABORTED)
579 fJobs.Remove(job);
580 job->NotifyListeners();
581 job->ReleaseReference();
582 }
583