1 /* 2 * Copyright 2015, Axel Dörfler, axeld@pinc-software.de. 3 * Distributed under the terms of the MIT License. 4 */ 5 6 7 #include "Worker.h" 8 9 10 static const bigtime_t kWorkerTimeout = 1000000; 11 // One second until a worker thread quits without a job 12 13 static const int32 kWorkerCountPerCPU = 3; 14 15 static int32 sWorkerCount; 16 17 18 Worker::Worker(JobQueue& queue) 19 : 20 fThread(-1), 21 fJobQueue(queue) 22 { 23 } 24 25 26 Worker::~Worker() 27 { 28 } 29 30 31 status_t 32 Worker::Init() 33 { 34 fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY, 35 this); 36 if (fThread < 0) 37 return fThread; 38 39 status_t status = resume_thread(fThread); 40 if (status == B_OK) 41 atomic_add(&sWorkerCount, 1); 42 43 return status; 44 } 45 46 47 status_t 48 Worker::Process() 49 { 50 while (true) { 51 BJob* job; 52 status_t status = fJobQueue.Pop(Timeout(), false, &job); 53 if (status != B_OK) 54 return status; 55 56 status = Run(job); 57 if (status != B_OK) { 58 // TODO: proper error reporting on failed job! 59 debug_printf("Launching %s failed: %s\n", job->Title().String(), 60 strerror(status)); 61 } 62 } 63 } 64 65 66 bigtime_t 67 Worker::Timeout() const 68 { 69 return kWorkerTimeout; 70 } 71 72 73 const char* 74 Worker::Name() const 75 { 76 return "worker"; 77 } 78 79 80 status_t 81 Worker::Run(BJob* job) 82 { 83 return job->Run(); 84 } 85 86 87 /*static*/ status_t 88 Worker::_Process(void* _self) 89 { 90 Worker* self = (Worker*)_self; 91 status_t status = self->Process(); 92 delete self; 93 94 return status; 95 } 96 97 98 // #pragma mark - 99 100 101 MainWorker::MainWorker(JobQueue& queue) 102 : 103 Worker(queue), 104 fMaxWorkerCount(kWorkerCountPerCPU) 105 { 106 // TODO: keep track of workers, and quit them on destruction 107 system_info info; 108 if (get_system_info(&info) == B_OK) 109 fMaxWorkerCount = info.cpu_count * kWorkerCountPerCPU; 110 } 111 112 113 bigtime_t 114 MainWorker::Timeout() const 115 { 116 return B_INFINITE_TIMEOUT; 117 } 118 119 120 const char* 121 MainWorker::Name() const 122 { 123 return "main worker"; 124 } 125 126 127 status_t 128 MainWorker::Run(BJob* job) 129 { 130 int32 count = atomic_get(&sWorkerCount); 131 132 size_t jobCount = fJobQueue.CountJobs(); 133 if (jobCount > INT_MAX) 134 jobCount = INT_MAX; 135 136 if ((int32)jobCount > count && count < fMaxWorkerCount) { 137 Worker* worker = new Worker(fJobQueue); 138 worker->Init(); 139 } 140 141 return Worker::Run(job); 142 } 143