xref: /haiku/src/apps/cortex/support/ObservableLooper.cpp (revision c90684742e7361651849be4116d0e5de3a817194)
1 // ObservableLooper.cpp
2 
3 #include "ObservableLooper.h"
4 
5 #include <Debug.h>
6 #include <MessageRunner.h>
7 
8 __USE_CORTEX_NAMESPACE
9 
10 
11 // ---------------------------------------------------------------- //
12 // *** deletion
13 // ---------------------------------------------------------------- //
14 
15 // clients must call release() rather than deleting,
16 // to ensure that all observers are notified of the
17 // object's demise.  if the object has already been
18 // released, return an error.
19 
20 status_t ObservableLooper::release() {
21 
22 	// +++++ what if I'm not running?
23 	// +++++ is a lock necessary?
24 
25 	if(isReleased())
26 		return B_NOT_ALLOWED;
27 
28 	// send request through proper channels
29 	BMessenger(this).SendMessage(B_QUIT_REQUESTED);
30 
31 	return B_OK;
32 }
33 
34 // ---------------------------------------------------------------- //
35 // *** ctor/dtor
36 // ---------------------------------------------------------------- //
37 
38 ObservableLooper::~ObservableLooper() {
39 	if(CountTargets()) {
40 		PRINT((
41 			"*** ~ObservableLooper() '%s': %ld observers remain\n",
42 			Name(), CountTargets()));
43 	}
44 	if(m_executioner)
45 		delete m_executioner;
46 }
47 
48 ObservableLooper::ObservableLooper(
49 	const char*							name,
50 	int32										priority,
51 	int32										portCapacity,
52 	bigtime_t								quitTimeout) :
53 	BLooper(name, priority, portCapacity),
54 	m_quitTimeout(quitTimeout),
55 	m_executioner(0),
56 	m_quitting(false) {}
57 
58 ObservableLooper::ObservableLooper(
59 	BMessage*								archive) :
60 	BLooper(archive),
61 	m_quitTimeout(B_INFINITE_TIMEOUT),
62 	m_executioner(0),
63 	m_quitting(false) {
64 
65 	archive->FindInt64("quitTimeout", (int64*)&m_quitTimeout);
66 }
67 
68 // ---------------------------------------------------------------- //
69 // *** accessors
70 // ---------------------------------------------------------------- //
71 
72 bool ObservableLooper::isReleased() const {
73 	return m_quitting;
74 }
75 
76 // ---------------------------------------------------------------- //
77 // *** hooks
78 // ---------------------------------------------------------------- //
79 
80 // sends M_OBSERVER_ADDED to the newly-added observer
81 void ObservableLooper::observerAdded(
82 	const BMessenger&				observer) {
83 
84 	BMessage m(M_OBSERVER_ADDED);
85 	m.AddMessenger("target", BMessenger(this));
86 	observer.SendMessage(&m);
87 }
88 
89 // sends M_OBSERVER_REMOVED to the newly-removed observer
90 void ObservableLooper::observerRemoved(
91 	const BMessenger&				observer) {
92 
93 	BMessage m(M_OBSERVER_REMOVED);
94 	m.AddMessenger("target", BMessenger(this));
95 	observer.SendMessage(&m);
96 }
97 
98 // ---------------------------------------------------------------- //
99 // *** internal operations
100 // ---------------------------------------------------------------- //
101 
102 status_t ObservableLooper::notify(
103 	BMessage*								message) {
104 	ASSERT(IsLocked());
105 
106 	return Invoke(message);
107 }
108 
109 // sends M_RELEASE_OBSERVABLE
110 void ObservableLooper::notifyRelease() {
111 	BMessage m(M_RELEASE_OBSERVABLE);
112 	m.AddMessenger("target", BMessenger(this));
113 	notify(&m);
114 }
115 
116 // ---------------------------------------------------------------- //
117 // *** BLooper
118 // ---------------------------------------------------------------- //
119 
120 void ObservableLooper::Quit() {
121 	ASSERT(IsLocked());
122 
123 	if(QuitRequested()) {
124 		releaseComplete();
125 		_inherited::Quit();
126 	}
127 	else
128 		Unlock();
129 }
130 
131 bool ObservableLooper::QuitRequested() {
132 
133 	if(CountTargets()) {
134 		if(!m_quitting) {
135 			m_quitting = true;
136 
137 			// no release request yet sent
138 			notifyRelease();
139 
140 			if(m_quitTimeout != B_INFINITE_TIMEOUT) {
141 				// Initiate a timer to force quit -- if an observer
142 				// has died, it shouldn't take me down with it.
143 				ASSERT(!m_executioner);
144 				m_executioner = new BMessageRunner(
145 					BMessenger(this),
146 					new BMessage(M_KILL_OBSERVABLE),
147 					m_quitTimeout,
148 					1);
149 			}
150 		}
151 
152 		// targets remain, so don't quit.
153 		return false;
154 	}
155 
156 	// okay to quit
157 	return true;
158 }
159 
160 // ---------------------------------------------------------------- //
161 // *** BHandler
162 // ---------------------------------------------------------------- //
163 
164 void ObservableLooper::MessageReceived(
165 	BMessage*								message) {
166 
167 //	PRINT((
168 //		"### ObservableLooper::MessageReceived()\n"));
169 //	message->PrintToStream();
170 
171 	switch(message->what) {
172 		case M_ADD_OBSERVER:
173 			_handleAddObserver(message);
174 			break;
175 
176 		case M_REMOVE_OBSERVER:
177 			_handleRemoveObserver(message);
178 			break;
179 
180 		case M_KILL_OBSERVABLE:
181 			releaseComplete();
182 			BLooper::Quit();
183 			break;
184 
185 		default:
186 			_inherited::MessageReceived(message);
187 	}
188 }
189 
190 // ---------------------------------------------------------------- //
191 // *** BArchivable
192 // ---------------------------------------------------------------- //
193 
194 status_t ObservableLooper::Archive(
195 	BMessage*								archive,
196 	bool										deep) const {
197 
198 	ASSERT(IsLocked());
199 
200 	// can't archive an object in limbo
201 	if(m_quitting)
202 		return B_NOT_ALLOWED;
203 
204 	status_t err = _inherited::Archive(archive, deep);
205 	if(err < B_OK)
206 		return err;
207 
208 	archive->AddInt64("quitTimeout", m_quitTimeout);
209 	return B_OK;
210 }
211 
212 // ---------------------------------------------------------------- //
213 // implementation
214 // ---------------------------------------------------------------- //
215 
216 void ObservableLooper::_handleAddObserver(
217 	BMessage*								message) {
218 
219 	BMessage reply;
220 
221 	BMessenger observer;
222 	status_t err = message->FindMessenger(
223 		"observer", &observer);
224 	if(err < B_OK) {
225 		PRINT((
226 			"* ObservableLooper::_handleAddObserver(): no observer specified!\n"));
227 		// send reply? +++++
228 		return;
229 	}
230 
231 	// at this point, a reply of some sort will be sent
232 	reply.AddMessenger("target", BMessenger(this));
233 	reply.AddMessenger("observer", observer);
234 
235 	if(m_quitting) {
236 		// already quitting
237 		reply.what = M_BAD_TARGET;
238 	}
239 	else if(IndexOfTarget(observer.Target(0)) != -1) {
240 		// observer already added
241 		reply.what = M_BAD_OBSERVER;
242 	}
243 	else {
244 		// add it
245 		err = AddTarget(observer.Target(0));
246 		ASSERT(err == B_OK);
247 		reply.what = M_OBSERVER_ADDED;
248 	}
249 
250 	// send reply
251 	message->SendReply(&reply);
252 
253 	// call hook
254 	observerAdded(observer);
255 }
256 
257 void ObservableLooper::_handleRemoveObserver(
258 	BMessage*								message) {
259 
260 //	PRINT(("ObservableLooper::_handleRemoveObserver():\n"
261 //		"  %ld targets\n", CountTargets()));
262 	BMessage reply;
263 
264 	BMessenger observer;
265 	status_t err = message->FindMessenger(
266 		"observer", &observer);
267 	if(err < B_OK) {
268 		PRINT((
269 			"* ObservableLooper::_handleRemoveObserver(): no observer specified!\n"));
270 		// send reply? +++++
271 		return;
272 	}
273 
274 	// at this point, a reply of some sort will be sent
275 	reply.AddMessenger("target", BMessenger(this));
276 	reply.AddMessenger("observer", observer);
277 
278 	int32 index = IndexOfTarget(observer.Target(0));
279 	if(index == -1) {
280 		reply.what = M_BAD_OBSERVER;
281 	}
282 	else {
283 		RemoveTarget(index);
284 		reply.what = M_OBSERVER_REMOVED;
285 	}
286 
287 	message->SendReply(&reply);
288 
289 	// call hook
290 	observerRemoved(observer);
291 
292 	// time to shut down?
293 	if(m_quitting && !CountTargets()) {
294 		releaseComplete();
295 		BLooper::Quit();
296 	}
297 }
298 
299 // END -- ObservableLooper.cpp --
300