1 /*
2 * Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
3 * Distributed under the terms of the MIT License.
4 */
5
6
7 #include "Worker.h"
8
9
10 static const bigtime_t kWorkerTimeout = 1000000;
11 // One second until a worker thread quits without a job
12
13 static const int32 kWorkerCountPerCPU = 3;
14
15 static int32 sWorkerCount;
16
17
Worker(JobQueue & queue)18 Worker::Worker(JobQueue& queue)
19 :
20 fThread(-1),
21 fJobQueue(queue)
22 {
23 }
24
25
~Worker()26 Worker::~Worker()
27 {
28 }
29
30
31 status_t
Init()32 Worker::Init()
33 {
34 fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY,
35 this);
36 if (fThread < 0)
37 return fThread;
38
39 status_t status = resume_thread(fThread);
40 if (status == B_OK)
41 atomic_add(&sWorkerCount, 1);
42
43 return status;
44 }
45
46
47 status_t
Process()48 Worker::Process()
49 {
50 while (true) {
51 BJob* job;
52 status_t status = fJobQueue.Pop(Timeout(), false, &job);
53 if (status != B_OK)
54 return status;
55
56 status = Run(job);
57 if (status != B_OK) {
58 // TODO: proper error reporting on failed job!
59 debug_printf("Launching %s failed: %s\n", job->Title().String(),
60 strerror(status));
61 }
62 }
63 }
64
65
66 bigtime_t
Timeout() const67 Worker::Timeout() const
68 {
69 return kWorkerTimeout;
70 }
71
72
73 const char*
Name() const74 Worker::Name() const
75 {
76 return "worker";
77 }
78
79
80 status_t
Run(BJob * job)81 Worker::Run(BJob* job)
82 {
83 return job->Run();
84 }
85
86
87 /*static*/ status_t
_Process(void * _self)88 Worker::_Process(void* _self)
89 {
90 Worker* self = (Worker*)_self;
91 status_t status = self->Process();
92 delete self;
93
94 return status;
95 }
96
97
98 // #pragma mark -
99
100
MainWorker(JobQueue & queue)101 MainWorker::MainWorker(JobQueue& queue)
102 :
103 Worker(queue),
104 fMaxWorkerCount(kWorkerCountPerCPU)
105 {
106 // TODO: keep track of workers, and quit them on destruction
107 system_info info;
108 if (get_system_info(&info) == B_OK)
109 fMaxWorkerCount = info.cpu_count * kWorkerCountPerCPU;
110 }
111
112
113 bigtime_t
Timeout() const114 MainWorker::Timeout() const
115 {
116 return B_INFINITE_TIMEOUT;
117 }
118
119
120 const char*
Name() const121 MainWorker::Name() const
122 {
123 return "main worker";
124 }
125
126
127 status_t
Run(BJob * job)128 MainWorker::Run(BJob* job)
129 {
130 int32 count = atomic_get(&sWorkerCount);
131
132 size_t jobCount = fJobQueue.CountJobs();
133 if (jobCount > INT_MAX)
134 jobCount = INT_MAX;
135
136 if ((int32)jobCount > count && count < fMaxWorkerCount) {
137 Worker* worker = new Worker(fJobQueue);
138 worker->Init();
139 }
140
141 return Worker::Run(job);
142 }
143