xref: /haiku/src/libs/compat/freebsd_network/taskqueue.c (revision a4ef4a49150f118d47324242917a596a3f8f8bd5)
1 /*
2  * Copyright 2007, Hugo Santos. All Rights Reserved.
3  * Distributed under the terms of the MIT License.
4  *
5  * Authors:
6  *      Hugo Santos, hugosantos@gmail.com
7  */
8 
9 #include "device.h"
10 
11 #include <stdio.h>
12 
13 #include <compat/sys/taskqueue.h>
14 #include <compat/sys/haiku-module.h>
15 
/* A FreeBSD-compatible task queue: a list of pending tasks drained either
 * by dedicated kernel threads or via the queue's enqueue callback.
 * "Fast" queues use a spinlock so they can be used from interrupt context;
 * normal queues use a mutex. */
struct taskqueue {
	char tq_name[64];				/* queue name (also used for sem/threads) */
	mutex tq_mutex;					/* guards tq_list when !tq_fast */
	struct list tq_list;			/* pending tasks, linked via task::ta_link */
	taskqueue_enqueue_fn tq_enqueue;	/* invoked when a task becomes pending */
	void *tq_arg;					/* context passed to tq_enqueue */
	int tq_fast;					/* non-zero: spinlock locking (interrupt-safe) */
	spinlock tq_spinlock;			/* guards tq_list when tq_fast */
	sem_id tq_sem;					/* released once per pending task; -1 if no threads */
	thread_id *tq_threads;			/* worker thread ids, or NULL */
	thread_id tq_thread_storage;	/* inline storage for the single-thread case */
	int tq_threadcount;				/* number of entries in tq_threads */
};
29 
30 
/* Shared queues, created by init_taskqueues() according to the driver's
 * HAIKU_DRIVER_REQUIRES() flags. */
struct taskqueue *taskqueue_fast = NULL;
struct taskqueue *taskqueue_swi = NULL;
33 
34 
35 static struct taskqueue *
36 _taskqueue_create(const char *name, int mflags, int fast,
37 	taskqueue_enqueue_fn enqueue, void *context)
38 {
39 	struct taskqueue *tq = malloc(sizeof(struct taskqueue));
40 	if (tq == NULL)
41 		return NULL;
42 
43 	tq->tq_fast = fast;
44 
45 	if (fast) {
46 		B_INITIALIZE_SPINLOCK(&tq->tq_spinlock);
47 	} else {
48 		mutex_init_etc(&tq->tq_mutex, name, MUTEX_FLAG_CLONE_NAME);
49 	}
50 
51 	strlcpy(tq->tq_name, name, sizeof(tq->tq_name));
52 	list_init_etc(&tq->tq_list, offsetof(struct task, ta_link));
53 	tq->tq_enqueue = enqueue;
54 	tq->tq_arg = context;
55 
56 	tq->tq_sem = -1;
57 	tq->tq_threads = NULL;
58 	tq->tq_threadcount = 0;
59 
60 	return tq;
61 }
62 
63 
64 static void
65 tq_lock(struct taskqueue *tq, cpu_status *status)
66 {
67 	if (tq->tq_fast) {
68 		*status = disable_interrupts();
69 		acquire_spinlock(&tq->tq_spinlock);
70 	} else {
71 		mutex_lock(&tq->tq_mutex);
72 	}
73 }
74 
75 
76 static void
77 tq_unlock(struct taskqueue *tq, cpu_status status)
78 {
79 	if (tq->tq_fast) {
80 		release_spinlock(&tq->tq_spinlock);
81 		restore_interrupts(status);
82 	} else {
83 		mutex_unlock(&tq->tq_mutex);
84 	}
85 }
86 
87 
88 struct taskqueue *
89 taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue,
90 	void *context, void **unused)
91 {
92 	return _taskqueue_create(name, mflags, 0, enqueue, context);
93 }
94 
95 
96 static int32
97 tq_handle_thread(void *data)
98 {
99 	struct taskqueue *tq = data;
100 	cpu_status cpu_state;
101 	struct task *t;
102 	int pending;
103 	sem_id sem;
104 
105 	/* just a synchronization point */
106 	tq_lock(tq, &cpu_state);
107 	sem = tq->tq_sem;
108 	tq_unlock(tq, cpu_state);
109 
110 	while (1) {
111 		status_t status = acquire_sem(sem);
112 		if (status < B_OK)
113 			break;
114 
115 		tq_lock(tq, &cpu_state);
116 		t = list_remove_head_item(&tq->tq_list);
117 		pending = t->ta_pending;
118 		t->ta_pending = 0;
119 		tq_unlock(tq, cpu_state);
120 
121 		t->ta_handler(t->ta_argument, pending);
122 	}
123 
124 	return 0;
125 }
126 
127 
128 static int
129 _taskqueue_start_threads(struct taskqueue **tqp, int count, int prio,
130 	const char *name)
131 {
132 	struct taskqueue *tq = (*tqp);
133 	int i, j;
134 
135 	if (count == 0)
136 		return -1;
137 
138 	if (tq->tq_threads != NULL)
139 		return -1;
140 
141 	if (count == 1) {
142 		tq->tq_threads = &tq->tq_thread_storage;
143 	} else {
144 		tq->tq_threads = malloc(sizeof(thread_id) * count);
145 		if (tq->tq_threads == NULL)
146 			return B_NO_MEMORY;
147 	}
148 
149 	tq->tq_sem = create_sem(0, tq->tq_name);
150 	if (tq->tq_sem < B_OK) {
151 		if (count > 1)
152 			free(tq->tq_threads);
153 		tq->tq_threads = NULL;
154 		return tq->tq_sem;
155 	}
156 
157 	for (i = 0; i < count; i++) {
158 		tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name,
159 			prio, tq);
160 		if (tq->tq_threads[i] < B_OK) {
161 			status_t status = tq->tq_threads[i];
162 			for (j = 0; j < i; j++)
163 				kill_thread(tq->tq_threads[j]);
164 			if (count > 1)
165 				free(tq->tq_threads);
166 			tq->tq_threads = NULL;
167 			delete_sem(tq->tq_sem);
168 			return status;
169 		}
170 	}
171 
172 	tq->tq_threadcount = count;
173 
174 	for (i = 0; i < count; i++)
175 		resume_thread(tq->tq_threads[i]);
176 
177 	return 0;
178 }
179 
180 
/* Public entry point: formats the thread name from "format" and its varargs,
 * then delegates to _taskqueue_start_threads(). */
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int prio,
	const char *format, ...)
{
	/* we assume that start_threads is called in a sane place, and thus
	 * don't need to be locked. This is mostly due to the fact that if
	 * the TQ is 'fast', locking the TQ disables interrupts... and then
	 * we can't create semaphores, threads and bananas. */

	/* cpu_status state; */
	char threadName[64];
	va_list args;
	int result;

	va_start(args, format);
	vsnprintf(threadName, sizeof(threadName), format, args);
	va_end(args);

	/*tq_lock(*tqp, &state);*/
	result = _taskqueue_start_threads(tqp, count, prio, threadName);
	/*tq_unlock(*tqp, state);*/

	return result;
}
205 
206 
207 void
208 taskqueue_free(struct taskqueue *tq)
209 {
210 	/* lock and  drain list? */
211 	if (!tq->tq_fast)
212 		mutex_destroy(&tq->tq_mutex);
213 	if (tq->tq_sem != -1) {
214 		int i;
215 
216 		delete_sem(tq->tq_sem);
217 
218 		for (i = 0; i < tq->tq_threadcount; i++) {
219 			status_t status;
220 			wait_for_thread(tq->tq_threads[i], &status);
221 		}
222 
223 		if (tq->tq_threadcount > 1)
224 			free(tq->tq_threads);
225 	}
226 
227 	free(tq);
228 }
229 
230 
231 void
232 taskqueue_drain(struct taskqueue *tq, struct task *task)
233 {
234 	cpu_status status;
235 
236 	tq_lock(tq, &status);
237 	if (task->ta_pending != 0)
238 		UNIMPLEMENTED();
239 	tq_unlock(tq, status);
240 }
241 
242 
243 int
244 taskqueue_enqueue(struct taskqueue *tq, struct task *task)
245 {
246 	cpu_status status;
247 	tq_lock(tq, &status);
248 	/* we don't really support priorities */
249 	if (task->ta_pending) {
250 		task->ta_pending++;
251 	} else {
252 		list_add_item(&tq->tq_list, task);
253 		task->ta_pending = 1;
254 		tq->tq_enqueue(tq->tq_arg);
255 	}
256 	tq_unlock(tq, status);
257 	return 0;
258 }
259 
260 
261 void
262 taskqueue_thread_enqueue(void *context)
263 {
264 	struct taskqueue **tqp = context;
265 	release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE);
266 }
267 
268 
/* FreeBSD API parity wrapper: fast queues share the normal enqueue path,
 * since tq_lock() already picks the spinlock for them. */
int
taskqueue_enqueue_fast(struct taskqueue *tq, struct task *task)
{
	return taskqueue_enqueue(tq, task);
}
274 
275 
276 struct taskqueue *
277 taskqueue_create_fast(const char *name, int mflags,
278 	taskqueue_enqueue_fn enqueue, void *context)
279 {
280 	return _taskqueue_create(name, mflags, 1, enqueue, context);
281 }
282 
283 
284 void
285 task_init(struct task *t, int prio, task_handler_t handler, void *context)
286 {
287 	t->ta_priority = prio;
288 	t->ta_handler = handler;
289 	t->ta_argument = context;
290 	t->ta_pending = 0;
291 }
292 
293 
294 status_t
295 init_taskqueues()
296 {
297 	status_t status = B_NO_MEMORY;
298 
299 	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) {
300 		taskqueue_fast = taskqueue_create_fast("fast taskq", 0,
301 			taskqueue_thread_enqueue, &taskqueue_fast);
302 		if (taskqueue_fast == NULL)
303 			return B_NO_MEMORY;
304 
305 		status = taskqueue_start_threads(&taskqueue_fast, 1,
306 			B_REAL_TIME_PRIORITY, "fast taskq");
307 		if (status < B_OK)
308 			goto err_1;
309 	}
310 
311 	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) {
312 		taskqueue_swi = taskqueue_create_fast("swi taskq", 0,
313 			taskqueue_thread_enqueue, &taskqueue_swi);
314 		if (taskqueue_swi == NULL) {
315 			status = B_NO_MEMORY;
316 			goto err_1;
317 		}
318 
319 		status = taskqueue_start_threads(&taskqueue_swi, 1,
320 			B_REAL_TIME_PRIORITY, "swi taskq");
321 		if (status < B_OK)
322 			goto err_2;
323 	}
324 
325 	return B_OK;
326 
327 err_2:
328 	if (taskqueue_swi)
329 		taskqueue_free(taskqueue_swi);
330 
331 err_1:
332 	if (taskqueue_fast)
333 		taskqueue_free(taskqueue_fast);
334 
335 	return status;
336 }
337 
338 
339 void
340 uninit_taskqueues()
341 {
342 	if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE))
343 		taskqueue_free(taskqueue_swi);
344 
345 	if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE))
346 		taskqueue_free(taskqueue_fast);
347 }
348