xref: /haiku/src/kits/debugger/util/Worker.cpp (revision 2cad94c1c30b6223ad8c08710b26e071d32e9979)
1 /*
2  * Copyright 2012-2014, Rene Gollent, rene@gollent.com.
3  * Copyright 2009, Ingo Weinhold, ingo_weinhold@gmx.de.
4  * Distributed under the terms of the MIT License.
5  */
6 
7 #include "Worker.h"
8 
9 #include <AutoDeleter.h>
10 #include <AutoLocker.h>
11 
12 
// #pragma mark - JobKey
14 
15 
16 JobKey::~JobKey()
17 {
18 }
19 
20 
// #pragma mark - SimpleJobKey
22 
23 
24 SimpleJobKey::SimpleJobKey(const void* object, uint32 type)
25 	:
26 	object(object),
27 	type(type)
28 {
29 }
30 
31 
32 SimpleJobKey::SimpleJobKey(const SimpleJobKey& other)
33 	:
34 	object(other.object),
35 	type(other.type)
36 {
37 }
38 
39 
40 size_t
41 SimpleJobKey::HashValue() const
42 {
43 	return (size_t)(addr_t)object ^ (size_t)type;
44 }
45 
46 
47 bool
48 SimpleJobKey::operator==(const JobKey& other) const
49 {
50 	const SimpleJobKey* otherKey = dynamic_cast<const SimpleJobKey*>(&other);
51 	return otherKey != NULL && object == otherKey->object
52 		&& type == otherKey->type;
53 }
54 
55 
56 SimpleJobKey&
57 SimpleJobKey::operator=(const SimpleJobKey& other)
58 {
59 	object = other.object;
60 	type = other.type;
61 	return *this;
62 }
63 
64 
65 // #pragma mark - JobListener
66 
67 
68 JobListener::~JobListener()
69 {
70 }
71 
72 
73 void
74 JobListener::JobStarted(Job* job)
75 {
76 }
77 
78 
79 void
80 JobListener::JobDone(Job* job)
81 {
82 }
83 
84 
85 void
86 JobListener::JobWaitingForInput(Job* job)
87 {
88 }
89 
90 
91 void
92 JobListener::JobFailed(Job* job)
93 {
94 }
95 
96 
97 void
98 JobListener::JobAborted(Job* job)
99 {
100 }
101 
102 
103 // #pragma mark - Job
104 
105 
106 Job::Job()
107 	:
108 	fWorker(NULL),
109 	fState(JOB_STATE_UNSCHEDULED),
110 	fDependency(NULL),
111 	fWaitStatus(JOB_DEPENDENCY_NOT_FOUND),
112 	fListeners(10)
113 {
114 }
115 
116 
117 Job::~Job()
118 {
119 }
120 
121 
122 job_wait_status
123 Job::WaitFor(const JobKey& key)
124 {
125 	return fWorker->WaitForJob(this, key);
126 }
127 
128 
129 status_t
130 Job::WaitForUserInput()
131 {
132 	return fWorker->WaitForUserInput(this);
133 }
134 
135 
136 void
137 Job::SetDescription(const char* format, ...)
138 {
139 	va_list args;
140 	va_start(args, format);
141 	fDescription.SetToFormatVarArgs(format, args);
142 }
143 
144 
145 void
146 Job::SetWorker(Worker* worker)
147 {
148 	fWorker = worker;
149 }
150 
151 
152 void
153 Job::SetState(job_state state)
154 {
155 	fState = state;
156 }
157 
158 
159 void
160 Job::SetDependency(Job* job)
161 {
162 	fDependency = job;
163 }
164 
165 
166 void
167 Job::SetWaitStatus(job_wait_status status)
168 {
169 	fWaitStatus = status;
170 	switch (fWaitStatus) {
171 		case JOB_DEPENDENCY_ACTIVE:
172 		case JOB_USER_INPUT_WAITING:
173 			fState = JOB_STATE_WAITING;
174 			break;
175 		default:
176 			fState = JOB_STATE_ACTIVE;
177 			break;
178 	}
179 }
180 
181 
182 status_t
183 Job::AddListener(JobListener* listener)
184 {
185 	return fListeners.AddItem(listener) ? B_OK : B_NO_MEMORY;
186 }
187 
188 
189 void
190 Job::RemoveListener(JobListener* listener)
191 {
192 	fListeners.RemoveItem(listener);
193 }
194 
195 
196 void
197 Job::NotifyListeners()
198 {
199 	int32 count = fListeners.CountItems();
200 	for (int32 i = count - 1; i >= 0; i--) {
201 		JobListener* listener = fListeners.ItemAt(i);
202 		switch (fState) {
203 			case JOB_STATE_ACTIVE:
204 				listener->JobStarted(this);
205 				break;
206 			case JOB_STATE_WAITING:
207 				if (fWaitStatus == JOB_USER_INPUT_WAITING)
208 					listener->JobWaitingForInput(this);
209 				break;
210 			case JOB_STATE_SUCCEEDED:
211 				listener->JobDone(this);
212 				break;
213 			case JOB_STATE_FAILED:
214 				listener->JobFailed(this);
215 				break;
216 			case JOB_STATE_ABORTED:
217 			default:
218 				listener->JobAborted(this);
219 				break;
220 		}
221 	}
222 }
223 
224 
225 // #pragma mark - Worker
226 
227 
228 Worker::Worker()
229 	:
230 	fLock("worker"),
231 	fWorkerThread(-1),
232 	fTerminating(false)
233 {
234 }
235 
236 
237 Worker::~Worker()
238 {
239 	ShutDown();
240 
241 	if (fWorkerThread >= 0)
242 		wait_for_thread(fWorkerThread, NULL);
243 }
244 
245 
246 status_t
247 Worker::Init()
248 {
249 	// check lock
250 	status_t error = fLock.InitCheck();
251 	if (error != B_OK)
252 		return error;
253 
254 	// init jobs table
255 	error = fJobs.Init();
256 	if (error != B_OK)
257 		return error;
258 
259 	// create semaphore for the worker
260 	fWorkToDoSem = create_sem(0, "work to do");
261 	if (fWorkToDoSem < 0)
262 		return fWorkToDoSem;
263 
264 	// spawn worker thread
265 	fWorkerThread = spawn_thread(_WorkerLoopEntry, "worker", B_NORMAL_PRIORITY,
266 		this);
267 	if (fWorkerThread < 0)
268 		return fWorkerThread;
269 
270 	resume_thread(fWorkerThread);
271 
272 	return B_OK;
273 }
274 
275 
276 void
277 Worker::ShutDown()
278 {
279 	AutoLocker<Worker> locker(this);
280 
281 	if (fTerminating)
282 		return;
283 
284 	fTerminating = true;
285 
286 	// abort all jobs
287 	Job* job = fJobs.Clear(true);
288 	while (job != NULL) {
289 		Job* nextJob = job->fNext;
290 		_AbortJob(job, false);
291 		job = nextJob;
292 
293 	}
294 
295 	// let the work thread terminate
296 	delete_sem(fWorkToDoSem);
297 	fWorkToDoSem = -1;
298 }
299 
300 
301 status_t
302 Worker::ScheduleJob(Job* job, JobListener* listener)
303 {
304 	if (job == NULL)
305 		return B_NO_MEMORY;
306 
307 	BReference<Job> jobReference(job, true);
308 	AutoLocker<Worker> locker(this);
309 
310 	if (fTerminating)
311 		return B_ERROR;
312 
313 	if (listener != NULL) {
314 		status_t error = job->AddListener(listener);
315 		if (error != B_OK)
316 			return error;
317 	}
318 
319 	bool notify = fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty();
320 
321 	job->SetWorker(this);
322 	job->SetState(JOB_STATE_UNSCHEDULED);
323 	fJobs.Insert(job);
324 	fUnscheduledJobs.Add(jobReference.Detach());
325 
326 	if (notify)
327 		release_sem(fWorkToDoSem);
328 
329 	return B_OK;
330 }
331 
332 
333 void
334 Worker::AbortJob(const JobKey& key)
335 {
336 	AutoLocker<Worker> locker(this);
337 
338 	Job* job = fJobs.Lookup(key);
339 	if (job == NULL)
340 		return;
341 
342 	_AbortJob(job, true);
343 }
344 
345 
346 Job*
347 Worker::GetJob(const JobKey& key)
348 {
349 	AutoLocker<Worker> locker(this);
350 	return fJobs.Lookup(key);
351 }
352 
353 
354 status_t
355 Worker::ResumeJob(Job* job)
356 {
357 	AutoLocker<Worker> locker(this);
358 
359 	for (JobList::Iterator it = fSuspendedJobs.GetIterator(); it.Next();) {
360 		if (it.Current() == job) {
361 			it.Remove();
362 			job->SetState(JOB_STATE_UNSCHEDULED);
363 			fUnscheduledJobs.Add(job);
364 			release_sem(fWorkToDoSem);
365 			return B_OK;
366 		}
367 	}
368 
369 	return B_ENTRY_NOT_FOUND;
370 }
371 
372 
373 bool
374 Worker::HasPendingJobs()
375 {
376 	AutoLocker<Worker> locker(this);
377 	return !fJobs.IsEmpty();
378 }
379 
380 
381 status_t
382 Worker::AddListener(const JobKey& key, JobListener* listener)
383 {
384 	AutoLocker<Worker> locker(this);
385 
386 	Job* job = fJobs.Lookup(key);
387 	if (job == NULL)
388 		return B_ENTRY_NOT_FOUND;
389 
390 	return job->AddListener(listener);
391 }
392 
393 
394 void
395 Worker::RemoveListener(const JobKey& key, JobListener* listener)
396 {
397 	AutoLocker<Worker> locker(this);
398 
399 	if (Job* job = fJobs.Lookup(key))
400 		job->RemoveListener(listener);
401 }
402 
403 
404 job_wait_status
405 Worker::WaitForJob(Job* waitingJob, const JobKey& key)
406 {
407 	AutoLocker<Worker> locker(this);
408 
409 	// don't wait when the game is over anyway
410 	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
411 		return JOB_DEPENDENCY_ABORTED;
412 
413 	Job* job = fJobs.Lookup(key);
414 	if (job == NULL)
415 		return JOB_DEPENDENCY_NOT_FOUND;
416 
417 	waitingJob->SetWaitStatus(JOB_DEPENDENCY_ACTIVE);
418 	waitingJob->SetDependency(job);
419 	job->DependentJobs().Add(waitingJob);
420 
421 	return waitingJob->WaitStatus();
422 }
423 
424 
425 status_t
426 Worker::WaitForUserInput(Job* waitingJob)
427 {
428 	AutoLocker<Worker> locker(this);
429 
430 	if (fTerminating || waitingJob->State() == JOB_STATE_ABORTED)
431 		return B_INTERRUPTED;
432 
433 	waitingJob->SetWaitStatus(JOB_USER_INPUT_WAITING);
434 	waitingJob->NotifyListeners();
435 	fSuspendedJobs.Add(waitingJob);
436 
437 	return B_OK;
438 }
439 
440 
441 /*static*/ status_t
442 Worker::_WorkerLoopEntry(void* data)
443 {
444 	return ((Worker*)data)->_WorkerLoop();
445 }
446 
447 
448 status_t
449 Worker::_WorkerLoop()
450 {
451 	_ProcessJobs();
452 
453 	// clean up aborted jobs
454 	AutoLocker<Worker> locker(this);
455 	while (Job* job = fAbortedJobs.RemoveHead())
456 		_FinishJob(job);
457 
458 	return B_OK;
459 }
460 
461 
462 void
463 Worker::_ProcessJobs()
464 {
465 	while (true) {
466 		AutoLocker<Worker> locker(this);
467 
468 		// wait for next job
469 		if (fUnscheduledJobs.IsEmpty() && fAbortedJobs.IsEmpty()) {
470 			locker.Unlock();
471 
472 			status_t error = acquire_sem(fWorkToDoSem);
473 			if (error != B_OK) {
474 				if (error == B_INTERRUPTED) {
475 					locker.Lock();
476 					continue;
477 				}
478 				break;
479 			}
480 
481 			locker.Lock();
482 		}
483 
484 		// clean up aborted jobs
485 		while (Job* job = fAbortedJobs.RemoveHead())
486 			_FinishJob(job);
487 
488 		// process the next job
489 		if (Job* job = fUnscheduledJobs.RemoveHead()) {
490 			job->SetState(JOB_STATE_ACTIVE);
491 			job->NotifyListeners();
492 
493 			locker.Unlock();
494 			status_t error = job->Do();
495 			locker.Lock();
496 
497 			if (job->State() == JOB_STATE_ACTIVE) {
498 				job->SetState(
499 					error == B_OK ? JOB_STATE_SUCCEEDED : JOB_STATE_FAILED);
500 			} else if (job->State() == JOB_STATE_WAITING)
501 				continue;
502 
503 			_FinishJob(job);
504 		}
505 	}
506 }
507 
508 
509 void
510 Worker::_AbortJob(Job* job, bool removeFromTable)
511 {
512 	switch (job->State()) {
513 		case JOB_STATE_ABORTED:
514 			return;
515 
516 		case JOB_STATE_UNSCHEDULED:
517 			fUnscheduledJobs.Remove(job);
518 			fAbortedJobs.Add(job);
519 			break;
520 
521 		case JOB_STATE_WAITING:
522 		{
523 			Job* dependency = job->Dependency();
524 			if (dependency != NULL)
525 				dependency->DependentJobs().Remove(job);
526 			job->SetDependency(NULL);
527 			break;
528 		}
529 		case JOB_STATE_ACTIVE:
530 		case JOB_STATE_FAILED:
531 		case JOB_STATE_SUCCEEDED:
532 		default:
533 			break;
534 	}
535 
536 	job->SetState(JOB_STATE_ABORTED);
537 	if (removeFromTable)
538 		fJobs.Remove(job);
539 }
540 
541 
542 void
543 Worker::_FinishJob(Job* job)
544 {
545 	// wake up dependent jobs
546 	if (!job->DependentJobs().IsEmpty()) {
547 		job_wait_status waitStatus;
548 		switch (job->State()) {
549 			case JOB_STATE_ABORTED:
550 				waitStatus = JOB_DEPENDENCY_ABORTED;
551 				break;
552 			case JOB_STATE_FAILED:
553 				waitStatus = JOB_DEPENDENCY_FAILED;
554 				break;
555 			case JOB_STATE_SUCCEEDED:
556 				waitStatus = JOB_DEPENDENCY_SUCCEEDED;
557 				break;
558 
559 			case JOB_STATE_UNSCHEDULED:
560 			case JOB_STATE_WAITING:
561 			case JOB_STATE_ACTIVE:
562 			default:
563 				// should never happen
564 				waitStatus = JOB_DEPENDENCY_NOT_FOUND;
565 				break;
566 		}
567 
568 		while (Job* dependentJob = job->DependentJobs().RemoveHead()) {
569 			dependentJob->SetDependency(NULL);
570 			dependentJob->SetWaitStatus(waitStatus);
571 			fUnscheduledJobs.Add(dependentJob);
572 		}
573 
574 		release_sem(fWorkToDoSem);
575 	}
576 
577 	if (job->State() != JOB_STATE_ABORTED)
578 		fJobs.Remove(job);
579 	job->NotifyListeners();
580 	job->ReleaseReference();
581 }
582