xref: /haiku/src/add-ons/kernel/file_systems/netfs/shared/RequestConnection.cpp (revision 220d04022750f40f8bac8f01fa551211e28d04f2)
1 // RequestConnection.cpp
2 
3 #include <new>
4 
5 #include <OS.h>
6 
7 #include "Channel.h"
8 #include "Connection.h"
9 #include "RequestChannel.h"
10 #include "RequestConnection.h"
11 #include "RequestHandler.h"
12 
13 // DownStreamThread
14 class RequestConnection::DownStreamThread {
15 public:
16 	DownStreamThread()
17 		: fThread(-1),
18 		  fConnection(NULL),
19 		  fChannel(NULL),
20 		  fHandler(NULL),
21 		  fTerminating(false)
22 	{
23 	}
24 
25 	~DownStreamThread()
26 	{
27 		Terminate();
28 		delete fChannel;
29 	}
30 
31 	status_t Init(RequestConnection* connection, Channel* channel,
32 		RequestHandler* handler)
33 	{
34 		if (!connection || !channel || !handler)
35 			return B_BAD_VALUE;
36 		fConnection = connection;
37 		fHandler = handler;
38 		fChannel = new(std::nothrow) RequestChannel(channel);
39 		if (!fChannel)
40 			return B_NO_MEMORY;
41 		fThread = spawn_thread(_LoopEntry, "down stream thread",
42 			B_NORMAL_PRIORITY, this);
43 		if (fThread < 0)
44 			return fThread;
45 		return B_OK;
46 	}
47 
48 	void Run()
49 	{
50 		resume_thread(fThread);
51 	}
52 
53 	void Terminate()
54 	{
55 		fTerminating = true;
56 		if (fThread > 0 && find_thread(NULL) != fThread) {
57 			int32 result;
58 			wait_for_thread(fThread, &result);
59 		}
60 	}
61 
62 	RequestChannel* GetRequestChannel()
63 	{
64 		return fChannel;
65 	}
66 
67 private:
68 	static int32 _LoopEntry(void* data)
69 	{
70 		return ((DownStreamThread*)data)->_Loop();
71 	}
72 
73 	int32 _Loop()
74 	{
75 		while (!fTerminating) {
76 			Request* request;
77 			status_t error = fChannel->ReceiveRequest(&request);
78 			if (error == B_OK) {
79 				error = fHandler->HandleRequest(request, fChannel);
80 				delete request;
81 			}
82 			if (error != B_OK)
83 				fTerminating = fConnection->DownStreamChannelError(this, error);
84 		}
85 		return 0;
86 	}
87 
88 private:
89 	thread_id			fThread;
90 	RequestConnection*	fConnection;
91 	RequestChannel*		fChannel;
92 	RequestHandler*		fHandler;
93 	volatile bool		fTerminating;
94 };
95 
96 
97 // RequestConnection
98 
99 // constructor
100 RequestConnection::RequestConnection(Connection* connection,
101 	RequestHandler* requestHandler, bool ownsRequestHandler)
102 	: fConnection(connection),
103 	  fRequestHandler(requestHandler),
104 	  fOwnsRequestHandler(ownsRequestHandler),
105 	  fThreads(NULL),
106 	  fThreadCount(0),
107 	  fTerminationCount(0)
108 {
109 }
110 
111 // destructor
112 RequestConnection::~RequestConnection()
113 {
114 	Close();
115 	delete[] fThreads;
116 	delete fConnection;
117 	if (fOwnsRequestHandler)
118 		delete fRequestHandler;
119 }
120 
121 // Init
122 status_t
123 RequestConnection::Init()
124 {
125 	// check parameters
126 	if (!fConnection || !fRequestHandler)
127 		return B_BAD_VALUE;
128 	if (fConnection->CountDownStreamChannels() < 1)
129 		return B_ERROR;
130 	// create a thread per down-stream channel
131 	fThreadCount = fConnection->CountDownStreamChannels();
132 	fThreads = new(std::nothrow) DownStreamThread[fThreadCount];
133 	if (!fThreads)
134 		return B_NO_MEMORY;
135 	// initialize the threads
136 	for (int32 i = 0; i < fThreadCount; i++) {
137 		status_t error = fThreads[i].Init(this,
138 			fConnection->DownStreamChannelAt(i), fRequestHandler);
139 		if (error != B_OK)
140 			return error;
141 	}
142 	// run the threads
143 	for (int32 i = 0; i < fThreadCount; i++)
144 		fThreads[i].Run();
145 	return B_OK;
146 }
147 
148 // Close
149 void
150 RequestConnection::Close()
151 {
152 	atomic_add(&fTerminationCount, 1);
153 	if (fConnection)
154 		fConnection->Close();
155 	if (fThreads) {
156 		for (int32 i = 0; i < fThreadCount; i++)
157 			fThreads[i].Terminate();
158 	}
159 }
160 
161 // SendRequest
162 status_t
163 RequestConnection::SendRequest(Request* request, Request** reply)
164 {
165 	return _SendRequest(request, reply, NULL);
166 }
167 
168 // SendRequest
169 status_t
170 RequestConnection::SendRequest(Request* request, RequestHandler* replyHandler)
171 {
172 	if (!replyHandler)
173 		return B_BAD_VALUE;
174 	return _SendRequest(request, NULL, replyHandler);
175 }
176 
177 // DownStreamChannelError
178 bool
179 RequestConnection::DownStreamChannelError(DownStreamThread* thread,
180 	status_t error)
181 {
182 	if (atomic_add(&fTerminationCount, 1) == 0 && fRequestHandler) {
183 		ConnectionBrokenRequest request;
184 		request.error = error;
185 		fRequestHandler->HandleRequest(&request, thread->GetRequestChannel());
186 	}
187 	return true;
188 }
189 
190 // _SendRequest
191 status_t
192 RequestConnection::_SendRequest(Request* request, Request** _reply,
193 	RequestHandler* replyHandler)
194 {
195 	// check parameters
196 	if (!request)
197 		return B_BAD_VALUE;
198 	// get a channel
199 	Channel* channel = NULL;
200 	status_t error = fConnection->GetUpStreamChannel(&channel);
201 	if (error != B_OK)
202 		return error;
203 	// send the request
204 	RequestChannel requestChannel(channel);
205 	error = requestChannel.SendRequest(request);
206 	// receive the reply
207 	Request* reply = NULL;
208 	if (error == B_OK && (_reply || replyHandler)) {
209 		error = requestChannel.ReceiveRequest(&reply);
210 		// handle the reply
211 		if (error == B_OK) {
212 			if (replyHandler)
213 				error = replyHandler->HandleRequest(reply, &requestChannel);
214 			if (error == B_OK && _reply)
215 				*_reply = reply;
216 			else
217 				delete reply;
218 		}
219 	}
220 	// cleanup
221 	if (fConnection->PutUpStreamChannel(channel) != B_OK) {
222 		// Ugh! A serious error. Probably insufficient memory.
223 		delete channel;
224 	}
225 	return error;
226 }
227 
228