xref: /haiku/src/libs/compat/freebsd_network/taskqueue.c (revision 922e7ba1f3228e6f28db69b0ded8f86eb32dea17)
1 /*
2  * Copyright 2009, Colin Günther, coling@gmx.de
3  * Copyright 2007, Hugo Santos. All Rights Reserved.
4  * Distributed under the terms of the MIT License.
5  *
6  * Authors:
7  *      Hugo Santos, hugosantos@gmail.com
8  */
9 
10 
11 #include "device.h"
12 
13 #include <stdio.h>
14 
15 #include <compat/sys/taskqueue.h>
16 #include <compat/sys/haiku-module.h>
17 
18 
19 #define TQ_FLAGS_ACTIVE		(1 << 0)
20 #define TQ_FLAGS_BLOCKED	(1 << 1)
21 #define TQ_FLAGS_PENDING	(1 << 2)
22 
23 
24 struct taskqueue {
25 	char tq_name[64];
26 	mutex tq_mutex;
27 	struct list tq_list;
28 	taskqueue_enqueue_fn tq_enqueue;
29 	void *tq_arg;
30 	int tq_fast;
31 	spinlock tq_spinlock;
32 	sem_id tq_sem;
33 	thread_id *tq_threads;
34 	thread_id tq_thread_storage;
35 	int tq_threadcount;
36 	int tq_flags;
37 };
38 
39 struct taskqueue *taskqueue_fast = NULL;
40 struct taskqueue *taskqueue_swi = NULL;
41 
42 
43 static struct taskqueue *
44 _taskqueue_create(const char *name, int mflags, int fast,
45 	taskqueue_enqueue_fn enqueueFunction, void *context)
46 {
47 	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
48 	if (tq == NULL)
49 		return NULL;
50 
51 	tq->tq_fast = fast;
52 
53 	if (fast) {
54 		B_INITIALIZE_SPINLOCK(&tq->tq_spinlock);
55 	} else {
56 		mutex_init_etc(&tq->tq_mutex, name, MUTEX_FLAG_CLONE_NAME);
57 	}
58 
59 	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
60 	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
61 	tq->tq_enqueue = enqueueFunction;
62 	tq->tq_arg = context;
63 
64 	tq->tq_sem = -1;
65 	tq->tq_threads = NULL;
66 	tq->tq_threadcount = 0;
67 	tq->tq_flags = TQ_FLAGS_ACTIVE;
68 
69 	return tq;
70 }
71 
72 
73 static void
74 tq_lock(struct taskqueue *taskQueue, cpu_status *status)
75 {
76 	if (taskQueue->tq_fast) {
77 		*status = disable_interrupts();
78 		acquire_spinlock(&taskQueue->tq_spinlock);
79 	} else {
80 		mutex_lock(&taskQueue->tq_mutex);
81 	}
82 }
83 
84 
85 static void
86 tq_unlock(struct taskqueue *taskQueue, cpu_status status)
87 {
88 	if (taskQueue->tq_fast) {
89 		release_spinlock(&taskQueue->tq_spinlock);
90 		restore_interrupts(status);
91 	} else {
92 		mutex_unlock(&taskQueue->tq_mutex);
93 	}
94 }
95 
96 
97 struct taskqueue *
98 taskqueue_create(const char *name, int mflags,
99 	taskqueue_enqueue_fn enqueueFunction, void *context)
100 {
101 	return _taskqueue_create(name, mflags, 0, enqueueFunction, context);
102 }
103 
104 
105 static int32
106 tq_handle_thread(void *data)
107 {
108 	struct taskqueue *tq = data;
109 	cpu_status cpu_state;
110 	struct task *t;
111 	int pending;
112 	sem_id sem;
113 
114 	/* just a synchronization point */
115 	tq_lock(tq, &cpu_state);
116 	sem = tq->tq_sem;
117 	tq_unlock(tq, cpu_state);
118 
119 	while (1) {
120 		status_t status = acquire_sem(sem);
121 		if (status < B_OK)
122 			break;
123 
124 		tq_lock(tq, &cpu_state);
125 		t = list_remove_head_item(&tq->tq_list);
126 		pending = t->ta_pending;
127 		t->ta_pending = 0;
128 		tq_unlock(tq, cpu_state);
129 
130 		t->ta_handler(t->ta_argument, pending);
131 	}
132 
133 	return 0;
134 }
135 
136 
137 static int
138 _taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
139 	const char *name)
140 {
141 	struct taskqueue *tq = (*taskQueue);
142 	int i, j;
143 
144 	if (count == 0)
145 		return -1;
146 
147 	if (tq->tq_threads != NULL)
148 		return -1;
149 
150 	if (count == 1) {
151 		tq->tq_threads = &tq->tq_thread_storage;
152 	} else {
153 		tq->tq_threads = malloc(sizeof(thread_id) * count);
154 		if (tq->tq_threads == NULL)
155 			return B_NO_MEMORY;
156 	}
157 
158 	tq->tq_sem = create_sem(0, tq->tq_name);
159 	if (tq->tq_sem < B_OK) {
160 		if (count > 1)
161 			free(tq->tq_threads);
162 		tq->tq_threads = NULL;
163 		return tq->tq_sem;
164 	}
165 
166 	for (i = 0; i < count; i++) {
167 		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
168 			priority, tq);
169 		if (tq->tq_threads[i] < B_OK) {
170 			status_t status = tq->tq_threads[i];
171 			for (j = 0; j < i; j++)
172 				kill_thread(tq->tq_threads[j]);
173 			if (count > 1)
174 				free(tq->tq_threads);
175 			tq->tq_threads = NULL;
176 			delete_sem(tq->tq_sem);
177 			return status;
178 		}
179 	}
180 
181 	tq->tq_threadcount = count;
182 
183 	for (i = 0; i < count; i++)
184 		resume_thread(tq->tq_threads[i]);
185 
186 	return 0;
187 }
188 
189 
190 int
191 taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
192 	const char *format, ...)
193 {
194 	/* we assume that start_threads is called in a sane place, and thus
195 	 * don't need to be locked. This is mostly due to the fact that if
196 	 * the TQ is 'fast', locking the TQ disables interrupts... and then
197 	 * we can't create semaphores, threads and bananas. */
198 
199 	/* cpu_status state; */
200 	char name[64];
201 	int result;
202 	va_list vl;
203 
204 	va_start(vl, format);
205 	vsnprintf(name, sizeof(name), format, vl);
206 	va_end(vl);
207 
208 	/*tq_lock(*tqp, &state);*/
209 	result = _taskqueue_start_threads(taskQueue, count, priority, name);
210 	/*tq_unlock(*tqp, state);*/
211 
212 	return result;
213 }
214 
215 
216 void
217 taskqueue_free(struct taskqueue *taskQueue)
218 {
219 	/* lock and  drain list? */
220 	taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE;
221 	if (!taskQueue->tq_fast)
222 		mutex_destroy(&taskQueue->tq_mutex);
223 	if (taskQueue->tq_sem != -1) {
224 		int i;
225 
226 		delete_sem(taskQueue->tq_sem);
227 
228 		for (i = 0; i < taskQueue->tq_threadcount; i++) {
229 			status_t status;
230 			wait_for_thread(taskQueue->tq_threads[i], &status);
231 		}
232 
233 		if (taskQueue->tq_threadcount > 1)
234 			free(taskQueue->tq_threads);
235 	}
236 
237 	free(taskQueue);
238 }
239 
240 
241 void
242 taskqueue_drain(struct taskqueue *taskQueue, struct task *task)
243 {
244 	cpu_status status;
245 
246 	tq_lock(taskQueue, &status);
247 	while (task->ta_pending != 0) {
248 		tq_unlock(taskQueue, status);
249 		snooze(0);
250 		tq_lock(taskQueue, &status);
251 	}
252 	tq_unlock(taskQueue, status);
253 }
254 
255 
256 int
257 taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task)
258 {
259 	cpu_status status;
260 	tq_lock(taskQueue, &status);
261 	/* we don't really support priorities */
262 	if (task->ta_pending) {
263 		task->ta_pending++;
264 	} else {
265 		list_add_item(&taskQueue->tq_list, task);
266 		task->ta_pending = 1;
267 		if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
268 			taskQueue->tq_enqueue(taskQueue->tq_arg);
269 		else
270 			taskQueue->tq_flags |= TQ_FLAGS_PENDING;
271 	}
272 	tq_unlock(taskQueue, status);
273 	return 0;
274 }
275 
276 
277 void
278 taskqueue_thread_enqueue(void *context)
279 {
280 	struct taskqueue **tqp = context;
281 	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
282 }
283 
284 
285 int
286 taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task)
287 {
288 	return taskqueue_enqueue(taskQueue, task);
289 }
290 
291 
292 struct taskqueue *
293 taskqueue_create_fast(const char *name, int mflags,
294 	taskqueue_enqueue_fn enqueueFunction, void *context)
295 {
296 	return _taskqueue_create(name, mflags, 1, enqueueFunction, context);
297 }
298 
299 
300 void
301 task_init(struct task *task, int prio, task_handler_t handler, void *context)
302 {
303 	task->ta_priority = prio;
304 	task->ta_handler = handler;
305 	task->ta_argument = context;
306 	task->ta_pending = 0;
307 }
308 
309 
310 status_t
311 init_taskqueues()
312 {
313 	status_t status = B_NO_MEMORY;
314 
315 	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
316 		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
317 			taskqueue_thread_enqueue, &taskqueue_fast);
318 		if (taskqueue_fast == NULL)
319 			return B_NO_MEMORY;
320 
321 		status = taskqueue_start_threads(&taskqueue_fast, 1,
322 			B_REAL_TIME_PRIORITY, "fast taskq");
323 		if (status < B_OK)
324 			goto err_1;
325 	}
326 
327 	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
328 		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
329 			taskqueue_thread_enqueue, &taskqueue_swi);
330 		if (taskqueue_swi == NULL) {
331 			status = B_NO_MEMORY;
332 			goto err_1;
333 		}
334 
335 		status = taskqueue_start_threads(&taskqueue_swi, 1,
336 			B_REAL_TIME_PRIORITY, "swi taskq");
337 		if (status < B_OK)
338 			goto err_2;
339 	}
340 
341 	return B_OK;
342 
343 err_2:
344 	if (taskqueue_swi)
345 		taskqueue_free(taskqueue_swi);
346 
347 err_1:
348 	if (taskqueue_fast)
349 		taskqueue_free(taskqueue_fast);
350 
351 	return status;
352 }
353 
354 
355 void
356 uninit_taskqueues()
357 {
358 	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
359 		taskqueue_free(taskqueue_swi);
360 
361 	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
362 		taskqueue_free(taskqueue_fast);
363 }
364 
365 
366 void
367 taskqueue_block(struct taskqueue *taskQueue)
368 {
369 	cpu_status status;
370 
371 	tq_lock(taskQueue, &status);
372 	taskQueue->tq_flags |= TQ_FLAGS_BLOCKED;
373 	tq_unlock(taskQueue, status);
374 }
375 
376 
377 void
378 taskqueue_unblock(struct taskqueue *taskQueue)
379 {
380 	cpu_status status;
381 
382 	tq_lock(taskQueue, &status);
383 	taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED;
384 	if (taskQueue->tq_flags & TQ_FLAGS_PENDING) {
385 		taskQueue->tq_flags &= ~TQ_FLAGS_PENDING;
386 		taskQueue->tq_enqueue(taskQueue->tq_arg);
387 	}
388 	tq_unlock(taskQueue, status);
389 }
390