xref: /haiku/src/apps/haikudepot/process/ProcessCoordinator.cpp (revision e1c4049fed1047bdb957b0529e1921e97ef94770)
1 /*
2  * Copyright 2018-2022, Andrew Lindesay <apl@lindesay.co.nz>.
3  * All rights reserved. Distributed under the terms of the MIT License.
4  */
5 
6 
7 #include "ProcessCoordinator.h"
8 
9 #include <AutoLocker.h>
10 #include <Catalog.h>
11 #include <StringFormat.h>
12 #include <Uuid.h>
13 
14 #include "Logger.h"
15 
16 
17 #undef B_TRANSLATION_CONTEXT
18 #define B_TRANSLATION_CONTEXT "ProcessCoordinator"
19 
20 #define LOCK_TIMEOUT_MICROS (1000 * 1000)
21 
22 // These are keys that are used to store the ProcessCoordinatorState data into
23 // a BMessage instance.
24 
25 #define KEY_PROCESS_COORDINATOR_IDENTIFIER	"processCoordinatorIdentifier"
26 #define KEY_PROGRESS						"progress"
27 #define KEY_MESSAGE							"message"
28 #define KEY_IS_RUNNING						"isRunning"
29 #define KEY_ERROR_STATUS					"errorStatus"
30 
31 
32 // #pragma mark - ProcessCoordinatorState implementation
33 
34 
35 ProcessCoordinatorState::ProcessCoordinatorState(BMessage* from)
36 {
37 	if (from->FindString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
38 			&fProcessCoordinatorIdentifier) != B_OK) {
39 		HDFATAL("unable to find the key [%s]",
40 			KEY_PROCESS_COORDINATOR_IDENTIFIER);
41 	}
42 
43 	if (from->FindFloat(KEY_PROGRESS, &fProgress) != B_OK) {
44 		HDFATAL("unable to find the key [%s]", KEY_PROGRESS);
45 	}
46 
47 	if (from->FindString(KEY_MESSAGE, &fMessage) != B_OK) {
48 		HDFATAL("unable to find the key [%s]", KEY_MESSAGE);
49 	}
50 
51 	if (from->FindBool(KEY_IS_RUNNING, &fIsRunning) != B_OK) {
52 		HDFATAL("unable to find the key [%s]", KEY_IS_RUNNING);
53 	}
54 
55 	int64 errorStatusNumeric;
56 	if (from->FindInt64(KEY_ERROR_STATUS, &errorStatusNumeric) != B_OK) {
57 		HDFATAL("unable to find the key [%s]", KEY_ERROR_STATUS);
58 	}
59 	fErrorStatus = static_cast<status_t>(errorStatusNumeric);
60 }
61 
62 
63 ProcessCoordinatorState::ProcessCoordinatorState(
64 	const ProcessCoordinator* processCoordinator, float progress,
65 	const BString& message, bool isRunning, status_t errorStatus)
66 	:
67 	fProcessCoordinatorIdentifier(processCoordinator->Identifier()),
68 	fProgress(progress),
69 	fMessage(message),
70 	fIsRunning(isRunning),
71 	fErrorStatus(errorStatus)
72 {
73 }
74 
75 
76 ProcessCoordinatorState::~ProcessCoordinatorState()
77 {
78 }
79 
80 
81 const BString
82 ProcessCoordinatorState::ProcessCoordinatorIdentifier() const
83 {
84 	return fProcessCoordinatorIdentifier;
85 }
86 
87 
88 float
89 ProcessCoordinatorState::Progress() const
90 {
91 	return fProgress;
92 }
93 
94 
95 BString
96 ProcessCoordinatorState::Message() const
97 {
98 	return fMessage;
99 }
100 
101 
102 bool
103 ProcessCoordinatorState::IsRunning() const
104 {
105 	return fIsRunning;
106 }
107 
108 
109 status_t
110 ProcessCoordinatorState::ErrorStatus() const
111 {
112 	return fErrorStatus;
113 }
114 
115 
116 status_t
117 ProcessCoordinatorState::Archive(BMessage* into, bool deep) const
118 {
119 	status_t result = B_OK;
120 	if (result == B_OK) {
121 		result = into->AddString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
122 			fProcessCoordinatorIdentifier);
123 	}
124 	if (result == B_OK)
125 		result = into->AddFloat(KEY_PROGRESS, fProgress);
126 	if (result == B_OK)
127 		result = into->AddString(KEY_MESSAGE, fMessage);
128 	if (result == B_OK)
129 		result = into->AddBool(KEY_IS_RUNNING, fIsRunning);
130 	if (result == B_OK)
131 		result = into->AddInt64(KEY_ERROR_STATUS, static_cast<int64>(fErrorStatus));
132 	return result;
133 }
134 
135 
136 // #pragma mark - ProcessCoordinator implementation
137 
138 
139 ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message)
140 	:
141 	fName(name),
142 	fLock(),
143 	fCoordinateAndCallListenerRerun(false),
144 	fCoordinateAndCallListenerRerunLock(),
145 	fListener(NULL),
146 	fMessage(message),
147 	fWasStopped(false),
148 	fIdentifier(BUuid().ToString())
149 {
150 }
151 
152 
153 ProcessCoordinator::~ProcessCoordinator()
154 {
155 	AutoLocker<BLocker> locker(&fLock);
156 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
157 		AbstractProcessNode* node = fNodes.ItemAt(i);
158 		node->Process()->SetListener(NULL);
159 		delete node;
160 	}
161 	delete fMessage;
162 }
163 
164 const BString&
165 ProcessCoordinator::Identifier() const
166 {
167 	return fIdentifier;
168 }
169 
170 
171 void
172 ProcessCoordinator::SetListener(ProcessCoordinatorListener* listener)
173 {
174 	fListener = listener;
175 }
176 
177 
178 void
179 ProcessCoordinator::AddNode(AbstractProcessNode* node)
180 {
181 	AutoLocker<BLocker> locker(&fLock);
182 	fNodes.AddItem(node);
183 	node->SetListener(this);
184 	node->Process()->SetListener(this);
185 }
186 
187 
188 void
189 ProcessCoordinator::ProcessChanged()
190 {
191 	_CoordinateAndCallListener();
192 }
193 
194 
195 bool
196 ProcessCoordinator::IsRunning()
197 {
198 	AutoLocker<BLocker> locker(&fLock);
199 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
200 		AbstractProcessNode* node = fNodes.ItemAt(i);
201 		if (node->IsRunning())
202 			return true;
203 	}
204 
205 	return false;
206 }
207 
208 
209 void
210 ProcessCoordinator::Start()
211 {
212 	_CoordinateAndCallListener();
213 }
214 
215 
216 void
217 ProcessCoordinator::RequestStop()
218 {
219 	AutoLocker<BLocker> locker(&fLock);
220 	if (!fWasStopped) {
221 		fWasStopped = true;
222 		HDINFO("[Coordinator] will stop process coordinator");
223 		for (int32 i = 0; i < fNodes.CountItems(); i++) {
224 			AbstractProcessNode* node = fNodes.ItemAt(i);
225 			HDINFO("[Coordinator] stopping process [%s]",
226 				node->Process()->Name());
227 			node->RequestStop();
228 		}
229 	}
230 }
231 
232 
233 status_t
234 ProcessCoordinator::ErrorStatus()
235 {
236 	AutoLocker<BLocker> locker(&fLock);
237 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
238 		status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus();
239 
240 		if (result != B_OK)
241 			return result;
242 	}
243 
244 	return B_OK;
245 }
246 
247 
248 float
249 ProcessCoordinator::Progress()
250 {
251 	AutoLocker<BLocker> locker(&fLock);
252 	float result = 0.0f;
253 
254 	if (!fWasStopped) {
255 		int32 count = fNodes.CountItems();
256 
257 		// if there is only one then return it's value directly because this
258 		// allows for the indeterminate state of -1.
259 
260 		if (count == 1)
261 			result = fNodes.ItemAt(0)->Process()->Progress();
262 		else {
263 			float progressPerNode = 1.0f / ((float) count);
264 
265 			for (int32 i = count - 1; i >= 0; i--) {
266 				AbstractProcess* process = fNodes.ItemAt(i)->Process();
267 
268 				switch(process->ProcessState()) {
269 					case PROCESS_INITIAL:
270 						break;
271 					case PROCESS_RUNNING:
272 						result += (progressPerNode * fmaxf(
273 							0.0f, fminf(1.0, process->Progress())));
274 						break;
275 					case PROCESS_COMPLETE:
276 						result += progressPerNode;
277 						break;
278 				}
279 			}
280 		}
281 	}
282 	return result;
283 }
284 
285 
286 const BString&
287 ProcessCoordinator::Name() const
288 {
289 	return fName;
290 }
291 
292 
293 BMessage*
294 ProcessCoordinator::Message() const
295 {
296 	return fMessage;
297 }
298 
299 
300 BString
301 ProcessCoordinator::_CreateStatusMessage()
302 {
303 	// work through the nodes and take a description from the first one.  If
304 	// there are others present then use a 'plus X others' suffix.  Go backwards
305 	// through the processes so that the most recent activity is shown first.
306 
307 	BString firstProcessDescription;
308 	uint32 additionalRunningProcesses = 0;
309 
310 	for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) {
311 		AbstractProcess* process = fNodes.ItemAt(i)->Process();
312 		if (process->ProcessState() == PROCESS_RUNNING) {
313 			if (firstProcessDescription.IsEmpty()) {
314 				if (strlen(process->Description()) != 0)
315 					firstProcessDescription = process->Description();
316 				else
317 					additionalRunningProcesses++;
318 			}
319 			else
320 				additionalRunningProcesses++;
321 		}
322 	}
323 
324 	if (firstProcessDescription.IsEmpty())
325 		return "???";
326 
327 	if (additionalRunningProcesses == 0)
328 		return firstProcessDescription;
329 
330 	static BStringFormat format(B_TRANSLATE(
331 		"%FIRST_PROCESS_DESCRIPTION% +"
332 		"{0, plural, one{# process} other{# processes}}"));
333 	BString result;
334 	format.Format(result, additionalRunningProcesses);
335 	result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription);
336 
337 	return result;
338 }
339 
340 
341 /*! This method assumes that a lock is held on the coordinator. */
342 
343 ProcessCoordinatorState
344 ProcessCoordinator::_CreateStatus()
345 {
346 	return ProcessCoordinatorState(
347 		this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus());
348 }
349 
350 
351 /*! This will try to obtain the lock and if it cannot obtain the lock then
352     it will flag that when the coordinator has finished its current
353     coordination, it should initiate another coordination.
354  */
355 void
356 ProcessCoordinator::_CoordinateAndCallListener()
357 {
358 	if (fLock.LockWithTimeout(LOCK_TIMEOUT_MICROS) != B_OK) {
359 		HDDEBUG("[Coordinator] would coordinate nodes, but coordination is "
360 			"in progress - will defer");
361 		AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
362 		fCoordinateAndCallListenerRerun = true;
363 		return;
364 	}
365 
366 	ProcessCoordinatorState state = _Coordinate();
367 
368 	if (fListener != NULL)
369 		fListener->CoordinatorChanged(state);
370 
371 	fLock.Unlock();
372 
373 	bool coordinateAndCallListenerRerun = false;
374 
375 	{
376 		AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
377 		coordinateAndCallListenerRerun = fCoordinateAndCallListenerRerun;
378 		fCoordinateAndCallListenerRerun = false;
379 	}
380 
381 	if (coordinateAndCallListenerRerun) {
382 		HDDEBUG("[Coordinator] will run deferred coordination");
383 		_CoordinateAndCallListener();
384 	}
385 }
386 
387 
388 ProcessCoordinatorState
389 ProcessCoordinator::_Coordinate()
390 {
391 	HDTRACE("[Coordinator] will coordinate nodes");
392 	AutoLocker<BLocker> locker(&fLock);
393 	_StopSuccessorNodesToErroredOrStoppedNodes();
394 
395 	// go through the nodes and find those that are still to be run and
396 	// for which the preconditions are met to start.
397 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
398 		AbstractProcessNode* node = fNodes.ItemAt(i);
399 
400 		if (node->Process()->ProcessState() == PROCESS_INITIAL) {
401 			if (node->AllPredecessorsComplete())
402 				node->Start();
403 			else {
404 				HDTRACE("[Coordinator] all predecessors not complete -> "
405 					"[%s] not started", node->Process()->Name());
406 			}
407 		} else {
408 			HDTRACE("[Coordinator] process [%s] running or complete",
409 				node->Process()->Name());
410 		}
411 	}
412 
413 	return _CreateStatus();
414 }
415 
416 
417 /*! This method assumes that a lock is held on the coordinator. */
418 
419 void
420 ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes()
421 {
422 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
423 		AbstractProcessNode* node = fNodes.ItemAt(i);
424 		AbstractProcess* process = node->Process();
425 
426 		if (process->WasStopped() || process->ErrorStatus() != B_OK)
427 			_StopSuccessorNodes(node);
428 	}
429 }
430 
431 
432 /*! This method assumes that a lock is held on the coordinator. */
433 
434 void
435 ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode)
436 {
437 	for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) {
438 		AbstractProcessNode* node = predecessorNode->SuccessorAt(i);
439 		AbstractProcess* process = node->Process();
440 
441 		if (process->ProcessState() == PROCESS_INITIAL) {
442 			HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)",
443 				predecessorNode->Process()->Name(), process->Name());
444 			node->RequestStop();
445 			_StopSuccessorNodes(node);
446 		}
447 	}
448 }
449 
450 
451 int32
452 ProcessCoordinator::_CountNodesCompleted()
453 {
454 	int32 nodesCompleted = 0;
455 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
456 		AbstractProcess *process = fNodes.ItemAt(i)->Process();
457 		if (process->ProcessState() == PROCESS_COMPLETE)
458 			nodesCompleted++;
459 	}
460 	return nodesCompleted;
461 }
462 
463 
464 BString
465 ProcessCoordinator::LogReport()
466 {
467 	BString result;
468 	AutoLocker<BLocker> locker(&fLock);
469 
470 	for (int32 i = 0; i < fNodes.CountItems(); i++) {
471 		if (0 != result.Length())
472 			result.Append("\n");
473 		AbstractProcessNode* node = fNodes.ItemAt(i);
474 		result.Append(node->LogReport());
475 	}
476 
477 	return result;
478 }
479