1 /*
2 * Copyright 2018-2022, Andrew Lindesay <apl@lindesay.co.nz>.
3 * All rights reserved. Distributed under the terms of the MIT License.
4 */
5
6
7 #include "ProcessCoordinator.h"
8
9 #include <AutoLocker.h>
10 #include <Catalog.h>
11 #include <StringFormat.h>
12 #include <Uuid.h>
13
14 #include "Logger.h"
15
16
17 #undef B_TRANSLATION_CONTEXT
18 #define B_TRANSLATION_CONTEXT "ProcessCoordinator"
19
20 #define LOCK_TIMEOUT_MICROS (1000 * 1000)
21
22 // These are keys that are used to store the ProcessCoordinatorState data into
23 // a BMessage instance.
24
25 #define KEY_PROCESS_COORDINATOR_IDENTIFIER "processCoordinatorIdentifier"
26 #define KEY_PROGRESS "progress"
27 #define KEY_MESSAGE "message"
28 #define KEY_IS_RUNNING "isRunning"
29 #define KEY_ERROR_STATUS "errorStatus"
30
31
32 // #pragma mark - ProcessCoordinatorState implementation
33
34
ProcessCoordinatorState(BMessage * from)35 ProcessCoordinatorState::ProcessCoordinatorState(BMessage* from)
36 {
37 if (from->FindString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
38 &fProcessCoordinatorIdentifier) != B_OK) {
39 HDFATAL("unable to find the key [%s]",
40 KEY_PROCESS_COORDINATOR_IDENTIFIER);
41 }
42
43 if (from->FindFloat(KEY_PROGRESS, &fProgress) != B_OK) {
44 HDFATAL("unable to find the key [%s]", KEY_PROGRESS);
45 }
46
47 if (from->FindString(KEY_MESSAGE, &fMessage) != B_OK) {
48 HDFATAL("unable to find the key [%s]", KEY_MESSAGE);
49 }
50
51 if (from->FindBool(KEY_IS_RUNNING, &fIsRunning) != B_OK) {
52 HDFATAL("unable to find the key [%s]", KEY_IS_RUNNING);
53 }
54
55 int64 errorStatusNumeric;
56 if (from->FindInt64(KEY_ERROR_STATUS, &errorStatusNumeric) != B_OK) {
57 HDFATAL("unable to find the key [%s]", KEY_ERROR_STATUS);
58 }
59 fErrorStatus = static_cast<status_t>(errorStatusNumeric);
60 }
61
62
ProcessCoordinatorState(const ProcessCoordinator * processCoordinator,float progress,const BString & message,bool isRunning,status_t errorStatus)63 ProcessCoordinatorState::ProcessCoordinatorState(
64 const ProcessCoordinator* processCoordinator, float progress,
65 const BString& message, bool isRunning, status_t errorStatus)
66 :
67 fProcessCoordinatorIdentifier(processCoordinator->Identifier()),
68 fProgress(progress),
69 fMessage(message),
70 fIsRunning(isRunning),
71 fErrorStatus(errorStatus)
72 {
73 }
74
75
~ProcessCoordinatorState()76 ProcessCoordinatorState::~ProcessCoordinatorState()
77 {
78 }
79
80
81 const BString
ProcessCoordinatorIdentifier() const82 ProcessCoordinatorState::ProcessCoordinatorIdentifier() const
83 {
84 return fProcessCoordinatorIdentifier;
85 }
86
87
88 float
Progress() const89 ProcessCoordinatorState::Progress() const
90 {
91 return fProgress;
92 }
93
94
95 BString
Message() const96 ProcessCoordinatorState::Message() const
97 {
98 return fMessage;
99 }
100
101
102 bool
IsRunning() const103 ProcessCoordinatorState::IsRunning() const
104 {
105 return fIsRunning;
106 }
107
108
109 status_t
ErrorStatus() const110 ProcessCoordinatorState::ErrorStatus() const
111 {
112 return fErrorStatus;
113 }
114
115
116 status_t
Archive(BMessage * into,bool deep) const117 ProcessCoordinatorState::Archive(BMessage* into, bool deep) const
118 {
119 status_t result = B_OK;
120 if (result == B_OK) {
121 result = into->AddString(KEY_PROCESS_COORDINATOR_IDENTIFIER,
122 fProcessCoordinatorIdentifier);
123 }
124 if (result == B_OK)
125 result = into->AddFloat(KEY_PROGRESS, fProgress);
126 if (result == B_OK)
127 result = into->AddString(KEY_MESSAGE, fMessage);
128 if (result == B_OK)
129 result = into->AddBool(KEY_IS_RUNNING, fIsRunning);
130 if (result == B_OK)
131 result = into->AddInt64(KEY_ERROR_STATUS, static_cast<int64>(fErrorStatus));
132 return result;
133 }
134
135
136 // #pragma mark - ProcessCoordinator implementation
137
138
ProcessCoordinator(const char * name,BMessage * message)139 ProcessCoordinator::ProcessCoordinator(const char* name, BMessage* message)
140 :
141 fName(name),
142 fLock(),
143 fCoordinateAndCallListenerRerun(false),
144 fCoordinateAndCallListenerRerunLock(),
145 fListener(NULL),
146 fMessage(message),
147 fWasStopped(false),
148 fIdentifier(BUuid().ToString())
149 {
150 }
151
152
~ProcessCoordinator()153 ProcessCoordinator::~ProcessCoordinator()
154 {
155 AutoLocker<BLocker> locker(&fLock);
156 for (int32 i = 0; i < fNodes.CountItems(); i++) {
157 AbstractProcessNode* node = fNodes.ItemAt(i);
158 node->Process()->SetListener(NULL);
159 delete node;
160 }
161 delete fMessage;
162 }
163
164 const BString&
Identifier() const165 ProcessCoordinator::Identifier() const
166 {
167 return fIdentifier;
168 }
169
170
171 void
SetListener(ProcessCoordinatorListener * listener)172 ProcessCoordinator::SetListener(ProcessCoordinatorListener* listener)
173 {
174 fListener = listener;
175 }
176
177
178 void
AddNode(AbstractProcessNode * node)179 ProcessCoordinator::AddNode(AbstractProcessNode* node)
180 {
181 AutoLocker<BLocker> locker(&fLock);
182 fNodes.AddItem(node);
183 node->SetListener(this);
184 node->Process()->SetListener(this);
185 }
186
187
188 void
ProcessChanged()189 ProcessCoordinator::ProcessChanged()
190 {
191 _CoordinateAndCallListener();
192 }
193
194
195 bool
IsRunning()196 ProcessCoordinator::IsRunning()
197 {
198 AutoLocker<BLocker> locker(&fLock);
199 for (int32 i = 0; i < fNodes.CountItems(); i++) {
200 AbstractProcessNode* node = fNodes.ItemAt(i);
201 if (node->IsRunning())
202 return true;
203 }
204
205 return false;
206 }
207
208
209 void
Start()210 ProcessCoordinator::Start()
211 {
212 _CoordinateAndCallListener();
213 }
214
215
216 void
RequestStop()217 ProcessCoordinator::RequestStop()
218 {
219 AutoLocker<BLocker> locker(&fLock);
220 if (!fWasStopped) {
221 fWasStopped = true;
222 HDINFO("[Coordinator] will stop process coordinator");
223 for (int32 i = 0; i < fNodes.CountItems(); i++) {
224 AbstractProcessNode* node = fNodes.ItemAt(i);
225 HDINFO("[Coordinator] stopping process [%s]",
226 node->Process()->Name());
227 node->RequestStop();
228 }
229 }
230 }
231
232
233 status_t
ErrorStatus()234 ProcessCoordinator::ErrorStatus()
235 {
236 AutoLocker<BLocker> locker(&fLock);
237 for (int32 i = 0; i < fNodes.CountItems(); i++) {
238 status_t result = fNodes.ItemAt(i)->Process()->ErrorStatus();
239
240 if (result != B_OK)
241 return result;
242 }
243
244 return B_OK;
245 }
246
247
248 float
Progress()249 ProcessCoordinator::Progress()
250 {
251 AutoLocker<BLocker> locker(&fLock);
252 float result = 0.0f;
253
254 if (!fWasStopped) {
255 int32 count = fNodes.CountItems();
256
257 // if there is only one then return it's value directly because this
258 // allows for the indeterminate state of -1.
259
260 if (count == 1)
261 result = fNodes.ItemAt(0)->Process()->Progress();
262 else {
263 float progressPerNode = 1.0f / ((float) count);
264
265 for (int32 i = count - 1; i >= 0; i--) {
266 AbstractProcess* process = fNodes.ItemAt(i)->Process();
267
268 switch(process->ProcessState()) {
269 case PROCESS_INITIAL:
270 break;
271 case PROCESS_RUNNING:
272 result += (progressPerNode * fmaxf(
273 0.0f, fminf(1.0, process->Progress())));
274 break;
275 case PROCESS_COMPLETE:
276 result += progressPerNode;
277 break;
278 }
279 }
280 }
281 }
282 return result;
283 }
284
285
286 const BString&
Name() const287 ProcessCoordinator::Name() const
288 {
289 return fName;
290 }
291
292
293 BMessage*
Message() const294 ProcessCoordinator::Message() const
295 {
296 return fMessage;
297 }
298
299
300 BString
_CreateStatusMessage()301 ProcessCoordinator::_CreateStatusMessage()
302 {
303 // work through the nodes and take a description from the first one. If
304 // there are others present then use a 'plus X others' suffix. Go backwards
305 // through the processes so that the most recent activity is shown first.
306
307 BString firstProcessDescription;
308 uint32 additionalRunningProcesses = 0;
309
310 for (int32 i = fNodes.CountItems() - 1; i >= 0; i--) {
311 AbstractProcess* process = fNodes.ItemAt(i)->Process();
312 if (process->ProcessState() == PROCESS_RUNNING) {
313 if (firstProcessDescription.IsEmpty()) {
314 if (strlen(process->Description()) != 0)
315 firstProcessDescription = process->Description();
316 else
317 additionalRunningProcesses++;
318 }
319 else
320 additionalRunningProcesses++;
321 }
322 }
323
324 if (firstProcessDescription.IsEmpty())
325 return "???";
326
327 if (additionalRunningProcesses == 0)
328 return firstProcessDescription;
329
330 static BStringFormat format(B_TRANSLATE(
331 "%FIRST_PROCESS_DESCRIPTION% +"
332 "{0, plural, one{# process} other{# processes}}"));
333 BString result;
334 format.Format(result, additionalRunningProcesses);
335 result.ReplaceAll("%FIRST_PROCESS_DESCRIPTION%", firstProcessDescription);
336
337 return result;
338 }
339
340
341 /*! This method assumes that a lock is held on the coordinator. */
342
343 ProcessCoordinatorState
_CreateStatus()344 ProcessCoordinator::_CreateStatus()
345 {
346 return ProcessCoordinatorState(
347 this, Progress(), _CreateStatusMessage(), IsRunning(), ErrorStatus());
348 }
349
350
351 /*! This will try to obtain the lock and if it cannot obtain the lock then
352 it will flag that when the coordinator has finished its current
353 coordination, it should initiate another coordination.
354 */
355 void
_CoordinateAndCallListener()356 ProcessCoordinator::_CoordinateAndCallListener()
357 {
358 if (fLock.LockWithTimeout(LOCK_TIMEOUT_MICROS) != B_OK) {
359 HDDEBUG("[Coordinator] would coordinate nodes, but coordination is "
360 "in progress - will defer");
361 AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
362 fCoordinateAndCallListenerRerun = true;
363 return;
364 }
365
366 ProcessCoordinatorState state = _Coordinate();
367
368 if (fListener != NULL)
369 fListener->CoordinatorChanged(state);
370
371 fLock.Unlock();
372
373 bool coordinateAndCallListenerRerun = false;
374
375 {
376 AutoLocker<BLocker> locker(&fCoordinateAndCallListenerRerunLock);
377 coordinateAndCallListenerRerun = fCoordinateAndCallListenerRerun;
378 fCoordinateAndCallListenerRerun = false;
379 }
380
381 if (coordinateAndCallListenerRerun) {
382 HDDEBUG("[Coordinator] will run deferred coordination");
383 _CoordinateAndCallListener();
384 }
385 }
386
387
388 ProcessCoordinatorState
_Coordinate()389 ProcessCoordinator::_Coordinate()
390 {
391 HDTRACE("[Coordinator] will coordinate nodes");
392 AutoLocker<BLocker> locker(&fLock);
393 _StopSuccessorNodesToErroredOrStoppedNodes();
394
395 // go through the nodes and find those that are still to be run and
396 // for which the preconditions are met to start.
397 for (int32 i = 0; i < fNodes.CountItems(); i++) {
398 AbstractProcessNode* node = fNodes.ItemAt(i);
399
400 if (node->Process()->ProcessState() == PROCESS_INITIAL) {
401 if (node->AllPredecessorsComplete())
402 node->Start();
403 else {
404 HDTRACE("[Coordinator] all predecessors not complete -> "
405 "[%s] not started", node->Process()->Name());
406 }
407 } else {
408 HDTRACE("[Coordinator] process [%s] running or complete",
409 node->Process()->Name());
410 }
411 }
412
413 return _CreateStatus();
414 }
415
416
417 /*! This method assumes that a lock is held on the coordinator. */
418
419 void
_StopSuccessorNodesToErroredOrStoppedNodes()420 ProcessCoordinator::_StopSuccessorNodesToErroredOrStoppedNodes()
421 {
422 for (int32 i = 0; i < fNodes.CountItems(); i++) {
423 AbstractProcessNode* node = fNodes.ItemAt(i);
424 AbstractProcess* process = node->Process();
425
426 if (process->WasStopped() || process->ErrorStatus() != B_OK)
427 _StopSuccessorNodes(node);
428 }
429 }
430
431
432 /*! This method assumes that a lock is held on the coordinator. */
433
434 void
_StopSuccessorNodes(AbstractProcessNode * predecessorNode)435 ProcessCoordinator::_StopSuccessorNodes(AbstractProcessNode* predecessorNode)
436 {
437 for (int32 i = 0; i < predecessorNode->CountSuccessors(); i++) {
438 AbstractProcessNode* node = predecessorNode->SuccessorAt(i);
439 AbstractProcess* process = node->Process();
440
441 if (process->ProcessState() == PROCESS_INITIAL) {
442 HDDEBUG("[Coordinator] [%s] (failed) --> [%s] (stopping)",
443 predecessorNode->Process()->Name(), process->Name());
444 node->RequestStop();
445 _StopSuccessorNodes(node);
446 }
447 }
448 }
449
450
451 int32
_CountNodesCompleted()452 ProcessCoordinator::_CountNodesCompleted()
453 {
454 int32 nodesCompleted = 0;
455 for (int32 i = 0; i < fNodes.CountItems(); i++) {
456 AbstractProcess *process = fNodes.ItemAt(i)->Process();
457 if (process->ProcessState() == PROCESS_COMPLETE)
458 nodesCompleted++;
459 }
460 return nodesCompleted;
461 }
462
463
464 BString
LogReport()465 ProcessCoordinator::LogReport()
466 {
467 BString result;
468 AutoLocker<BLocker> locker(&fLock);
469
470 for (int32 i = 0; i < fNodes.CountItems(); i++) {
471 if (0 != result.Length())
472 result.Append("\n");
473 AbstractProcessNode* node = fNodes.ItemAt(i);
474 result.Append(node->LogReport());
475 }
476
477 return result;
478 }
479