xref: /haiku/src/libs/compat/freebsd_network/taskqueue.c (revision 1214ef1b2100f2b3299fc9d8d6142e46f70a4c3f)
/*
 * Copyright 2007, Hugo Santos. All Rights Reserved.
 * Distributed under the terms of the MIT License.
 *
 * Authors:
 *      Hugo Santos, hugosantos@gmail.com
 */

#include "device.h"

#include <stdio.h>

#include <compat/sys/taskqueue.h>
#include <compat/sys/haiku-module.h>

struct taskqueue {
	char tq_name[64];
	mutex tq_mutex;					/* guards tq_list when !tq_fast */
	struct list tq_list;
	taskqueue_enqueue_fn tq_enqueue;
	void *tq_arg;
	int tq_fast;
	int32 tq_spinlock;				/* guards tq_list when tq_fast */
	sem_id tq_sem;
	thread_id *tq_threads;
	thread_id tq_thread_storage;	/* used when there is only one thread */
	int tq_threadcount;
};


struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;


static struct taskqueue *
_taskqueue_create(const char *name, int mflags, int fast,
	taskqueue_enqueue_fn enqueue, void *context)
{
	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
	if (tq == NULL)
		return NULL;

	tq->tq_fast = fast;

	if (fast) {
		tq->tq_spinlock = 0;
	} else {
		if (mutex_init(&tq->tq_mutex, name) < B_OK) {
			free(tq);
			return NULL;
		}
	}

	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
	tq->tq_enqueue = enqueue;
	tq->tq_arg = context;

	tq->tq_sem = -1;
	tq->tq_threads = NULL;
	tq->tq_threadcount = 0;

	return tq;
}


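/* A "fast" taskqueue is guarded by a spinlock with interrupts disabled (so it
 * can be touched from interrupt handlers); a regular taskqueue is guarded by
 * a mutex. */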
static void
tq_lock(struct taskqueue *tq, cpu_status *status)
{
	if (tq->tq_fast) {
		*status = disable_interrupts();
		acquire_spinlock(&tq->tq_spinlock);
	} else {
		mutex_lock(&tq->tq_mutex);
	}
}


static void
tq_unlock(struct taskqueue *tq, cpu_status status)
{
	if (tq->tq_fast) {
		release_spinlock(&tq->tq_spinlock);
		restore_interrupts(status);
	} else {
		mutex_unlock(&tq->tq_mutex);
	}
}


struct taskqueue *
taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue,
	void *context, void **unused)
{
	return _taskqueue_create(name, mflags, 0, enqueue, context);
}


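/* Worker loop for the taskqueue threads: wait on the semaphore released by
 * taskqueue_thread_enqueue(), pop the next task off the list and run its
 * handler with the coalesced pending count. The loop exits once the semaphore
 * is deleted by taskqueue_free(). */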
static int32
tq_handle_thread(void *data)
{
	struct taskqueue *tq = data;
	cpu_status cpu_state;
	struct task *t;
	int pending;
	sem_id sem;

	/* just a synchronization point */
	tq_lock(tq, &cpu_state);
	sem = tq->tq_sem;
	tq_unlock(tq, cpu_state);

	while (1) {
		status_t status = acquire_sem(sem);
		if (status < B_OK)
			break;

		tq_lock(tq, &cpu_state);
		t = list_remove_head_item(&tq->tq_list);
		if (t == NULL) {
			/* shouldn't happen, but don't dereference a NULL task */
			tq_unlock(tq, cpu_state);
			continue;
		}
		pending = t->ta_pending;
		t->ta_pending = 0;
		tq_unlock(tq, cpu_state);

		t->ta_handler(t->ta_argument, pending);
	}

	return 0;
}


static int
_taskqueue_start_threads(struct taskqueue **tqp, int count, int prio,
	const char *name)
{
	struct taskqueue *tq = (*tqp);
	int i, j;

	if (count == 0)
		return -1;

	if (tq->tq_threads != NULL)
		return -1;

	if (count == 1) {
		tq->tq_threads = &tq->tq_thread_storage;
	} else {
		tq->tq_threads = malloc(sizeof(thread_id) * count);
		if (tq->tq_threads == NULL)
			return B_NO_MEMORY;
	}

	tq->tq_sem = create_sem(0, tq->tq_name);
	if (tq->tq_sem < B_OK) {
		status_t status = tq->tq_sem;
		if (count > 1)
			free(tq->tq_threads);
		tq->tq_threads = NULL;
		tq->tq_sem = -1;
		return status;
	}

	for (i = 0; i < count; i++) {
		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
			prio, tq);
		if (tq->tq_threads[i] < B_OK) {
			status_t status = tq->tq_threads[i];
			for (j = 0; j < i; j++)
				kill_thread(tq->tq_threads[j]);
			if (count > 1)
				free(tq->tq_threads);
			tq->tq_threads = NULL;
			delete_sem(tq->tq_sem);
			tq->tq_sem = -1;
			return status;
		}
	}

	tq->tq_threadcount = count;

	for (i = 0; i < count; i++)
		resume_thread(tq->tq_threads[i]);

	return 0;
}


int
taskqueue_start_threads(struct taskqueue **tqp, int count, int prio,
	const char *format, ...)
{
	/* we assume that taskqueue_start_threads() is called from a sane
	 * context and thus doesn't need locking. This is mostly because, if
	 * the TQ is 'fast', locking the TQ disables interrupts... and then
	 * we can't create semaphores, threads and bananas. */

	/* cpu_status state; */
	char name[64];
	int result;
	va_list vl;

	va_start(vl, format);
	vsnprintf(name, sizeof(name), format, vl);
	va_end(vl);

	/*tq_lock(*tqp, &state);*/
	result = _taskqueue_start_threads(tqp, count, prio, name);
	/*tq_unlock(*tqp, state);*/

	return result;
}


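/* Deleting the semaphore makes the worker threads' acquire_sem() fail, so
 * they drop out of tq_handle_thread() and can be joined with
 * wait_for_thread() before the queue itself is freed. */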
void
taskqueue_free(struct taskqueue *tq)
{
	/* lock and drain list? */
	if (!tq->tq_fast)
		mutex_destroy(&tq->tq_mutex);
	if (tq->tq_sem != -1) {
		int i;

		delete_sem(tq->tq_sem);

		for (i = 0; i < tq->tq_threadcount; i++) {
			status_t status;
			wait_for_thread(tq->tq_threads[i], &status);
		}

		if (tq->tq_threadcount > 1)
			free(tq->tq_threads);
	}

	free(tq);
}


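/* FreeBSD's taskqueue_drain() waits until the given task is neither pending
 * nor running; here only the "already idle" case is handled, anything else
 * hits UNIMPLEMENTED(). */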
void
taskqueue_drain(struct taskqueue *tq, struct task *task)
{
	cpu_status status;

	tq_lock(tq, &status);
	if (task->ta_pending != 0)
		UNIMPLEMENTED();
	tq_unlock(tq, status);
}


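/* If the task is already queued, only its pending count is bumped; the
 * handler later receives that count, so repeated enqueues of the same task
 * are coalesced into a single run, matching FreeBSD's semantics. */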
int
taskqueue_enqueue(struct taskqueue *tq, struct task *task)
{
	cpu_status status;
	tq_lock(tq, &status);
	/* we don't really support priorities */
	if (task->ta_pending) {
		task->ta_pending++;
	} else {
		list_add_item(&tq->tq_list, task);
		task->ta_pending = 1;
		tq->tq_enqueue(tq->tq_arg);
	}
	tq_unlock(tq, status);
	return 0;
}


void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp = context;
	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
}


int
taskqueue_enqueue_fast(struct taskqueue *tq, struct task *task)
{
	return taskqueue_enqueue(tq, task);
}


struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
	taskqueue_enqueue_fn enqueue, void *context)
{
	return _taskqueue_create(name, mflags, 1, enqueue, context);
}


void
task_init(struct task *t, int prio, task_handler_t handler, void *context)
{
	t->ta_priority = prio;
	t->ta_handler = handler;
	t->ta_argument = context;
	t->ta_pending = 0;
}
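
/*
 * Sketch of how a ported driver typically uses this interface; my_softc,
 * my_task and my_work_handler below are made-up names for illustration,
 * not part of this module:
 *
 *	static void
 *	my_work_handler(void *arg, int pending)
 *	{
 *		struct my_softc *sc = arg;
 *
 *		process_work(sc, pending);	(pending = number of coalesced enqueues)
 *	}
 *
 *	task_init(&sc->my_task, 0, my_work_handler, sc);
 *	taskqueue_enqueue(taskqueue_fast, &sc->my_task);
 */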


status_t
init_taskqueues()
{
	status_t status = B_NO_MEMORY;

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_fast);
		if (taskqueue_fast == NULL)
			return B_NO_MEMORY;

		status = taskqueue_start_threads(&taskqueue_fast, 1,
			B_REAL_TIME_PRIORITY, "fast taskq");
		if (status < B_OK)
			goto err_1;
	}

	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
			taskqueue_thread_enqueue, &taskqueue_swi);
		if (taskqueue_swi == NULL) {
			status = B_NO_MEMORY;
			goto err_1;
		}

		status = taskqueue_start_threads(&taskqueue_swi, 1,
			B_REAL_TIME_PRIORITY, "swi taskq");
		if (status < B_OK)
			goto err_2;
	}

	return B_OK;

err_2:
	if (taskqueue_swi)
		taskqueue_free(taskqueue_swi);

err_1:
	if (taskqueue_fast)
		taskqueue_free(taskqueue_fast);

	return status;
}


void
uninit_taskqueues()
{
	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
		taskqueue_free(taskqueue_swi);

	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
		taskqueue_free(taskqueue_fast);
}