xref: /haiku/src/servers/launch/Worker.cpp (revision 220c5364ab445eb20376f964ade2f7d3a09e163a)
1 /*
2  * Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
3  * Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "Worker.h"
8 
9 
10 static const bigtime_t kWorkerTimeout = 1000000;
11 	// One second until a worker thread quits without a job
12 
13 static const int32 kWorkerCountPerCPU = 3;
14 
15 static int32 sWorkerCount;
16 
17 
18 Worker::Worker(JobQueue& queue)
19 	:
20 	fThread(-1),
21 	fJobQueue(queue)
22 {
23 }
24 
25 
26 Worker::~Worker()
27 {
28 }
29 
30 
31 status_t
32 Worker::Init()
33 {
34 	fThread = spawn_thread(&Worker::_Process, Name(), B_NORMAL_PRIORITY,
35 		this);
36 	if (fThread < 0)
37 		return fThread;
38 
39 	status_t status = resume_thread(fThread);
40 	if (status == B_OK)
41 		atomic_add(&sWorkerCount, 1);
42 
43 	return status;
44 }
45 
46 
47 status_t
48 Worker::Process()
49 {
50 	while (true) {
51 		BJob* job;
52 		status_t status = fJobQueue.Pop(Timeout(), false, &job);
53 		if (status != B_OK)
54 			return status;
55 
56 		status = Run(job);
57 		if (status != B_OK) {
58 			// TODO: proper error reporting on failed job!
59 			debug_printf("Launching %s failed: %s\n", job->Title().String(),
60 				strerror(status));
61 		}
62 	}
63 }
64 
65 
66 bigtime_t
67 Worker::Timeout() const
68 {
69 	return kWorkerTimeout;
70 }
71 
72 
73 const char*
74 Worker::Name() const
75 {
76 	return "worker";
77 }
78 
79 
80 status_t
81 Worker::Run(BJob* job)
82 {
83 	return job->Run();
84 }
85 
86 
87 /*static*/ status_t
88 Worker::_Process(void* _self)
89 {
90 	Worker* self = (Worker*)_self;
91 	status_t status = self->Process();
92 	delete self;
93 
94 	return status;
95 }
96 
97 
98 // #pragma mark -
99 
100 
101 MainWorker::MainWorker(JobQueue& queue)
102 	:
103 	Worker(queue),
104 	fMaxWorkerCount(kWorkerCountPerCPU)
105 {
106 	// TODO: keep track of workers, and quit them on destruction
107 	system_info info;
108 	if (get_system_info(&info) == B_OK)
109 		fMaxWorkerCount = info.cpu_count * kWorkerCountPerCPU;
110 }
111 
112 
113 bigtime_t
114 MainWorker::Timeout() const
115 {
116 	return B_INFINITE_TIMEOUT;
117 }
118 
119 
120 const char*
121 MainWorker::Name() const
122 {
123 	return "main worker";
124 }
125 
126 
127 status_t
128 MainWorker::Run(BJob* job)
129 {
130 	int32 count = atomic_get(&sWorkerCount);
131 
132 	size_t jobCount = fJobQueue.CountJobs();
133 	if (jobCount > INT_MAX)
134 		jobCount = INT_MAX;
135 
136 	if ((int32)jobCount > count && count < fMaxWorkerCount) {
137 		Worker* worker = new Worker(fJobQueue);
138 		worker->Init();
139 	}
140 
141 	return Worker::Run(job);
142 }
143