1 /* 2 * Copyright 2007, Hugo Santos. All Rights Reserved. 3 * Distributed under the terms of the MIT License. 4 * 5 * Authors: 6 * Hugo Santos, hugosantos@gmail.com 7 */ 8 9 #include "device.h" 10 11 #include <stdio.h> 12 13 #include <compat/sys/taskqueue.h> 14 #include <compat/sys/haiku-module.h> 15 16 struct taskqueue { 17 char tq_name[64]; 18 mutex tq_mutex; 19 struct list tq_list; 20 taskqueue_enqueue_fn tq_enqueue; 21 void *tq_arg; 22 int tq_fast; 23 spinlock tq_spinlock; 24 sem_id tq_sem; 25 thread_id *tq_threads; 26 thread_id tq_thread_storage; 27 int tq_threadcount; 28 }; 29 30 31 struct taskqueue *taskqueue_fast = NULL; 32 struct taskqueue *taskqueue_swi = NULL; 33 34 35 static struct taskqueue * 36 _taskqueue_create(const char *name, int mflags, int fast, 37 taskqueue_enqueue_fn enqueue, void *context) 38 { 39 struct taskqueue *tq = malloc(sizeof(struct taskqueue)); 40 if (tq == NULL) 41 return NULL; 42 43 tq->tq_fast = fast; 44 45 if (fast) { 46 B_INITIALIZE_SPINLOCK(&tq->tq_spinlock); 47 } else { 48 mutex_init_etc(&tq->tq_mutex, name, MUTEX_FLAG_CLONE_NAME); 49 } 50 51 strlcpy(tq->tq_name, name, sizeof(tq->tq_name)); 52 list_init_etc(&tq->tq_list, offsetof(struct task, ta_link)); 53 tq->tq_enqueue = enqueue; 54 tq->tq_arg = context; 55 56 tq->tq_sem = -1; 57 tq->tq_threads = NULL; 58 tq->tq_threadcount = 0; 59 60 return tq; 61 } 62 63 64 static void 65 tq_lock(struct taskqueue *tq, cpu_status *status) 66 { 67 if (tq->tq_fast) { 68 *status = disable_interrupts(); 69 acquire_spinlock(&tq->tq_spinlock); 70 } else { 71 mutex_lock(&tq->tq_mutex); 72 } 73 } 74 75 76 static void 77 tq_unlock(struct taskqueue *tq, cpu_status status) 78 { 79 if (tq->tq_fast) { 80 release_spinlock(&tq->tq_spinlock); 81 restore_interrupts(status); 82 } else { 83 mutex_unlock(&tq->tq_mutex); 84 } 85 } 86 87 88 struct taskqueue * 89 taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, 90 void *context, void **unused) 91 { 92 return _taskqueue_create(name, mflags, 0, enqueue, context); 93 } 94 95 96 static int32 97 tq_handle_thread(void *data) 98 { 99 struct taskqueue *tq = data; 100 cpu_status cpu_state; 101 struct task *t; 102 int pending; 103 sem_id sem; 104 105 /* just a synchronization point */ 106 tq_lock(tq, &cpu_state); 107 sem = tq->tq_sem; 108 tq_unlock(tq, cpu_state); 109 110 while (1) { 111 status_t status = acquire_sem(sem); 112 if (status < B_OK) 113 break; 114 115 tq_lock(tq, &cpu_state); 116 t = list_remove_head_item(&tq->tq_list); 117 pending = t->ta_pending; 118 t->ta_pending = 0; 119 tq_unlock(tq, cpu_state); 120 121 t->ta_handler(t->ta_argument, pending); 122 } 123 124 return 0; 125 } 126 127 128 static int 129 _taskqueue_start_threads(struct taskqueue **tqp, int count, int prio, 130 const char *name) 131 { 132 struct taskqueue *tq = (*tqp); 133 int i, j; 134 135 if (count == 0) 136 return -1; 137 138 if (tq->tq_threads != NULL) 139 return -1; 140 141 if (count == 1) { 142 tq->tq_threads = &tq->tq_thread_storage; 143 } else { 144 tq->tq_threads = malloc(sizeof(thread_id) * count); 145 if (tq->tq_threads == NULL) 146 return B_NO_MEMORY; 147 } 148 149 tq->tq_sem = create_sem(0, tq->tq_name); 150 if (tq->tq_sem < B_OK) { 151 if (count > 1) 152 free(tq->tq_threads); 153 tq->tq_threads = NULL; 154 return tq->tq_sem; 155 } 156 157 for (i = 0; i < count; i++) { 158 tq->tq_threads[i] = spawn_kernel_thread(tq_handle_thread, tq->tq_name, 159 prio, tq); 160 if (tq->tq_threads[i] < B_OK) { 161 status_t status = tq->tq_threads[i]; 162 for (j = 0; j < i; j++) 163 kill_thread(tq->tq_threads[j]); 164 if (count > 1) 165 free(tq->tq_threads); 166 tq->tq_threads = NULL; 167 delete_sem(tq->tq_sem); 168 return status; 169 } 170 } 171 172 tq->tq_threadcount = count; 173 174 for (i = 0; i < count; i++) 175 resume_thread(tq->tq_threads[i]); 176 177 return 0; 178 } 179 180 181 int 182 taskqueue_start_threads(struct taskqueue **tqp, int count, int prio, 183 const char *format, ...) 184 { 185 /* we assume that start_threads is called in a sane place, and thus 186 * don't need to be locked. This is mostly due to the fact that if 187 * the TQ is 'fast', locking the TQ disables interrupts... and then 188 * we can't create semaphores, threads and bananas. */ 189 190 /* cpu_status state; */ 191 char name[64]; 192 int result; 193 va_list vl; 194 195 va_start(vl, format); 196 vsnprintf(name, sizeof(name), format, vl); 197 va_end(vl); 198 199 /*tq_lock(*tqp, &state);*/ 200 result = _taskqueue_start_threads(tqp, count, prio, name); 201 /*tq_unlock(*tqp, state);*/ 202 203 return result; 204 } 205 206 207 void 208 taskqueue_free(struct taskqueue *tq) 209 { 210 /* lock and drain list? */ 211 if (!tq->tq_fast) 212 mutex_destroy(&tq->tq_mutex); 213 if (tq->tq_sem != -1) { 214 int i; 215 216 delete_sem(tq->tq_sem); 217 218 for (i = 0; i < tq->tq_threadcount; i++) { 219 status_t status; 220 wait_for_thread(tq->tq_threads[i], &status); 221 } 222 223 if (tq->tq_threadcount > 1) 224 free(tq->tq_threads); 225 } 226 227 free(tq); 228 } 229 230 231 void 232 taskqueue_drain(struct taskqueue *tq, struct task *task) 233 { 234 cpu_status status; 235 236 tq_lock(tq, &status); 237 if (task->ta_pending != 0) 238 UNIMPLEMENTED(); 239 tq_unlock(tq, status); 240 } 241 242 243 int 244 taskqueue_enqueue(struct taskqueue *tq, struct task *task) 245 { 246 cpu_status status; 247 tq_lock(tq, &status); 248 /* we don't really support priorities */ 249 if (task->ta_pending) { 250 task->ta_pending++; 251 } else { 252 list_add_item(&tq->tq_list, task); 253 task->ta_pending = 1; 254 tq->tq_enqueue(tq->tq_arg); 255 } 256 tq_unlock(tq, status); 257 return 0; 258 } 259 260 261 void 262 taskqueue_thread_enqueue(void *context) 263 { 264 struct taskqueue **tqp = context; 265 release_sem_etc((*tqp)->tq_sem, 1, B_DO_NOT_RESCHEDULE); 266 } 267 268 269 int 270 taskqueue_enqueue_fast(struct taskqueue *tq, struct task *task) 271 { 272 return taskqueue_enqueue(tq, task); 273 } 274 275 276 struct taskqueue * 277 taskqueue_create_fast(const char *name, int mflags, 278 taskqueue_enqueue_fn enqueue, void *context) 279 { 280 return _taskqueue_create(name, mflags, 1, enqueue, context); 281 } 282 283 284 void 285 task_init(struct task *t, int prio, task_handler_t handler, void *context) 286 { 287 t->ta_priority = prio; 288 t->ta_handler = handler; 289 t->ta_argument = context; 290 t->ta_pending = 0; 291 } 292 293 294 status_t 295 init_taskqueues() 296 { 297 status_t status = B_NO_MEMORY; 298 299 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) { 300 taskqueue_fast = taskqueue_create_fast("fast taskq", 0, 301 taskqueue_thread_enqueue, &taskqueue_fast); 302 if (taskqueue_fast == NULL) 303 return B_NO_MEMORY; 304 305 status = taskqueue_start_threads(&taskqueue_fast, 1, 306 B_REAL_TIME_PRIORITY, "fast taskq"); 307 if (status < B_OK) 308 goto err_1; 309 } 310 311 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) { 312 taskqueue_swi = taskqueue_create_fast("swi taskq", 0, 313 taskqueue_thread_enqueue, &taskqueue_swi); 314 if (taskqueue_swi == NULL) { 315 status = B_NO_MEMORY; 316 goto err_1; 317 } 318 319 status = taskqueue_start_threads(&taskqueue_swi, 1, 320 B_REAL_TIME_PRIORITY, "swi taskq"); 321 if (status < B_OK) 322 goto err_2; 323 } 324 325 return B_OK; 326 327 err_2: 328 if (taskqueue_swi) 329 taskqueue_free(taskqueue_swi); 330 331 err_1: 332 if (taskqueue_fast) 333 taskqueue_free(taskqueue_fast); 334 335 return status; 336 } 337 338 339 void 340 uninit_taskqueues() 341 { 342 if (HAIKU_DRIVER_REQUIRES(FBSD_SWI_TASKQUEUE)) 343 taskqueue_free(taskqueue_swi); 344 345 if (HAIKU_DRIVER_REQUIRES(FBSD_FAST_TASKQUEUE)) 346 taskqueue_free(taskqueue_fast); 347 } 348