1 // RequestConnection.cpp 2 3 #include <new> 4 5 #include <OS.h> 6 7 #include "Channel.h" 8 #include "Connection.h" 9 #include "RequestChannel.h" 10 #include "RequestConnection.h" 11 #include "RequestHandler.h" 12 13 // DownStreamThread 14 class RequestConnection::DownStreamThread { 15 public: 16 DownStreamThread() 17 : fThread(-1), 18 fConnection(NULL), 19 fChannel(NULL), 20 fHandler(NULL), 21 fTerminating(false) 22 { 23 } 24 25 ~DownStreamThread() 26 { 27 Terminate(); 28 delete fChannel; 29 } 30 31 status_t Init(RequestConnection* connection, Channel* channel, 32 RequestHandler* handler) 33 { 34 if (!connection || !channel || !handler) 35 return B_BAD_VALUE; 36 fConnection = connection; 37 fHandler = handler; 38 fChannel = new(std::nothrow) RequestChannel(channel); 39 if (!fChannel) 40 return B_NO_MEMORY; 41 fThread = spawn_thread(_LoopEntry, "down stream thread", 42 B_NORMAL_PRIORITY, this); 43 if (fThread < 0) 44 return fThread; 45 return B_OK; 46 } 47 48 void Run() 49 { 50 resume_thread(fThread); 51 } 52 53 void Terminate() 54 { 55 fTerminating = true; 56 if (fThread > 0 && find_thread(NULL) != fThread) { 57 int32 result; 58 wait_for_thread(fThread, &result); 59 } 60 } 61 62 RequestChannel* GetRequestChannel() 63 { 64 return fChannel; 65 } 66 67 private: 68 static int32 _LoopEntry(void* data) 69 { 70 return ((DownStreamThread*)data)->_Loop(); 71 } 72 73 int32 _Loop() 74 { 75 while (!fTerminating) { 76 Request* request; 77 status_t error = fChannel->ReceiveRequest(&request); 78 if (error == B_OK) { 79 error = fHandler->HandleRequest(request, fChannel); 80 delete request; 81 } 82 if (error != B_OK) 83 fTerminating = fConnection->DownStreamChannelError(this, error); 84 } 85 return 0; 86 } 87 88 private: 89 thread_id fThread; 90 RequestConnection* fConnection; 91 RequestChannel* fChannel; 92 RequestHandler* fHandler; 93 volatile bool fTerminating; 94 }; 95 96 97 // RequestConnection 98 99 // constructor 100 RequestConnection::RequestConnection(Connection* connection, 101 RequestHandler* requestHandler, bool ownsRequestHandler) 102 : fConnection(connection), 103 fRequestHandler(requestHandler), 104 fOwnsRequestHandler(ownsRequestHandler), 105 fThreads(NULL), 106 fThreadCount(0), 107 fTerminationCount(0) 108 { 109 } 110 111 // destructor 112 RequestConnection::~RequestConnection() 113 { 114 Close(); 115 delete[] fThreads; 116 delete fConnection; 117 if (fOwnsRequestHandler) 118 delete fRequestHandler; 119 } 120 121 // Init 122 status_t 123 RequestConnection::Init() 124 { 125 // check parameters 126 if (!fConnection || !fRequestHandler) 127 return B_BAD_VALUE; 128 if (fConnection->CountDownStreamChannels() < 1) 129 return B_ERROR; 130 // create a thread per down-stream channel 131 fThreadCount = fConnection->CountDownStreamChannels(); 132 fThreads = new(std::nothrow) DownStreamThread[fThreadCount]; 133 if (!fThreads) 134 return B_NO_MEMORY; 135 // initialize the threads 136 for (int32 i = 0; i < fThreadCount; i++) { 137 status_t error = fThreads[i].Init(this, 138 fConnection->DownStreamChannelAt(i), fRequestHandler); 139 if (error != B_OK) 140 return error; 141 } 142 // run the threads 143 for (int32 i = 0; i < fThreadCount; i++) 144 fThreads[i].Run(); 145 return B_OK; 146 } 147 148 // Close 149 void 150 RequestConnection::Close() 151 { 152 atomic_add(&fTerminationCount, 1); 153 if (fConnection) 154 fConnection->Close(); 155 if (fThreads) { 156 for (int32 i = 0; i < fThreadCount; i++) 157 fThreads[i].Terminate(); 158 } 159 } 160 161 // SendRequest 162 status_t 163 RequestConnection::SendRequest(Request* request, Request** reply) 164 { 165 return _SendRequest(request, reply, NULL); 166 } 167 168 // SendRequest 169 status_t 170 RequestConnection::SendRequest(Request* request, RequestHandler* replyHandler) 171 { 172 if (!replyHandler) 173 return B_BAD_VALUE; 174 return _SendRequest(request, NULL, replyHandler); 175 } 176 177 // DownStreamChannelError 178 bool 179 RequestConnection::DownStreamChannelError(DownStreamThread* thread, 180 status_t error) 181 { 182 if (atomic_add(&fTerminationCount, 1) == 0 && fRequestHandler) { 183 ConnectionBrokenRequest request; 184 request.error = error; 185 fRequestHandler->HandleRequest(&request, thread->GetRequestChannel()); 186 } 187 return true; 188 } 189 190 // _SendRequest 191 status_t 192 RequestConnection::_SendRequest(Request* request, Request** _reply, 193 RequestHandler* replyHandler) 194 { 195 // check parameters 196 if (!request) 197 return B_BAD_VALUE; 198 // get a channel 199 Channel* channel = NULL; 200 status_t error = fConnection->GetUpStreamChannel(&channel); 201 if (error != B_OK) 202 return error; 203 // send the request 204 RequestChannel requestChannel(channel); 205 error = requestChannel.SendRequest(request); 206 // receive the reply 207 Request* reply = NULL; 208 if (error == B_OK && (_reply || replyHandler)) { 209 error = requestChannel.ReceiveRequest(&reply); 210 // handle the reply 211 if (error == B_OK) { 212 if (replyHandler) 213 error = replyHandler->HandleRequest(reply, &requestChannel); 214 if (error == B_OK && _reply) 215 *_reply = reply; 216 else 217 delete reply; 218 } 219 } 220 // cleanup 221 if (fConnection->PutUpStreamChannel(channel) != B_OK) { 222 // Ugh! A serious error. Probably insufficient memory. 223 delete channel; 224 } 225 return error; 226 } 227 228