1 // RequestConnection.cpp
2
3 #include <new>
4
5 #include <OS.h>
6
7 #include "Channel.h"
8 #include "Connection.h"
9 #include "RequestChannel.h"
10 #include "RequestConnection.h"
11 #include "RequestHandler.h"
12
13 // DownStreamThread
14 class RequestConnection::DownStreamThread {
15 public:
DownStreamThread()16 DownStreamThread()
17 : fThread(-1),
18 fConnection(NULL),
19 fChannel(NULL),
20 fHandler(NULL),
21 fTerminating(false)
22 {
23 }
24
~DownStreamThread()25 ~DownStreamThread()
26 {
27 Terminate();
28 delete fChannel;
29 }
30
Init(RequestConnection * connection,Channel * channel,RequestHandler * handler)31 status_t Init(RequestConnection* connection, Channel* channel,
32 RequestHandler* handler)
33 {
34 if (!connection || !channel || !handler)
35 return B_BAD_VALUE;
36 fConnection = connection;
37 fHandler = handler;
38 fChannel = new(std::nothrow) RequestChannel(channel);
39 if (!fChannel)
40 return B_NO_MEMORY;
41 fThread = spawn_thread(_LoopEntry, "down stream thread",
42 B_NORMAL_PRIORITY, this);
43 if (fThread < 0)
44 return fThread;
45 return B_OK;
46 }
47
Run()48 void Run()
49 {
50 resume_thread(fThread);
51 }
52
Terminate()53 void Terminate()
54 {
55 fTerminating = true;
56 if (fThread > 0 && find_thread(NULL) != fThread) {
57 int32 result;
58 wait_for_thread(fThread, &result);
59 }
60 }
61
GetRequestChannel()62 RequestChannel* GetRequestChannel()
63 {
64 return fChannel;
65 }
66
67 private:
_LoopEntry(void * data)68 static int32 _LoopEntry(void* data)
69 {
70 return ((DownStreamThread*)data)->_Loop();
71 }
72
_Loop()73 int32 _Loop()
74 {
75 while (!fTerminating) {
76 Request* request;
77 status_t error = fChannel->ReceiveRequest(&request);
78 if (error == B_OK) {
79 error = fHandler->HandleRequest(request, fChannel);
80 delete request;
81 }
82 if (error != B_OK)
83 fTerminating = fConnection->DownStreamChannelError(this, error);
84 }
85 return 0;
86 }
87
88 private:
89 thread_id fThread;
90 RequestConnection* fConnection;
91 RequestChannel* fChannel;
92 RequestHandler* fHandler;
93 volatile bool fTerminating;
94 };
95
96
97 // RequestConnection
98
99 // constructor
RequestConnection(Connection * connection,RequestHandler * requestHandler,bool ownsRequestHandler)100 RequestConnection::RequestConnection(Connection* connection,
101 RequestHandler* requestHandler, bool ownsRequestHandler)
102 : fConnection(connection),
103 fRequestHandler(requestHandler),
104 fOwnsRequestHandler(ownsRequestHandler),
105 fThreads(NULL),
106 fThreadCount(0),
107 fTerminationCount(0)
108 {
109 }
110
111 // destructor
~RequestConnection()112 RequestConnection::~RequestConnection()
113 {
114 Close();
115 delete[] fThreads;
116 delete fConnection;
117 if (fOwnsRequestHandler)
118 delete fRequestHandler;
119 }
120
121 // Init
122 status_t
Init()123 RequestConnection::Init()
124 {
125 // check parameters
126 if (!fConnection || !fRequestHandler)
127 return B_BAD_VALUE;
128 if (fConnection->CountDownStreamChannels() < 1)
129 return B_ERROR;
130 // create a thread per down-stream channel
131 fThreadCount = fConnection->CountDownStreamChannels();
132 fThreads = new(std::nothrow) DownStreamThread[fThreadCount];
133 if (!fThreads)
134 return B_NO_MEMORY;
135 // initialize the threads
136 for (int32 i = 0; i < fThreadCount; i++) {
137 status_t error = fThreads[i].Init(this,
138 fConnection->DownStreamChannelAt(i), fRequestHandler);
139 if (error != B_OK)
140 return error;
141 }
142 // run the threads
143 for (int32 i = 0; i < fThreadCount; i++)
144 fThreads[i].Run();
145 return B_OK;
146 }
147
148 // Close
149 void
Close()150 RequestConnection::Close()
151 {
152 atomic_add(&fTerminationCount, 1);
153 if (fConnection)
154 fConnection->Close();
155 if (fThreads) {
156 for (int32 i = 0; i < fThreadCount; i++)
157 fThreads[i].Terminate();
158 }
159 }
160
161 // SendRequest
162 status_t
SendRequest(Request * request,Request ** reply)163 RequestConnection::SendRequest(Request* request, Request** reply)
164 {
165 return _SendRequest(request, reply, NULL);
166 }
167
168 // SendRequest
169 status_t
SendRequest(Request * request,RequestHandler * replyHandler)170 RequestConnection::SendRequest(Request* request, RequestHandler* replyHandler)
171 {
172 if (!replyHandler)
173 return B_BAD_VALUE;
174 return _SendRequest(request, NULL, replyHandler);
175 }
176
177 // DownStreamChannelError
178 bool
DownStreamChannelError(DownStreamThread * thread,status_t error)179 RequestConnection::DownStreamChannelError(DownStreamThread* thread,
180 status_t error)
181 {
182 if (atomic_add(&fTerminationCount, 1) == 0 && fRequestHandler) {
183 ConnectionBrokenRequest request;
184 request.error = error;
185 fRequestHandler->HandleRequest(&request, thread->GetRequestChannel());
186 }
187 return true;
188 }
189
190 // _SendRequest
191 status_t
_SendRequest(Request * request,Request ** _reply,RequestHandler * replyHandler)192 RequestConnection::_SendRequest(Request* request, Request** _reply,
193 RequestHandler* replyHandler)
194 {
195 // check parameters
196 if (!request)
197 return B_BAD_VALUE;
198 // get a channel
199 Channel* channel = NULL;
200 status_t error = fConnection->GetUpStreamChannel(&channel);
201 if (error != B_OK)
202 return error;
203 // send the request
204 RequestChannel requestChannel(channel);
205 error = requestChannel.SendRequest(request);
206 // receive the reply
207 Request* reply = NULL;
208 if (error == B_OK && (_reply || replyHandler)) {
209 error = requestChannel.ReceiveRequest(&reply);
210 // handle the reply
211 if (error == B_OK) {
212 if (replyHandler)
213 error = replyHandler->HandleRequest(reply, &requestChannel);
214 if (error == B_OK && _reply)
215 *_reply = reply;
216 else
217 delete reply;
218 }
219 }
220 // cleanup
221 if (fConnection->PutUpStreamChannel(channel) != B_OK) {
222 // Ugh! A serious error. Probably insufficient memory.
223 delete channel;
224 }
225 return error;
226 }
227
228