1 /* 2 * Copyright 2011-2015, Haiku, Inc. All Rights Reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Axel Dörfler <axeld@pinc-software.de> 7 * Oliver Tappe <zooey@hirschkaefer.de> 8 */ 9 10 11 #include <JobQueue.h> 12 13 #include <set> 14 15 #include <Autolock.h> 16 #include <Job.h> 17 18 #include <JobPrivate.h> 19 20 21 namespace BSupportKit { 22 23 namespace BPrivate { 24 25 26 struct JobQueue::JobPriorityLess { 27 bool operator()(const BJob* left, const BJob* right) const; 28 }; 29 30 31 /*! Sort jobs by: 32 1. descending count of dependencies (only jobs without dependencies are 33 runnable) 34 2. job ticket number (order in which jobs were added to the queue) 35 */ 36 bool 37 JobQueue::JobPriorityLess::operator()(const BJob* left, const BJob* right) const 38 { 39 int32 difference = left->CountDependencies() - right->CountDependencies(); 40 if (difference < 0) 41 return true; 42 if (difference > 0) 43 return false; 44 45 return left->TicketNumber() < right->TicketNumber(); 46 }; 47 48 49 class JobQueue::JobPriorityQueue 50 : public std::set<BJob*, JobPriorityLess> { 51 }; 52 53 54 // #pragma mark - 55 56 57 JobQueue::JobQueue() 58 : 59 fLock("job queue"), 60 fNextTicketNumber(1) 61 { 62 fInitStatus = _Init(); 63 } 64 65 66 JobQueue::~JobQueue() 67 { 68 Close(); 69 delete fQueuedJobs; 70 } 71 72 73 status_t 74 JobQueue::InitCheck() const 75 { 76 return fInitStatus; 77 } 78 79 80 status_t 81 JobQueue::AddJob(BJob* job) 82 { 83 if (fQueuedJobs == NULL) 84 return B_NO_INIT; 85 86 BAutolock lock(&fLock); 87 if (!lock.IsLocked()) 88 return B_ERROR; 89 90 try { 91 if (!fQueuedJobs->insert(job).second) 92 return B_NAME_IN_USE; 93 } catch (const std::bad_alloc& e) { 94 return B_NO_MEMORY; 95 } catch (...) { 96 return B_ERROR; 97 } 98 BJob::Private(*job).SetTicketNumber(fNextTicketNumber++); 99 job->AddStateListener(this); 100 if (job->IsRunnable()) 101 release_sem(fHaveRunnableJobSem); 102 103 return B_OK; 104 } 105 106 107 status_t 108 JobQueue::RemoveJob(BJob* job) 109 { 110 if (fQueuedJobs == NULL) 111 return B_NO_INIT; 112 113 BAutolock lock(&fLock); 114 if (lock.IsLocked()) { 115 try { 116 if (fQueuedJobs->erase(job) == 0) 117 return B_NAME_NOT_FOUND; 118 } catch (...) { 119 return B_ERROR; 120 } 121 BJob::Private(*job).ClearTicketNumber(); 122 job->RemoveStateListener(this); 123 } 124 125 return B_OK; 126 } 127 128 129 void 130 JobQueue::JobSucceeded(BJob* job) 131 { 132 BAutolock lock(&fLock); 133 if (lock.IsLocked()) 134 _RequeueDependantJobsOf(job); 135 } 136 137 138 void 139 JobQueue::JobFailed(BJob* job) 140 { 141 BAutolock lock(&fLock); 142 if (lock.IsLocked()) 143 _RemoveDependantJobsOf(job); 144 } 145 146 147 BJob* 148 JobQueue::Pop() 149 { 150 BJob* job; 151 if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK) 152 return job; 153 154 return NULL; 155 } 156 157 158 status_t 159 JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job) 160 { 161 BAutolock lock(&fLock); 162 if (lock.IsLocked()) { 163 while (true) { 164 JobPriorityQueue::iterator head = fQueuedJobs->begin(); 165 if (head != fQueuedJobs->end()) { 166 if ((*head)->IsRunnable()) { 167 *_job = *head; 168 fQueuedJobs->erase(head); 169 return B_OK; 170 } 171 } else if (returnWhenEmpty) 172 return B_ENTRY_NOT_FOUND; 173 174 // we need to wait until a job becomes available/runnable 175 status_t result; 176 do { 177 lock.Unlock(); 178 result = acquire_sem_etc(fHaveRunnableJobSem, 1, 179 B_RELATIVE_TIMEOUT, timeout); 180 if (!lock.Lock()) 181 return B_ERROR; 182 } while (result == B_INTERRUPTED); 183 if (result != B_OK) 184 return result; 185 } 186 } 187 188 return B_ERROR; 189 } 190 191 192 size_t 193 JobQueue::CountJobs() const 194 { 195 BAutolock locker(fLock); 196 return fQueuedJobs->size(); 197 } 198 199 200 void 201 JobQueue::Close() 202 { 203 if (fHaveRunnableJobSem < 0) 204 return; 205 206 BAutolock lock(&fLock); 207 if (lock.IsLocked()) { 208 delete_sem(fHaveRunnableJobSem); 209 fHaveRunnableJobSem = -1; 210 211 if (fQueuedJobs != NULL) { 212 // get rid of all jobs 213 for (JobPriorityQueue::iterator iter = fQueuedJobs->begin(); 214 iter != fQueuedJobs->end(); ++iter) { 215 delete (*iter); 216 } 217 fQueuedJobs->clear(); 218 } 219 } 220 } 221 222 223 status_t 224 JobQueue::_Init() 225 { 226 status_t result = fLock.InitCheck(); 227 if (result != B_OK) 228 return result; 229 230 fQueuedJobs = new (std::nothrow) JobPriorityQueue(); 231 if (fQueuedJobs == NULL) 232 return B_NO_MEMORY; 233 234 fHaveRunnableJobSem = create_sem(0, "have runnable job"); 235 if (fHaveRunnableJobSem < 0) 236 return fHaveRunnableJobSem; 237 238 return B_OK; 239 } 240 241 242 void 243 JobQueue::_RequeueDependantJobsOf(BJob* job) 244 { 245 while (BJob* dependantJob = job->DependantJobAt(0)) { 246 JobPriorityQueue::iterator found = fQueuedJobs->find(dependantJob); 247 bool removed = false; 248 if (found != fQueuedJobs->end()) { 249 try { 250 fQueuedJobs->erase(dependantJob); 251 removed = true; 252 } catch (...) { 253 } 254 } 255 dependantJob->RemoveDependency(job); 256 if (removed) { 257 // Only insert a job if it was in our queue before 258 try { 259 fQueuedJobs->insert(dependantJob); 260 if (dependantJob->IsRunnable()) 261 release_sem(fHaveRunnableJobSem); 262 } catch (...) { 263 } 264 } 265 } 266 } 267 268 269 void 270 JobQueue::_RemoveDependantJobsOf(BJob* job) 271 { 272 while (BJob* dependantJob = job->DependantJobAt(0)) { 273 try { 274 fQueuedJobs->erase(dependantJob); 275 } catch (...) { 276 } 277 278 if (dependantJob->State() != B_JOB_STATE_ABORTED) { 279 BJob::Private(*dependantJob).SetState(B_JOB_STATE_ABORTED); 280 BJob::Private(*dependantJob).NotifyStateListeners(); 281 } 282 283 _RemoveDependantJobsOf(dependantJob); 284 dependantJob->RemoveDependency(job); 285 // TODO: we need some sort of ownership management 286 delete dependantJob; 287 } 288 } 289 290 291 } // namespace BPrivate 292 293 } // namespace BPackageKit 294