xref: /haiku/src/libs/compat/freebsd_network/taskqueue.c (revision 220d04022750f40f8bac8f01fa551211e28d04f2)
/*
 * Copyright 2009, Colin Günther, coling@gmx.de
 * Copyright 2007, Hugo Santos. All Rights Reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *      Hugo Santos, hugosantos@gmail.com
 */


#include "device.h"

#include <stdio.h>

#include <compat/sys/taskqueue.h>
#include <compat/sys/haiku-module.h>


#define TQ_FLAGS_ACTIVE		(1 << 0)
#define TQ_FLAGS_BLOCKED	(1 << 1)
#define TQ_FLAGS_PENDING	(1 << 2)


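/*
 * A task queue is protected either by a spinlock (with interrupts
 * disabled) when it was created "fast", so that it is usable from
 * interrupt context, or by a regular mutex otherwise.
 */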
struct taskqueue {
	char tq_name[64];
	mutex tq_mutex;
	struct list tq_list;
	taskqueue_enqueue_fn tq_enqueue;
	void *tq_arg;
	int tq_fast;
	spinlock tq_spinlock;
	sem_id tq_sem;
	thread_id *tq_threads;
	thread_id tq_thread_storage;
	int tq_threadcount;
	int tq_flags;
};

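/* the shared queues, created on demand in init_taskqueues() */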
struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;


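/*
 * Common constructor for both the mutex- and the spinlock-protected
 * flavour; mflags is accepted for FreeBSD compatibility but is unused.
 */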
static struct taskqueue *
_taskqueue_create(const char *name, int mflags, int fast,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
	if (tq == NULL)
		return NULL;

	tq->tq_fast = fast;

	if (fast) {
		B_INITIALIZE_SPINLOCK(&tq->tq_spinlock);
	} else {
		mutex_init_etc(&tq->tq_mutex, name, MUTEX_FLAG_CLONE_NAME);
	}

	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
	tq->tq_enqueue = enqueueFunction;
	tq->tq_arg = context;

	tq->tq_sem = -1;
	tq->tq_threads = NULL;
	tq->tq_threadcount = 0;
	tq->tq_flags = TQ_FLAGS_ACTIVE;

	return tq;
}


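/*
 * Lock the queue, disabling interrupts first if it is a "fast"
 * (spinlock-protected) queue; "status" receives the previous interrupt
 * state so that tq_unlock() can restore it.
 */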
static void
tq_lock(struct taskqueue *taskQueue, cpu_status *status)
{
	if (taskQueue->tq_fast) {
		*status = disable_interrupts();
		acquire_spinlock(&taskQueue->tq_spinlock);
	} else {
		mutex_lock(&taskQueue->tq_mutex);
	}
}


static void
tq_unlock(struct taskqueue *taskQueue, cpu_status status)
{
	if (taskQueue->tq_fast) {
		release_spinlock(&taskQueue->tq_spinlock);
		restore_interrupts(status);
	} else {
		mutex_unlock(&taskQueue->tq_mutex);
	}
}


struct taskqueue *
taskqueue_create(const char *name, int mflags,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	return _taskqueue_create(name, mflags, 0, enqueueFunction, context);
}


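/*
 * Worker thread loop: block on the queue's semaphore, pop one task per
 * wakeup and run its handler with the accumulated pending count. The
 * loop ends once taskqueue_free() deletes the semaphore, which makes
 * acquire_sem() fail.
 */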
static int32
tq_handle_thread(void *data)
{
	struct taskqueue *tq = data;
	cpu_status cpu_state;
	struct task *t;
	int pending;
	sem_id sem;

	/* just a synchronization point */
	tq_lock(tq, &cpu_state);
	sem = tq->tq_sem;
	tq_unlock(tq, cpu_state);

	while (acquire_sem(sem) == B_NO_ERROR) {
		tq_lock(tq, &cpu_state);
		t = list_remove_head_item(&tq->tq_list);
		if (t != NULL) {
			/* reset ta_pending while still holding the lock, so that a
			   concurrent taskqueue_enqueue() is not lost */
			pending = t->ta_pending;
			t->ta_pending = 0;
		}
		tq_unlock(tq, cpu_state);
		if (t == NULL)
			continue;

		t->ta_handler(t->ta_argument, pending);
	}

	return 0;
}


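/*
 * Create the wakeup semaphore and spawn the worker threads; a
 * single-thread queue reuses the embedded tq_thread_storage instead of
 * allocating. All threads are spawned before any of them is resumed,
 * so a spawn failure can still undo everything.
 */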
static int
_taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *name)
{
	struct taskqueue *tq = (*taskQueue);
	int i, j;

	if (count == 0)
		return -1;

	if (tq->tq_threads != NULL)
		return -1;

	if (count == 1) {
		tq->tq_threads = &tq->tq_thread_storage;
	} else {
		tq->tq_threads = malloc(sizeof(thread_id) * count);
		if (tq->tq_threads == NULL)
			return B_NO_MEMORY;
	}

	tq->tq_sem = create_sem(0, tq->tq_name);
	if (tq->tq_sem < B_OK) {
		if (count > 1)
			free(tq->tq_threads);
		tq->tq_threads = NULL;
		return tq->tq_sem;
	}

	for (i = 0; i < count; i++) {
		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
			priority, tq);
		if (tq->tq_threads[i] < B_OK) {
			status_t status = tq->tq_threads[i];
			for (j = 0; j < i; j++)
				kill_thread(tq->tq_threads[j]);
			if (count > 1)
				free(tq->tq_threads);
			tq->tq_threads = NULL;
			delete_sem(tq->tq_sem);
			/* mark the semaphore gone, so taskqueue_free() does not
			   delete it a second time */
			tq->tq_sem = -1;
			return status;
		}
	}

	tq->tq_threadcount = count;

	for (i = 0; i < count; i++)
		resume_thread(tq->tq_threads[i]);

	return 0;
}


int
taskqueue_start_threads(struct taskqueue **taskQueue, int count, int priority,
	const char *format, ...)
{
	/* we assume that taskqueue_start_threads() is called from a sane
	 * place, and thus doesn't need locking. This is mostly because if
	 * the TQ is 'fast', locking the TQ disables interrupts... and then
	 * we can't create semaphores, threads and bananas. */

	/* cpu_status state; */
	char name[64];
	int result;
	va_list vl;

	va_start(vl, format);
	vsnprintf(name, sizeof(name), format, vl);
	va_end(vl);

	/*tq_lock(*taskQueue, &state);*/
	result = _taskqueue_start_threads(taskQueue, count, priority, name);
	/*tq_unlock(*taskQueue, state);*/

	return result;
}


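/*
 * Tear down a queue: deactivate it, shut down and join its worker
 * threads (if any), and release its lock and memory.
 */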
void
taskqueue_free(struct taskqueue *taskQueue)
{
	/* lock and drain list? */
	taskQueue->tq_flags &= ~TQ_FLAGS_ACTIVE;

	if (taskQueue->tq_sem != -1) {
		int i;

		/* deleting the semaphore makes the worker threads' acquire_sem()
		   fail, so they leave their loop and can be joined */
		delete_sem(taskQueue->tq_sem);

		for (i = 0; i < taskQueue->tq_threadcount; i++) {
			status_t status;
			wait_for_thread(taskQueue->tq_threads[i], &status);
		}

		if (taskQueue->tq_threadcount > 1)
			free(taskQueue->tq_threads);
	}

	/* only destroy the lock after the worker threads are gone, as they
	   still take it while draining the list */
	if (!taskQueue->tq_fast)
		mutex_destroy(&taskQueue->tq_mutex);

	free(taskQueue);
}


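/*
 * Busy-wait until the given task is no longer pending on the queue;
 * snooze(0) merely gives other threads a chance to run between polls.
 */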
void
taskqueue_drain(struct taskqueue *taskQueue, struct task *task)
{
	cpu_status status;

	tq_lock(taskQueue, &status);
	while (task->ta_pending != 0) {
		tq_unlock(taskQueue, status);
		snooze(0);
		tq_lock(taskQueue, &status);
	}
	tq_unlock(taskQueue, status);
}


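/*
 * Queue a task for execution. Enqueueing a task that is already
 * pending only increments ta_pending; the handler later receives that
 * count as its second argument, so repeated enqueues are coalesced
 * into a single run.
 */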
int
taskqueue_enqueue(struct taskqueue *taskQueue, struct task *task)
{
	cpu_status status;
	tq_lock(taskQueue, &status);
	/* we don't really support priorities */
	if (task->ta_pending) {
		task->ta_pending++;
	} else {
		list_add_item(&taskQueue->tq_list, task);
		task->ta_pending = 1;
		if ((taskQueue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
			taskQueue->tq_enqueue(taskQueue->tq_arg);
		else
			taskQueue->tq_flags |= TQ_FLAGS_PENDING;
	}
	tq_unlock(taskQueue, status);
	return 0;
}


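/*
 * Standard enqueue callback for thread-driven queues: wake one worker
 * thread. The context is a pointer to the queue pointer, matching how
 * init_taskqueues() passes &taskqueue_fast and &taskqueue_swi.
 */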
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp = context;
	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
}


int
taskqueue_enqueue_fast(struct taskqueue *taskQueue, struct task *task)
{
	return taskqueue_enqueue(taskQueue, task);
}


struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
	taskqueue_enqueue_fn enqueueFunction, void *context)
{
	return _taskqueue_create(name, mflags, 1, enqueueFunction, context);
}


void
task_init(struct task *task, int prio, task_handler_t handler, void *context)
{
	task->ta_priority = prio;
	task->ta_handler = handler;
	task->ta_argument = context;
	task->ta_pending = 0;
}
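
/*
 * Usage sketch (hypothetical, not part of this file): how a driver
 * might defer work to the shared "swi" queue. The names my_softc,
 * my_rx_task and sc->rx_task are made up for illustration; the queue
 * only exists if the driver declares FBSD_SWI_TASKQUEUE.
 *
 *	static void
 *	my_rx_task(void *arg, int pending)
 *	{
 *		struct my_softc *sc = arg;
 *		// 'pending' counts the coalesced enqueues since the last run
 *	}
 *
 *	task_init(&sc->rx_task, 0, my_rx_task, sc);
 *	taskqueue_enqueue(taskqueue_swi, &sc->rx_task);
 *	taskqueue_drain(taskqueue_swi, &sc->rx_task);	// before detach
 */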


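/*
 * Create and start the shared queues that the driver declared it needs
 * via HAIKU_DRIVER_REQUIRES(); on failure, everything created so far
 * is torn down again.
 */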
status_t
init_taskqueues()
{
	status_t status = B_NO_MEMORY;

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_fast);
		if (taskqueue_fast == NULL)
			return B_NO_MEMORY;

		status = taskqueue_start_threads(&taskqueue_fast, 1,
			B_REAL_TIME_PRIORITY, "fast taskq thread");
		if (status < B_OK)
			goto err_1;
	}

	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_swi);
		if (taskqueue_swi == NULL) {
			status = B_NO_MEMORY;
			goto err_1;
		}

		status = taskqueue_start_threads(&taskqueue_swi, 1,
			B_REAL_TIME_PRIORITY, "swi taskq");
		if (status < B_OK)
			goto err_2;
	}

	return B_OK;

err_2:
	if (taskqueue_swi)
		taskqueue_free(taskqueue_swi);

err_1:
	if (taskqueue_fast)
		taskqueue_free(taskqueue_fast);

	return status;
}


void
uninit_taskqueues()
{
	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
		taskqueue_free(taskqueue_swi);

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
		taskqueue_free(taskqueue_fast);
}


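/*
 * taskqueue_block() keeps newly enqueued tasks on the list without
 * signalling the worker; taskqueue_unblock() lifts that and, if an
 * enqueue happened in between (TQ_FLAGS_PENDING), signals once.
 */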
void
taskqueue_block(struct taskqueue *taskQueue)
{
	cpu_status status;

	tq_lock(taskQueue, &status);
	taskQueue->tq_flags |= TQ_FLAGS_BLOCKED;
	tq_unlock(taskQueue, status);
}


void
taskqueue_unblock(struct taskqueue *taskQueue)
{
	cpu_status status;

	tq_lock(taskQueue, &status);
	taskQueue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (taskQueue->tq_flags & TQ_FLAGS_PENDING) {
		taskQueue->tq_flags &= ~TQ_FLAGS_PENDING;
		taskQueue->tq_enqueue(taskQueue->tq_arg);
	}
	tq_unlock(taskQueue, status);
}