xref: /haiku/src/apps/haikudepot/process/ProcessCoordinator.cpp (revision cbe0a0c436162d78cc3f92a305b64918c839d079)
1 /*
2  * Copyright 2018-2020, Andrew Lindesay <apl@lindesay.co.nz>.
3  * All rights reserved. Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "ProcessCoordinator.h"
8 
9 #include <AutoLocker.h>
10 #include <Catalog.h>
11 #include <StringFormat.h>
12 
13 #include "Logger.h"
14 
15 
16 #undef B_TRANSLATION_CONTEXT
17 #define B_TRANSLATION_CONTEXT "ProcessCoordinator"
18 
19 
20 // #pragma mark - ProcessCoordinatorState implementation
21 
22 
23 ProcessCoordinatorState::ProcessCoordinatorState(
24 	const ProcessCoordinator* processCoordinator, float progress,
25 	const BString& message, bool isRunning, status_t errorStatus)
26 	:
27 	fProcessCoordinator(processCoordinator),
28 	fProgress(progress),
29 	fMessage(message),
30 	fIsRunning(isRunning),
31 	fErrorStatus(errorStatus)
32 {
33 }
34 
35 
36 ProcessCoordinatorState::~ProcessCoordinatorState()
37 {
38 }
39 
40 
41 const ProcessCoordinator*
42 ProcessCoordinatorState::Coordinator() const
43 {
44 	return fProcessCoordinator;
45 }
46 
47 
48 float
49 ProcessCoordinatorState::Progress() const
50 {
51 	return fProgress;
52 }
53 
54 
55 BString
56 ProcessCoordinatorState::Message() const
57 {
58 	return fMessage;
59 }
60 
61 
62 bool
63 ProcessCoordinatorState::IsRunning() const
64 {
65 	return fIsRunning;
66 }
67 
68 
69 status_t
70 ProcessCoordinatorState::ErrorStatus() const
71 {
72 	return fErrorStatus;
73 }
74 
75 
76 // #pragma mark - ProcessCoordinator implementation
77 
78 
79 ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message)
80 	:
81 	fName(name),
82 	fListener(NULL),
83 	fMessage(message),
84 	fWasStopped(false)
85 {
86 }
87 
88 
89 ProcessCoordinator::~ProcessCoordinator()
90 {
91 	AutoLocker<BLocker> locker(&fLock);
92 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
93 		AbstractProcessNode* node = fNodes.ItemAt(i);
94 		node->Process()->SetListener(NULL);
95 		delete node;
96 	}
97 	delete fMessage;
98 }
99 
100 
101 void
102 ProcessCoordinator::SetListener(ProcessCoordinatorListener *listener)
103 {
104 	fListener = listener;
105 }
106 
107 
108 void
109 ProcessCoordinator::AddNode(AbstractProcessNode* node)
110 {
111 	AutoLocker<BLocker> locker(&fLock);
112 	fNodes.AddItem(node);
113 	node->Process()->SetListener(this);
114 }
115 
116 
117 void
118 ProcessCoordinator::ProcessChanged()
119 {
120 	_CoordinateAndCallListener();
121 }
122 
123 
124 bool
125 ProcessCoordinator::IsRunning()
126 {
127 	AutoLocker<BLocker> locker(&fLock);
128 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
129 		if (_IsRunning(fNodes.ItemAt(i)))
130 			return true;
131 	}
132 
133 	return false;
134 }
135 
136 
137 void
138 ProcessCoordinator::Start()
139 {
140 	_CoordinateAndCallListener();
141 }
142 
143 
144 void
145 ProcessCoordinator::RequestStop()
146 {
147 	AutoLocker<BLocker> locker(&fLock);
148 	if (!fWasStopped) {
149 		fWasStopped = true;
150 		HDINFO("[Coordinator] will stop process coordinator");
151 		for (int32 i = 0; i < fNodes.CountItems(); i++) {
152 			AbstractProcessNode* node = fNodes.ItemAt(i);
153 			HDINFO("[Coordinator] stopping process [%s]",
154 				node->Process()->Name());
155 			node->RequestStop();
156 		}
157 	}
158 }
159 
160 
161 status_t
162 ProcessCoordinator::ErrorStatus()
163 {
164 	AutoLocker<BLocker> locker(&fLock);
165 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
166 		status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus();
167 
168 		if (result != B_OK)
169 			return result;
170 	}
171 
172 	return B_OK;
173 }
174 
175 
176 float
177 ProcessCoordinator::Progress()
178 {
179 	AutoLocker<BLocker> locker(&fLock);
180 	float result = 0.0f;
181 
182 	if (!fWasStopped) {
183 		int32 count = fNodes.CountItems();
184 
185 		// if there is only one then return it's value directly because this
186 		// allows for the indeterminate state of -1.
187 
188 		if (count == 1)
189 			result = fNodes.ItemAt(0)->Process()->Progress();
190 		else {
191 			float progressPerNode = 1.0f / ((float) count);
192 
193 			for (int32 i = count - 1; i >= 0; i--) {
194 				AbstractProcess* process = fNodes.ItemAt(i)->Process();
195 
196 				switch(process->ProcessState()) {
197 					case PROCESS_INITIAL:
198 						break;
199 					case PROCESS_RUNNING:
200 						result += (progressPerNode * fmaxf(
201 							0.0f, fminf(1.0, process->Progress())));
202 						break;
203 					case PROCESS_COMPLETE:
204 						result += progressPerNode;
205 						break;
206 				}
207 			}
208 		}
209 	}
210 	return result;
211 }
212 
213 
214 const BString&
215 ProcessCoordinator::Name() const
216 {
217 	return fName;
218 }
219 
220 
221 BMessage*
222 ProcessCoordinator::Message() const
223 {
224 	return fMessage;
225 }
226 
227 
228 BString
229 ProcessCoordinator::_CreateStatusMessage()
230 {
231 	// work through the nodes and take a description from the first one.  If
232 	// there are others present then use a 'plus X others' suffix.  Go backwards
233 	// through the processes so that the most recent activity is shown first.
234 
235 	BString firstProcessDescription;
236 	uint32 additionalRunningProcesses = 0;
237 
238 	for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) {
239 		AbstractProcess* process = fNodes.ItemAt(i)->Process();
240 
241 		if (process->ProcessState() == PROCESS_RUNNING) {
242 			if (firstProcessDescription.IsEmpty()) {
243 				firstProcessDescription = process->Description();
244 			} else {
245 				additionalRunningProcesses++;
246 			}
247 		}
248 	}
249 
250 	if (firstProcessDescription.IsEmpty())
251 		return "???";
252 
253 	if (additionalRunningProcesses == 0)
254 		return firstProcessDescription;
255 
256 	static BStringFormat format(B_TRANSLATE(
257 		"%FIRST_PROCESS_DESCRIPTION% +"
258 		"{0, plural, one{# process} other{# processes}}"));
259 	BString result;
260 	format.Format(result, additionalRunningProcesses);
261 	result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription);
262 
263 	return result;
264 }
265 
266 
267 /*! This method assumes that a lock is held on the coordinator. */
268 
269 ProcessCoordinatorState
270 ProcessCoordinator::_CreateStatus()
271 {
272 	return ProcessCoordinatorState(
273 		this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus());
274 }
275 
276 
277 void
278 ProcessCoordinator::_CoordinateAndCallListener()
279 {
280 	ProcessCoordinatorState state = _Coordinate();
281 
282 	if (fListener != NULL)
283 		fListener->CoordinatorChanged(state);
284 }
285 
286 
287 ProcessCoordinatorState
288 ProcessCoordinator::_Coordinate()
289 {
290 	HDTRACE("[Coordinator] will coordinate nodes");
291 	AutoLocker<BLocker> locker(&fLock);
292 	_StopSuccessorNodesToErroredOrStoppedNodes();
293 
294 	// go through the nodes and find those that are still to be run and
295 	// for which the preconditions are met to start.
296 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
297 		AbstractProcessNode* node = fNodes.ItemAt(i);
298 
299 		if (node->Process()->ProcessState() == PROCESS_INITIAL) {
300 			if (node->AllPredecessorsComplete())
301 				node->Start();
302 			else {
303 				HDTRACE("[Coordinator] all predecessors not complete -> "
304 					"[%s] not started", node->Process()->Name());
305 			}
306 		} else {
307 			HDTRACE("[Coordinator] process [%s] running or complete",
308 				node->Process()->Name());
309 		}
310 	}
311 
312 	return _CreateStatus();
313 }
314 
315 
316 /*! This method assumes that a lock is held on the coordinator. */
317 
318 void
319 ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes()
320 {
321 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
322 		AbstractProcessNode* node = fNodes.ItemAt(i);
323 		AbstractProcess* process = node->Process();
324 
325 		if (process->WasStopped() || process->ErrorStatus() != B_OK)
326 			_StopSuccessorNodes(node);
327 	}
328 }
329 
330 
331 /*! This method assumes that a lock is held on the coordinator. */
332 
333 void
334 ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode)
335 {
336 	for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) {
337 		AbstractProcessNode* node = predecessorNode->SuccessorAt(i);
338 		AbstractProcess* process = node->Process();
339 
340 		if (process->ProcessState() == PROCESS_INITIAL) {
341 			HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)",
342 				predecessorNode->Process()->Name(), process->Name());
343 			node->RequestStop();
344 			_StopSuccessorNodes(node);
345 		}
346 	}
347 }
348 
349 
350 bool
351 ProcessCoordinator::_IsRunning(AbstractProcessNode* node)
352 {
353 	return node->Process()->ProcessState() != PROCESS_COMPLETE;
354 }
355 
356 
357 int32
358 ProcessCoordinator::_CountNodesCompleted()
359 {
360 	int32 nodesCompleted = 0;
361 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
362 		AbstractProcess *process = fNodes.ItemAt(i)->Process();
363 		if (process->ProcessState() == PROCESS_COMPLETE)
364 			nodesCompleted++;
365 	}
366 	return nodesCompleted;
367 }
368