1 /* 2 * Copyright 2007, Hugo Santos. All Rights Reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Hugo Santos, hugosantos@gmail.com 7 */ 8 9 #include "device.h" 10 11 #include <stdio.h> 12 13 #include <compat/sys/taskqueue.h> 14 #include <compat/sys/haiku-module.h> 15 16 struct taskqueue { 17 char tq_name[64]; 18 mutex tq_mutex; 19 struct list tq_list; 20 taskqueue_enqueue_fn tq_enqueue; 21 void *tq_arg; 22 int tq_fast; 23 int32 tq_spinlock; 24 sem_id tq_sem; 25 thread_id *tq_threads; 26 thread_id tq_thread_storage; 27 int tq_threadcount; 28 }; 29 30 31 struct taskqueue *taskqueue_fast = NULL; 32 struct taskqueue *taskqueue_swi = NULL; 33 34 35 static struct taskqueue * 36 _taskqueue_create(const char *name, int mflags, int fast, 37 taskqueue_enqueue_fn enqueue, void *context) 38 { 39 struct taskqueue *tq = malloc(sizeof(struct taskqueue)); 40 if (tq == NULL) 41 return NULL; 42 43 tq->tq_fast = fast; 44 45 if (fast) { 46 tq->tq_spinlock = 0; 47 } else { 48 if (mutex_init(&tq->tq_mutex, name) < B_OK) { 49 free(tq); 50 return NULL; 51 } 52 } 53 54 strlcpy(tq->tq_name, name, sizeof(tq->tq_name)); 55 list_init_etc(&tq->tq_list, offsetof(struct task, ta_link)); 56 tq->tq_enqueue = enqueue; 57 tq->tq_arg = context; 58 59 tq->tq_sem = -1; 60 tq->tq_threads = NULL; 61 tq->tq_threadcount = 0; 62 63 return tq; 64 } 65 66 67 static void 68 tq_lock(struct taskqueue *tq, cpu_status *status) 69 { 70 if (tq->tq_fast) { 71 *status = disable_interrupts(); 72 acquire_spinlock(&tq->tq_spinlock); 73 } else { 74 mutex_lock(&tq->tq_mutex); 75 } 76 } 77 78 79 static void 80 tq_unlock(struct taskqueue *tq, cpu_status status) 81 { 82 if (tq->tq_fast) { 83 release_spinlock(&tq->tq_spinlock); 84 restore_interrupts(status); 85 } else { 86 mutex_unlock(&tq->tq_mutex); 87 } 88 } 89 90 91 struct taskqueue * 92 taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, 93 void *context, void **unused) 94 { 95 return _taskqueue_create(name, mflags, 0, enqueue, context); 96 } 97 98 99 static int32 100 tq_handle_thread(void *data) 101 { 102 struct taskqueue *tq = data; 103 cpu_status cpu_state; 104 struct task *t; 105 int pending; 106 sem_id sem; 107 108 /* just a synchronization point */ 109 tq_lock(tq, &cpu_state); 110 sem = tq->tq_sem; 111 tq_unlock(tq, cpu_state); 112 113 while (1) { 114 status_t status = acquire_sem(sem); 115 if (status < B_OK) 116 break; 117 118 tq_lock(tq, &cpu_state); 119 t = list_remove_head_item(&tq->tq_list); 120 pending = t->ta_pending; 121 t->ta_pending = 0; 122 tq_unlock(tq, cpu_state); 123 124 t->ta_handler(t->ta_argument, pending); 125 } 126 127 return 0; 128 } 129 130 131 static int 132 _taskqueue_start_threads(struct taskqueue **tqp, int count, int prio, 133 const char *name) 134 { 135 struct taskqueue *tq = (*tqp); 136 int i, j; 137 138 if (count == 0) 139 return -1; 140 141 if (tq->tq_threads != NULL) 142 return -1; 143 144 if (count == 1) { 145 tq->tq_threads = &tq->tq_thread_storage; 146 } else { 147 tq->tq_threads = malloc(sizeof(thread_id) * count); 148 if (tq->tq_threads == NULL) 149 return B_NO_MEMORY; 150 } 151 152 tq->tq_sem = create_sem(0, tq->tq_name); 153 if (tq->tq_sem < B_OK) { 154 if (count > 1) 155 free(tq->tq_threads); 156 tq->tq_threads = NULL; 157 return tq->tq_sem; 158 } 159 160 for (i = 0; i < count; i++) { 161 tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name, 162 prio, tq); 163 if (tq->tq_threads[i] < B_OK) { 164 status_t status = tq->tq_threads[i]; 165 for (j = 0; j < i; j++) 166 kill_thread(tq->tq_threads[j]); 167 if (count > 1) 168 free(tq->tq_threads); 169 tq->tq_threads = NULL; 170 delete_sem(tq->tq_sem); 171 return status; 172 } 173 } 174 175 tq->tq_threadcount = count; 176 177 for (i = 0; i < count; i++) 178 resume_thread(tq->tq_threads[i]); 179 180 return 0; 181 } 182 183 184 int 185 taskqueue_start_threads(struct taskqueue **tqp, int count, int prio, 186 const char *format, ...) 187 { 188 /* we assume that start_threads is called in a sane place, and thus 189 * don't need to be locked. This is mostly due to the fact that if 190 * the TQ is 'fast', locking the TQ disables interrupts... and then 191 * we can't create semaphores, threads and bananas. */ 192 193 /* cpu_status state; */ 194 char name[64]; 195 int result; 196 va_list vl; 197 198 va_start(vl, format); 199 vsnprintf(name, sizeof(name), format, vl); 200 va_end(vl); 201 202 /*tq_lock(*tqp, &state);*/ 203 result = _taskqueue_start_threads(tqp, count, prio, name); 204 /*tq_unlock(*tqp, state);*/ 205 206 return result; 207 } 208 209 210 void 211 taskqueue_free(struct taskqueue *tq) 212 { 213 /* lock and drain list? */ 214 if (!tq->tq_fast) 215 mutex_destroy(&tq->tq_mutex); 216 if (tq->tq_sem != -1) { 217 int i; 218 219 delete_sem(tq->tq_sem); 220 221 for (i = 0; i < tq->tq_threadcount; i++) { 222 status_t status; 223 wait_for_thread(tq->tq_threads[i], &status); 224 } 225 226 if (tq->tq_threadcount > 1) 227 free(tq->tq_threads); 228 } 229 230 free(tq); 231 } 232 233 234 void 235 taskqueue_drain(struct taskqueue *tq, struct task *task) 236 { 237 cpu_status status; 238 239 tq_lock(tq, &status); 240 if (task->ta_pending != 0) 241 UNIMPLEMENTED(); 242 tq_unlock(tq, status); 243 } 244 245 246 int 247 taskqueue_enqueue(struct taskqueue *tq, struct task *task) 248 { 249 cpu_status status; 250 tq_lock(tq, &status); 251 /* we don't really support priorities */ 252 if (task->ta_pending) { 253 task->ta_pending++; 254 } else { 255 list_add_item(&tq->tq_list, task); 256 task->ta_pending = 1; 257 tq->tq_enqueue(tq->tq_arg); 258 } 259 tq_unlock(tq, status); 260 return 0; 261 } 262 263 264 void 265 taskqueue_thread_enqueue(void *context) 266 { 267 struct taskqueue **tqp = context; 268 release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE); 269 } 270 271 272 int 273 taskqueue_enqueue_fast(struct taskqueue *tq, struct task *task) 274 { 275 return taskqueue_enqueue(tq, task); 276 } 277 278 279 struct taskqueue * 280 taskqueue_create_fast(const char *name, int mflags, 281 taskqueue_enqueue_fn enqueue, void *context) 282 { 283 return _taskqueue_create(name, mflags, 1, enqueue, context); 284 } 285 286 287 void 288 task_init(struct task *t, int prio, task_handler_t handler, void *context) 289 { 290 t->ta_priority = prio; 291 t->ta_handler = handler; 292 t->ta_argument = context; 293 t->ta_pending = 0; 294 } 295 296 297 status_t 298 init_taskqueues() 299 { 300 status_t status = B_NO_MEMORY; 301 302 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) { 303 taskqueue_fast = taskqueue_create_fast("fast taskq", 0, 304 taskqueue_thread_enqueue, &taskqueue_fast); 305 if (taskqueue_fast == NULL) 306 return B_NO_MEMORY; 307 308 status = taskqueue_start_threads(&taskqueue_fast, 1, 309 B_REAL_TIME_PRIORITY, "fast taskq"); 310 if (status < B_OK) 311 goto err_1; 312 } 313 314 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) { 315 taskqueue_swi = taskqueue_create_fast("swi taskq", 0, 316 taskqueue_thread_enqueue, &taskqueue_swi); 317 if (taskqueue_swi == NULL) { 318 status = B_NO_MEMORY; 319 goto err_1; 320 } 321 322 status = taskqueue_start_threads(&taskqueue_swi, 1, 323 B_REAL_TIME_PRIORITY, "swi taskq"); 324 if (status < B_OK) 325 goto err_2; 326 } 327 328 return B_OK; 329 330 err_2: 331 if (taskqueue_swi) 332 taskqueue_free(taskqueue_swi); 333 334 err_1: 335 if (taskqueue_fast) 336 taskqueue_free(taskqueue_fast); 337 338 return status; 339 } 340 341 342 void 343 uninit_taskqueues() 344 { 345 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) 346 taskqueue_free(taskqueue_swi); 347 348 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) 349 taskqueue_free(taskqueue_fast); 350 } 351