xref: /haiku/src/kits/support/JobQueue.cpp (revision 6ff95509c2e9df2cf0e423e1eb37eb31d92ec511)
1 /*
2  * Copyright 2011-2015, Haiku, Inc. All Rights Reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *		Axel Dörfler <axeld@pinc-software.de>
7  *		Oliver Tappe <zooey@hirschkaefer.de>
8  */
9 
10 
11 #include <JobQueue.h>
12 
13 #include <set>
14 
15 #include <Autolock.h>
16 #include <Job.h>
17 
18 #include <JobPrivate.h>
19 
20 
21 namespace BSupportKit {
22 
23 namespace BPrivate {
24 
25 
26 struct JobQueue::JobPriorityLess {
27 	bool operator()(const BJob* left, const BJob* right) const;
28 };
29 
30 
31 /*!	Sort jobs by:
32 		1. descending count of dependencies (only jobs without dependencies are
33 		   runnable)
34 		2. job ticket number (order in which jobs were added to the queue)
35 */
36 bool
37 JobQueue::JobPriorityLess::operator()(const BJob* left, const BJob* right) const
38 {
39 	int32 difference = left->CountDependencies() - right->CountDependencies();
40 	if (difference < 0)
41 		return true;
42 	if (difference > 0)
43 		return false;
44 
45 	return left->TicketNumber() < right->TicketNumber();
46 };
47 
48 
49 class JobQueue::JobPriorityQueue
50 	: public std::set<BJob*, JobPriorityLess> {
51 };
52 
53 
54 // #pragma mark -
55 
56 
57 JobQueue::JobQueue()
58 	:
59 	fLock("job queue"),
60 	fNextTicketNumber(1)
61 {
62 	fInitStatus = _Init();
63 }
64 
65 
66 JobQueue::~JobQueue()
67 {
68 	Close();
69 	delete fQueuedJobs;
70 }
71 
72 
73 status_t
74 JobQueue::InitCheck() const
75 {
76 	return fInitStatus;
77 }
78 
79 
80 status_t
81 JobQueue::AddJob(BJob* job)
82 {
83 	if (fQueuedJobs == NULL)
84 		return B_NO_INIT;
85 
86 	BAutolock lock(&fLock);
87 	if (lock.IsLocked()) {
88 		try {
89 			if (!fQueuedJobs->insert(job).second)
90 				return B_NAME_IN_USE;
91 		} catch (const std::bad_alloc& e) {
92 			return B_NO_MEMORY;
93 		} catch (...) {
94 			return B_ERROR;
95 		}
96 		BJob::Private(*job).SetTicketNumber(fNextTicketNumber++);
97 		job->AddStateListener(this);
98 		if (job->IsRunnable())
99 			release_sem(fHaveRunnableJobSem);
100 	}
101 
102 	return B_OK;
103 }
104 
105 
106 status_t
107 JobQueue::RemoveJob(BJob* job)
108 {
109 	if (fQueuedJobs == NULL)
110 		return B_NO_INIT;
111 
112 	BAutolock lock(&fLock);
113 	if (lock.IsLocked()) {
114 		try {
115 			if (fQueuedJobs->erase(job) == 0)
116 				return B_NAME_NOT_FOUND;
117 		} catch (...) {
118 			return B_ERROR;
119 		}
120 		BJob::Private(*job).ClearTicketNumber();
121 		job->RemoveStateListener(this);
122 	}
123 
124 	return B_OK;
125 }
126 
127 
128 void
129 JobQueue::JobSucceeded(BJob* job)
130 {
131 	BAutolock lock(&fLock);
132 	if (lock.IsLocked())
133 		_RequeueDependantJobsOf(job);
134 }
135 
136 
137 void
138 JobQueue::JobFailed(BJob* job)
139 {
140 	BAutolock lock(&fLock);
141 	if (lock.IsLocked())
142 		_RemoveDependantJobsOf(job);
143 }
144 
145 
146 BJob*
147 JobQueue::Pop()
148 {
149 	BJob* job;
150 	if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK)
151 		return job;
152 
153 	return NULL;
154 }
155 
156 
157 status_t
158 JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job)
159 {
160 	BAutolock lock(&fLock);
161 	if (lock.IsLocked()) {
162 		while (true) {
163 			JobPriorityQueue::iterator head = fQueuedJobs->begin();
164 			if (head != fQueuedJobs->end()) {
165 				if ((*head)->IsRunnable()) {
166 					*_job = *head;
167 					fQueuedJobs->erase(head);
168 					return B_OK;
169 				}
170 			} else if (returnWhenEmpty)
171 				return B_ENTRY_NOT_FOUND;
172 
173 			// we need to wait until a job becomes available/runnable
174 			status_t result;
175 			do {
176 				lock.Unlock();
177 				result = acquire_sem_etc(fHaveRunnableJobSem, 1,
178 					B_RELATIVE_TIMEOUT, timeout);
179 				if (!lock.Lock())
180 					return B_ERROR;
181 			} while (result == B_INTERRUPTED);
182 			if (result != B_OK)
183 				return result;
184 		}
185 	}
186 
187 	return B_ERROR;
188 }
189 
190 
191 size_t
192 JobQueue::CountJobs() const
193 {
194 	BAutolock locker(fLock);
195 	return fQueuedJobs->size();
196 }
197 
198 
199 void
200 JobQueue::Close()
201 {
202 	if (fHaveRunnableJobSem < 0)
203 		return;
204 
205 	BAutolock lock(&fLock);
206 	if (lock.IsLocked()) {
207 		delete_sem(fHaveRunnableJobSem);
208 		fHaveRunnableJobSem = -1;
209 
210 		if (fQueuedJobs != NULL) {
211 			// get rid of all jobs
212 			for (JobPriorityQueue::iterator iter = fQueuedJobs->begin();
213 				iter != fQueuedJobs->end(); ++iter) {
214 				delete (*iter);
215 			}
216 			fQueuedJobs->clear();
217 		}
218 	}
219 }
220 
221 
222 status_t
223 JobQueue::_Init()
224 {
225 	status_t result = fLock.InitCheck();
226 	if (result != B_OK)
227 		return result;
228 
229 	fQueuedJobs = new (std::nothrow) JobPriorityQueue();
230 	if (fQueuedJobs == NULL)
231 		return B_NO_MEMORY;
232 
233 	fHaveRunnableJobSem = create_sem(0, "have runnable job");
234 	if (fHaveRunnableJobSem < 0)
235 		return fHaveRunnableJobSem;
236 
237 	return B_OK;
238 }
239 
240 
241 void
242 JobQueue::_RequeueDependantJobsOf(BJob* job)
243 {
244 	while (BJob* dependantJob = job->DependantJobAt(0)) {
245 		try {
246 			fQueuedJobs->erase(dependantJob);
247 		} catch (...) {
248 		}
249 		dependantJob->RemoveDependency(job);
250 		try {
251 			fQueuedJobs->insert(dependantJob);
252 			if (dependantJob->IsRunnable())
253 				release_sem(fHaveRunnableJobSem);
254 		} catch (...) {
255 		}
256 	}
257 }
258 
259 
260 void
261 JobQueue::_RemoveDependantJobsOf(BJob* job)
262 {
263 	while (BJob* dependantJob = job->DependantJobAt(0)) {
264 		try {
265 			fQueuedJobs->erase(dependantJob);
266 		} catch (...) {
267 		}
268 		_RemoveDependantJobsOf(dependantJob);
269 		dependantJob->RemoveDependency(job);
270 		delete dependantJob;
271 	}
272 }
273 
274 
275 }	// namespace BPrivate
276 
}	// namespace BSupportKit
278