xref: /haiku/src/kits/debugger/util/Worker.cpp (revision cf4ce7e38b38ded05776a0397a38488f8e8308f7)
1 /*
2  * Copyright 2012-2014, Rene Gollent, rene@gollent.com.
3  * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 #include "Worker.h"
8 
9 #include <AutoDeleter.h>
10 #include <AutoLocker.h>
11 
12 
13 // pragma mark - JobKey
14 
15 
~JobKey()16 JobKey::~JobKey()
17 {
18 }
19 
20 
21 // pragma mark - SimpleJobKey
22 
23 
SimpleJobKey(const void * object,uint32 type)24 SimpleJobKey::SimpleJobKey(const void* object, uint32 type)
25 	:
26 	object(object),
27 	type(type)
28 {
29 }
30 
31 
SimpleJobKey(const SimpleJobKey & other)32 SimpleJobKey::SimpleJobKey(const SimpleJobKey& other)
33 	:
34 	object(other.object),
35 	type(other.type)
36 {
37 }
38 
39 
40 size_t
HashValue() const41 SimpleJobKey::HashValue() const
42 {
43 	return (size_t)(addr_t)object ^ (size_t)type;
44 }
45 
46 
47 bool
operator ==(const JobKey & other) const48 SimpleJobKey::operator==(const JobKey& other) const
49 {
50 	const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other);
51 	return otherKey != NULL && object == otherKey->object
52 		&& type == otherKey->type;
53 }
54 
55 
56 SimpleJobKey&
operator =(const SimpleJobKey & other)57 SimpleJobKey::operator=(const SimpleJobKey& other)
58 {
59 	object = other.object;
60 	type = other.type;
61 	return *this;
62 }
63 
64 
65 // #pragma mark - JobListener
66 
67 
~JobListener()68 JobListener::~JobListener()
69 {
70 }
71 
72 
73 void
JobStarted(Job * job)74 JobListener::JobStarted(Job* job)
75 {
76 }
77 
78 
79 void
JobDone(Job * job)80 JobListener::JobDone(Job* job)
81 {
82 }
83 
84 
85 void
JobWaitingForInput(Job * job)86 JobListener::JobWaitingForInput(Job* job)
87 {
88 }
89 
90 
91 void
JobFailed(Job * job)92 JobListener::JobFailed(Job* job)
93 {
94 }
95 
96 
97 void
JobAborted(Job * job)98 JobListener::JobAborted(Job* job)
99 {
100 }
101 
102 
103 // #pragma mark - Job
104 
105 
Job()106 Job::Job()
107 	:
108 	fWorker(NULL),
109 	fState(JOB_STATE_UNSCHEDULED),
110 	fDependency(NULL),
111 	fWaitStatus(JOB_DEPENDENCY_NOT_FOUND),
112 	fListeners(10)
113 {
114 }
115 
116 
~Job()117 Job::~Job()
118 {
119 }
120 
121 
122 job_wait_status
WaitFor(const JobKey & key)123 Job::WaitFor(const JobKey& key)
124 {
125 	return fWorker->WaitForJob(this, key);
126 }
127 
128 
129 status_t
WaitForUserInput()130 Job::WaitForUserInput()
131 {
132 	return fWorker->WaitForUserInput(this);
133 }
134 
135 
136 void
SetDescription(const char * format,...)137 Job::SetDescription(const char* format, ...)
138 {
139 	va_list args;
140 	va_start(args, format);
141 	fDescription.SetToFormatVarArgs(format, args);
142 	va_end(args);
143 }
144 
145 
146 void
SetWorker(Worker * worker)147 Job::SetWorker(Worker* worker)
148 {
149 	fWorker = worker;
150 }
151 
152 
153 void
SetState(job_state state)154 Job::SetState(job_state state)
155 {
156 	fState = state;
157 }
158 
159 
160 void
SetDependency(Job * job)161 Job::SetDependency(Job* job)
162 {
163 	fDependency = job;
164 }
165 
166 
167 void
SetWaitStatus(job_wait_status status)168 Job::SetWaitStatus(job_wait_status status)
169 {
170 	fWaitStatus = status;
171 	switch (fWaitStatus) {
172 		case JOB_DEPENDENCY_ACTIVE:
173 		case JOB_USER_INPUT_WAITING:
174 			fState = JOB_STATE_WAITING;
175 			break;
176 		default:
177 			fState = JOB_STATE_ACTIVE;
178 			break;
179 	}
180 }
181 
182 
183 status_t
AddListener(JobListener * listener)184 Job::AddListener(JobListener* listener)
185 {
186 	return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY;
187 }
188 
189 
190 void
RemoveListener(JobListener * listener)191 Job::RemoveListener(JobListener* listener)
192 {
193 	fListeners.RemoveItem(listener);
194 }
195 
196 
197 void
NotifyListeners()198 Job::NotifyListeners()
199 {
200 	int32 count = fListeners.CountItems();
201 	for (int32 i = count - 1; i >= 0; i--) {
202 		JobListener* listener = fListeners.ItemAt(i);
203 		switch (fState) {
204 			case JOB_STATE_ACTIVE:
205 				listener->JobStarted(this);
206 				break;
207 			case JOB_STATE_WAITING:
208 				if (fWaitStatus == JOB_USER_INPUT_WAITING)
209 					listener->JobWaitingForInput(this);
210 				break;
211 			case JOB_STATE_SUCCEEDED:
212 				listener->JobDone(this);
213 				break;
214 			case JOB_STATE_FAILED:
215 				listener->JobFailed(this);
216 				break;
217 			case JOB_STATE_ABORTED:
218 			default:
219 				listener->JobAborted(this);
220 				break;
221 		}
222 	}
223 }
224 
225 
226 // #pragma mark - Worker
227 
228 
Worker()229 Worker::Worker()
230 	:
231 	fLock("worker"),
232 	fWorkerThread(-1),
233 	fTerminating(false)
234 {
235 }
236 
237 
~Worker()238 Worker::~Worker()
239 {
240 	ShutDown();
241 
242 	if (fWorkerThread >= 0)
243 		wait_for_thread(fWorkerThread, NULL);
244 }
245 
246 
247 status_t
Init()248 Worker::Init()
249 {
250 	// check lock
251 	status_t error = fLock.InitCheck();
252 	if (error != B_OK)
253 		return error;
254 
255 	// init jobs table
256 	error = fJobs.Init();
257 	if (error != B_OK)
258 		return error;
259 
260 	// create semaphore for the worker
261 	fWorkToDoSem = create_sem(0, "work to do");
262 	if (fWorkToDoSem < 0)
263 		return fWorkToDoSem;
264 
265 	// spawn worker thread
266 	fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY,
267 		this);
268 	if (fWorkerThread < 0)
269 		return fWorkerThread;
270 
271 	resume_thread(fWorkerThread);
272 
273 	return B_OK;
274 }
275 
276 
277 void
ShutDown()278 Worker::ShutDown()
279 {
280 	AutoLocker<Worker> locker(this);
281 
282 	if (fTerminating)
283 		return;
284 
285 	fTerminating = true;
286 
287 	// abort all jobs
288 	Job* job = fJobs.Clear(true);
289 	while (job != NULL) {
290 		Job* nextJob = job->fNext;
291 		_AbortJob(job, false);
292 		job = nextJob;
293 
294 	}
295 
296 	// let the work thread terminate
297 	delete_sem(fWorkToDoSem);
298 	fWorkToDoSem = -1;
299 }
300 
301 
302 status_t
ScheduleJob(Job * job,JobListener * listener)303 Worker::ScheduleJob(Job* job, JobListener* listener)
304 {
305 	if (job == NULL)
306 		return B_NO_MEMORY;
307 
308 	BReference<Job> jobReference(job, true);
309 	AutoLocker<Worker> locker(this);
310 
311 	if (fTerminating)
312 		return B_ERROR;
313 
314 	if (listener != NULL) {
315 		status_t error = job->AddListener(listener);
316 		if (error != B_OK)
317 			return error;
318 	}
319 
320 	bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty();
321 
322 	job->SetWorker(this);
323 	job->SetState(JOB_STATE_UNSCHEDULED);
324 	fJobs.Insert(job);
325 	fUnscheduledJobs.Add(jobReference.Detach());
326 
327 	if (notify)
328 		release_sem(fWorkToDoSem);
329 
330 	return B_OK;
331 }
332 
333 
334 void
AbortJob(const JobKey & key)335 Worker::AbortJob(const JobKey& key)
336 {
337 	AutoLocker<Worker> locker(this);
338 
339 	Job* job = fJobs.Lookup(key);
340 	if (job == NULL)
341 		return;
342 
343 	_AbortJob(job, true);
344 }
345 
346 
347 Job*
GetJob(const JobKey & key)348 Worker::GetJob(const JobKey& key)
349 {
350 	AutoLocker<Worker> locker(this);
351 	return fJobs.Lookup(key);
352 }
353 
354 
355 status_t
ResumeJob(Job * job)356 Worker::ResumeJob(Job* job)
357 {
358 	AutoLocker<Worker> locker(this);
359 
360 	for (JobList::Iterator it = fSuspendedJobs.GetIterator(); it.Next();) {
361 		if (it.Current() == job) {
362 			it.Remove();
363 			job->SetState(JOB_STATE_UNSCHEDULED);
364 			fUnscheduledJobs.Add(job);
365 			release_sem(fWorkToDoSem);
366 			return B_OK;
367 		}
368 	}
369 
370 	return B_ENTRY_NOT_FOUND;
371 }
372 
373 
374 bool
HasPendingJobs()375 Worker::HasPendingJobs()
376 {
377 	AutoLocker<Worker> locker(this);
378 	return !fJobs.IsEmpty();
379 }
380 
381 
382 status_t
AddListener(const JobKey & key,JobListener * listener)383 Worker::AddListener(const JobKey& key, JobListener* listener)
384 {
385 	AutoLocker<Worker> locker(this);
386 
387 	Job* job = fJobs.Lookup(key);
388 	if (job == NULL)
389 		return B_ENTRY_NOT_FOUND;
390 
391 	return job->AddListener(listener);
392 }
393 
394 
395 void
RemoveListener(const JobKey & key,JobListener * listener)396 Worker::RemoveListener(const JobKey& key, JobListener* listener)
397 {
398 	AutoLocker<Worker> locker(this);
399 
400 	if (Job* job = fJobs.Lookup(key))
401 		job->RemoveListener(listener);
402 }
403 
404 
405 job_wait_status
WaitForJob(Job * waitingJob,const JobKey & key)406 Worker::WaitForJob(Job* waitingJob, const JobKey& key)
407 {
408 	AutoLocker<Worker> locker(this);
409 
410 	// don't wait when the game is over anyway
411 	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
412 		return JOB_DEPENDENCY_ABORTED;
413 
414 	Job* job = fJobs.Lookup(key);
415 	if (job == NULL)
416 		return JOB_DEPENDENCY_NOT_FOUND;
417 
418 	waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE);
419 	waitingJob->SetDependency(job);
420 	job->DependentJobs().Add(waitingJob);
421 
422 	return waitingJob->WaitStatus();
423 }
424 
425 
426 status_t
WaitForUserInput(Job * waitingJob)427 Worker::WaitForUserInput(Job* waitingJob)
428 {
429 	AutoLocker<Worker> locker(this);
430 
431 	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
432 		return B_INTERRUPTED;
433 
434 	waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING);
435 	waitingJob->NotifyListeners();
436 	fSuspendedJobs.Add(waitingJob);
437 
438 	return B_OK;
439 }
440 
441 
442 /*static*/ status_t
_WorkerLoopEntry(void * data)443 Worker::_WorkerLoopEntry(void* data)
444 {
445 	return ((Worker*)data)->_WorkerLoop();
446 }
447 
448 
449 status_t
_WorkerLoop()450 Worker::_WorkerLoop()
451 {
452 	_ProcessJobs();
453 
454 	// clean up aborted jobs
455 	AutoLocker<Worker> locker(this);
456 	while (Job* job = fAbortedJobs.RemoveHead())
457 		_FinishJob(job);
458 
459 	return B_OK;
460 }
461 
462 
463 void
_ProcessJobs()464 Worker::_ProcessJobs()
465 {
466 	while (true) {
467 		AutoLocker<Worker> locker(this);
468 
469 		// wait for next job
470 		if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) {
471 			locker.Unlock();
472 
473 			status_t error = acquire_sem(fWorkToDoSem);
474 			if (error != B_OK) {
475 				if (error == B_INTERRUPTED) {
476 					locker.Lock();
477 					continue;
478 				}
479 				break;
480 			}
481 
482 			locker.Lock();
483 		}
484 
485 		// clean up aborted jobs
486 		while (Job* job = fAbortedJobs.RemoveHead())
487 			_FinishJob(job);
488 
489 		// process the next job
490 		if (Job* job = fUnscheduledJobs.RemoveHead()) {
491 			job->SetState(JOB_STATE_ACTIVE);
492 			job->NotifyListeners();
493 
494 			locker.Unlock();
495 			status_t error = job->Do();
496 			locker.Lock();
497 
498 			if (job->State() == JOB_STATE_ACTIVE) {
499 				job->SetState(
500 					error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED);
501 			} else if (job->State() == JOB_STATE_WAITING)
502 				continue;
503 
504 			_FinishJob(job);
505 		}
506 	}
507 }
508 
509 
510 void
_AbortJob(Job * job,bool removeFromTable)511 Worker::_AbortJob(Job* job, bool removeFromTable)
512 {
513 	switch (job->State()) {
514 		case JOB_STATE_ABORTED:
515 			return;
516 
517 		case JOB_STATE_UNSCHEDULED:
518 			fUnscheduledJobs.Remove(job);
519 			fAbortedJobs.Add(job);
520 			break;
521 
522 		case JOB_STATE_WAITING:
523 		{
524 			Job* dependency = job->Dependency();
525 			if (dependency != NULL)
526 				dependency->DependentJobs().Remove(job);
527 			job->SetDependency(NULL);
528 			break;
529 		}
530 		case JOB_STATE_ACTIVE:
531 		case JOB_STATE_FAILED:
532 		case JOB_STATE_SUCCEEDED:
533 		default:
534 			break;
535 	}
536 
537 	job->SetState(JOB_STATE_ABORTED);
538 	if (removeFromTable)
539 		fJobs.Remove(job);
540 }
541 
542 
543 void
_FinishJob(Job * job)544 Worker::_FinishJob(Job* job)
545 {
546 	// wake up dependent jobs
547 	if (!job->DependentJobs().IsEmpty()) {
548 		job_wait_status waitStatus;
549 		switch (job->State()) {
550 			case JOB_STATE_ABORTED:
551 				waitStatus = JOB_DEPENDENCY_ABORTED;
552 				break;
553 			case JOB_STATE_FAILED:
554 				waitStatus = JOB_DEPENDENCY_FAILED;
555 				break;
556 			case JOB_STATE_SUCCEEDED:
557 				waitStatus = JOB_DEPENDENCY_SUCCEEDED;
558 				break;
559 
560 			case JOB_STATE_UNSCHEDULED:
561 			case JOB_STATE_WAITING:
562 			case JOB_STATE_ACTIVE:
563 			default:
564 				// should never happen
565 				waitStatus = JOB_DEPENDENCY_NOT_FOUND;
566 				break;
567 		}
568 
569 		while (Job* dependentJob = job->DependentJobs().RemoveHead()) {
570 			dependentJob->SetDependency(NULL);
571 			dependentJob->SetWaitStatus(waitStatus);
572 			fUnscheduledJobs.Add(dependentJob);
573 		}
574 
575 		release_sem(fWorkToDoSem);
576 	}
577 
578 	if (job->State() != JOB_STATE_ABORTED)
579 		fJobs.Remove(job);
580 	job->NotifyListeners();
581 	job->ReleaseReference();
582 }
583