xref: /haiku/src/libs/compat/freebsd_iflib/mp_ring.c (revision 1e60bdeab63fa7a57bc9a55b032052e95a18bd2c)
1 /*-
2  * Copyright (c) 2014 Chelsio Communications, Inc.
3  * All rights reserved.
4  * Written by: Navdeep Parhar <np@FreeBSD.org>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  */
27 
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30 
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/counter.h>
35 #include <sys/lock.h>
36 #include <sys/mutex.h>
37 #include <sys/malloc.h>
38 #include <machine/cpu.h>
39 
40 #if defined(__i386__)
41 #define atomic_cmpset_acq_64 atomic_cmpset_64
42 #define atomic_cmpset_rel_64 atomic_cmpset_64
43 #endif
44 
45 #include <net/mp_ring.h>
46 
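/*
 * The complete ring state fits in a single 64-bit word so that it can be read
 * and updated atomically (or under r->lock in the NO_64BIT_ATOMICS build).
 * pidx_head is the producers' reservation frontier, pidx_tail trails it and is
 * only advanced once the reserved slots have been filled in, and cidx is the
 * consumer's position; flags holds one of the values below.
 */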
47 union ring_state {
48 	struct {
49 		uint16_t pidx_head;
50 		uint16_t pidx_tail;
51 		uint16_t cidx;
52 		uint16_t flags;
53 	};
54 	uint64_t state;
55 };
56 
57 enum {
58 	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
59 	BUSY,		/* consumer is running already, or will be shortly. */
60 	STALLED,	/* consumer stopped due to lack of resources. */
61 	ABDICATED,	/* consumer stopped even though there was work to be
62 			   done because it wants another thread to take over. */
63 };
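
/*
 * Typical transitions: an enqueue moves an IDLE ring to BUSY (or straight to
 * ABDICATED when the producer does not intend to drain), the consumer marks
 * the ring STALLED when drain() makes no progress and ABDICATED when it gives
 * up its turn with work still queued, and ifmp_ring_check_drainage() moves a
 * STALLED or ABDICATED ring back to BUSY before draining it.
 */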
64 
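/*
 * Number of free slots between pidx_head and cidx.  One slot is always kept
 * empty so that a full ring can be distinguished from an empty one; e.g. with
 * size 8, pidx_head 6 and cidx 2 this returns 7 - 6 + 2 = 3.
 */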
65 static inline uint16_t
66 space_available(struct ifmp_ring *r, union ring_state s)
67 {
68 	uint16_t x = r->size - 1;
69 
70 	if (s.cidx == s.pidx_head)
71 		return (x);
72 	else if (s.cidx > s.pidx_head)
73 		return (s.cidx - s.pidx_head - 1);
74 	else
75 		return (x - s.pidx_head + s.cidx);
76 }
77 
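/*
 * Advance idx by n, wrapping around the end of the ring without a modulo;
 * e.g. with size 8, increment_idx(r, 6, 3) returns 3 - (8 - 6) = 1.
 */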
78 static inline uint16_t
79 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
80 {
81 	int x = r->size - idx;
82 
83 	MPASS(x > 0);
84 	return (x > n ? idx + n : n - x);
85 }
86 
87 /* Returns the flags for the state s that the consumer is about to install. */
88 static inline uint16_t
89 state_to_flags(union ring_state s, int abdicate)
90 {
91 
92 	if (s.cidx == s.pidx_tail)
93 		return (IDLE);
94 	else if (abdicate && s.pidx_tail != s.pidx_head)
95 		return (ABDICATED);
96 
97 	return (BUSY);
98 }
99 
100 #ifdef NO_64BIT_ATOMICS
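/*
 * Same algorithm as drain_ring_lockless() below, but for builds without 64-bit
 * atomics: the caller holds r->lock, so the ring state is updated with plain
 * stores.
 */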
101 static void
102 drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
103 {
104 	union ring_state ns;
105 	int n, pending, total;
106 	uint16_t cidx = os.cidx;
107 	uint16_t pidx = os.pidx_tail;
108 
109 	MPASS(os.flags == BUSY);
110 	MPASS(cidx != pidx);
111 
112 	if (prev == IDLE)
113 		counter_u64_add(r->starts, 1);
114 	pending = 0;
115 	total = 0;
116 
117 	while (cidx != pidx) {
118 
119 		/* Items from cidx to pidx are available for consumption. */
120 		n = r->drain(r, cidx, pidx);
121 		if (n == 0) {
122 			os.state = ns.state = r->state;
123 			ns.cidx = cidx;
124 			ns.flags = STALLED;
125 			r->state = ns.state;
126 			if (prev != STALLED)
127 				counter_u64_add(r->stalls, 1);
128 			else if (total > 0) {
129 				counter_u64_add(r->restarts, 1);
130 				counter_u64_add(r->stalls, 1);
131 			}
132 			break;
133 		}
134 		cidx = increment_idx(r, cidx, n);
135 		pending += n;
136 		total += n;
137 
138 		/*
139 		 * Publish the cidx only when we have caught up with the pidx, when
140 		 * the private cidx has run 64 or more items ahead of the published
141 		 * one, or when we have exceeded our budget.
142 		 */
143 		if (cidx != pidx && pending < 64 && total < budget)
144 			continue;
145 
146 		os.state = ns.state = r->state;
147 		ns.cidx = cidx;
148 		ns.flags = state_to_flags(ns, total >= budget);
149 		r->state = ns.state;
150 
151 		if (ns.flags == ABDICATED)
152 			counter_u64_add(r->abdications, 1);
153 		if (ns.flags != BUSY) {
154 			/* A stall must exit through the n == 0 path above, never here. */
155 			MPASS(ns.flags != STALLED);
156 			if (prev == STALLED) {
157 				MPASS(total > 0);
158 				counter_u64_add(r->restarts, 1);
159 			}
160 			break;
161 		}
162 
163 		/*
164 		 * Every state update in this locked variant happens under r->lock,
165 		 * so any pidx change we notice here comes with its items visible.
166 		 */
167 		pidx = ns.pidx_tail;
168 		pending = 0;
169 	}
170 }
171 #else
172 /*
173  * Caller passes in a state, with a guarantee that there is work to do and that
174  * all items up to the pidx_tail in the state are visible.
175  */
176 static void
177 drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
178 {
179 	union ring_state ns;
180 	int n, pending, total;
181 	uint16_t cidx = os.cidx;
182 	uint16_t pidx = os.pidx_tail;
183 
184 	MPASS(os.flags == BUSY);
185 	MPASS(cidx != pidx);
186 
187 	if (prev == IDLE)
188 		counter_u64_add(r->starts, 1);
189 	pending = 0;
190 	total = 0;
191 
192 	while (cidx != pidx) {
193 
194 		/* Items from cidx to pidx are available for consumption. */
195 		n = r->drain(r, cidx, pidx);
196 		if (n == 0) {
197 			critical_enter();
198 			do {
199 				os.state = ns.state = r->state;
200 				ns.cidx = cidx;
201 				ns.flags = STALLED;
202 			} while (atomic_cmpset_64(&r->state, os.state,
203 			    ns.state) == 0);
204 			critical_exit();
205 			if (prev != STALLED)
206 				counter_u64_add(r->stalls, 1);
207 			else if (total > 0) {
208 				counter_u64_add(r->restarts, 1);
209 				counter_u64_add(r->stalls, 1);
210 			}
211 			break;
212 		}
213 		cidx = increment_idx(r, cidx, n);
214 		pending += n;
215 		total += n;
216 
217 		/*
218 		 * Publish the cidx only when we have caught up with the pidx, when
219 		 * the private cidx has run 64 or more items ahead of the published
220 		 * one, or when we have exceeded our budget.
221 		 */
222 		if (cidx != pidx && pending < 64 && total < budget)
223 			continue;
224 		critical_enter();
225 		do {
226 			os.state = ns.state = r->state;
227 			ns.cidx = cidx;
228 			ns.flags = state_to_flags(ns, total >= budget);
229 		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
230 		critical_exit();
231 
232 		if (ns.flags == ABDICATED)
233 			counter_u64_add(r->abdications, 1);
234 		if (ns.flags != BUSY) {
235 			/* Wrong loop exit if we're going to stall. */
236 			/* A stall must exit through the n == 0 path above, never here. */
237 			if (prev == STALLED) {
238 				MPASS(total > 0);
239 				counter_u64_add(r->restarts, 1);
240 			}
241 			break;
242 		}
243 
244 		/*
245 		 * The acquire style atomic above guarantees visibility of items
246 		 * associated with any pidx change that we notice here.
247 		 */
248 		pidx = ns.pidx_tail;
249 		pending = 0;
250 	}
251 }
252 #endif
253 
254 int
255 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
256     mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
257 {
258 	struct ifmp_ring *r;
259 
260 	/* All idx are 16b so size can be 65536 at most */
261 	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
262 	    can_drain == NULL)
263 		return (EINVAL);
264 	*pr = NULL;
265 	flags &= M_NOWAIT | M_WAITOK;
266 	MPASS(flags != 0);
267 
268 	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
269 	if (r == NULL)
270 		return (ENOMEM);
271 	r->size = size;
272 	r->cookie = cookie;
273 	r->mt = mt;
274 	r->drain = drain;
275 	r->can_drain = can_drain;
276 	r->enqueues = counter_u64_alloc(flags);
277 	r->drops = counter_u64_alloc(flags);
278 	r->starts = counter_u64_alloc(flags);
279 	r->stalls = counter_u64_alloc(flags);
280 	r->restarts = counter_u64_alloc(flags);
281 	r->abdications = counter_u64_alloc(flags);
282 	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
283 	    r->stalls == NULL || r->restarts == NULL ||
284 	    r->abdications == NULL) {
285 		ifmp_ring_free(r);
286 		return (ENOMEM);
287 	}
288 
289 	*pr = r;
290 #ifdef NO_64BIT_ATOMICS
291 	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
292 #endif
293 	return (0);
294 }
295 
296 void
297 ifmp_ring_free(struct ifmp_ring *r)
298 {
299 
300 	if (r == NULL)
301 		return;
302 
303 	if (r->enqueues != NULL)
304 		counter_u64_free(r->enqueues);
305 	if (r->drops != NULL)
306 		counter_u64_free(r->drops);
307 	if (r->starts != NULL)
308 		counter_u64_free(r->starts);
309 	if (r->stalls != NULL)
310 		counter_u64_free(r->stalls);
311 	if (r->restarts != NULL)
312 		counter_u64_free(r->restarts);
313 	if (r->abdications != NULL)
314 		counter_u64_free(r->abdications);
315 
316 	free(r, r->mt);
317 }
318 
319 /*
320  * Enqueue n items and maybe drain the ring for some time.
321  *
322  * Returns an errno.  An illustrative usage sketch follows the implementations.
323  */
324 #ifdef NO_64BIT_ATOMICS
325 int
326 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
327 {
328 	union ring_state os, ns;
329 	uint16_t pidx_start, pidx_stop;
330 	int i;
331 
332 	MPASS(items != NULL);
333 	MPASS(n > 0);
334 
335 	mtx_lock(&r->lock);
336 	/*
337 	 * Reserve room for the new items.  Our reservation, if successful, is
338 	 * from 'pidx_start' to 'pidx_stop'.
339 	 */
340 	os.state = r->state;
341 	if (n >= space_available(r, os)) {
342 		counter_u64_add(r->drops, n);
343 		MPASS(os.flags != IDLE);
344 		mtx_unlock(&r->lock);
345 		if (os.flags == STALLED)
346 			ifmp_ring_check_drainage(r, 0);
347 		return (ENOBUFS);
348 	}
349 	ns.state = os.state;
350 	ns.pidx_head = increment_idx(r, os.pidx_head, n);
351 	r->state = ns.state;
352 	pidx_start = os.pidx_head;
353 	pidx_stop = ns.pidx_head;
354 
355 	/*
356 	 * Wait for other producers who got in ahead of us to enqueue their
357 	 * items, one producer at a time.  It is our turn when the ring's
358 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
359 	 */
360 	while (ns.pidx_tail != pidx_start) {
361 		cpu_spinwait();
362 		ns.state = r->state;
363 	}
364 
365 	/* Now it is our turn to fill up the area we reserved earlier. */
366 	i = pidx_start;
367 	do {
368 		r->items[i] = *items++;
369 		/*HAIKU*/KASSERT((r->items[i] == NULL) || ((uintptr_t)(r->items[i]) > 1024UL), ("is %p", r->items[i]));
370 		if (__predict_false(++i == r->size))
371 			i = 0;
372 	} while (i != pidx_stop);
373 
374 	/*
375 	 * Update the ring's pidx_tail.  The release style atomic guarantees
376 	 * that the items are visible to any thread that sees the updated pidx.
377 	 */
378 	os.state = ns.state = r->state;
379 	ns.pidx_tail = pidx_stop;
380 	if (abdicate) {
381 		if (os.flags == IDLE)
382 			ns.flags = ABDICATED;
383 	}
384 	else {
385 		ns.flags = BUSY;
386 	}
387 	r->state = ns.state;
388 	counter_u64_add(r->enqueues, n);
389 
390 	if (!abdicate) {
391 		/*
392 		 * Become the consumer ourselves if no other thread is already
393 		 * active as one.
394 		 */
395 		if (os.flags != BUSY)
396 			drain_ring_locked(r, ns, os.flags, budget);
397 	}
398 
399 	mtx_unlock(&r->lock);
400 	return (0);
401 }
402 
403 #else
404 int
405 ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
406 {
407 	union ring_state os, ns;
408 	uint16_t pidx_start, pidx_stop;
409 	int i;
410 
411 	MPASS(items != NULL);
412 	MPASS(n > 0);
413 
414 	/*
415 	 * Reserve room for the new items.  Our reservation, if successful, is
416 	 * from 'pidx_start' to 'pidx_stop'.
417 	 */
418 	for (;;) {
419 		os.state = r->state;
420 		if (n >= space_available(r, os)) {
421 			counter_u64_add(r->drops, n);
422 			MPASS(os.flags != IDLE);
423 			if (os.flags == STALLED)
424 				ifmp_ring_check_drainage(r, 0);
425 			return (ENOBUFS);
426 		}
427 		ns.state = os.state;
428 		ns.pidx_head = increment_idx(r, os.pidx_head, n);
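		/*
		 * The critical section entered here is held from a successful
		 * reservation until pidx_tail is published below, so we cannot
		 * be preempted while producers queued behind us spin-wait for
		 * pidx_tail to reach their reservation.
		 */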
429 		critical_enter();
430 		if (atomic_cmpset_64(&r->state, os.state, ns.state))
431 			break;
432 		critical_exit();
433 		cpu_spinwait();
434 	}
435 	pidx_start = os.pidx_head;
436 	pidx_stop = ns.pidx_head;
437 
438 	/*
439 	 * Wait for other producers who got in ahead of us to enqueue their
440 	 * items, one producer at a time.  It is our turn when the ring's
441 	 * pidx_tail reaches the beginning of our reservation (pidx_start).
442 	 */
443 	while (ns.pidx_tail != pidx_start) {
444 		cpu_spinwait();
445 		ns.state = r->state;
446 	}
447 
448 	/* Now it is our turn to fill up the area we reserved earlier. */
449 	i = pidx_start;
450 	do {
451 		r->items[i] = *items++;
452 		if (__predict_false(++i == r->size))
453 			i = 0;
454 	} while (i != pidx_stop);
455 
456 	/*
457 	 * Update the ring's pidx_tail.  The release style atomic guarantees
458 	 * that the items are visible to any thread that sees the updated pidx.
459 	 */
460 	do {
461 		os.state = ns.state = r->state;
462 		ns.pidx_tail = pidx_stop;
463 		if (abdicate) {
464 			if (os.flags == IDLE)
465 				ns.flags = ABDICATED;
466 		}
467 		else {
468 			ns.flags = BUSY;
469 		}
470 	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
471 	critical_exit();
472 	counter_u64_add(r->enqueues, n);
473 
474 	if (!abdicate) {
475 		/*
476 		 * Become the consumer ourselves if no other thread is already
477 		 * active as one.
478 		 */
479 		if (os.flags != BUSY)
480 			drain_ring_lockless(r, ns, os.flags, budget);
481 	}
482 
483 	return (0);
484 }
485 #endif
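
/*
 * Illustrative sketch only, not part of this file: roughly how a driver might
 * use this ring.  The callback names, softc and sizes below are made up; real
 * consumers (e.g. iflib) supply their own drain/can_drain callbacks.
 */
#if 0
static void
example_usage(void *sc)
{
	struct ifmp_ring *ring;
	void *pkts[4];	/* filled with driver-owned packet pointers */
	int error;

	error = ifmp_ring_alloc(&ring, 1024, sc, example_drain,
	    example_can_drain, M_DEVBUF, M_WAITOK);
	if (error != 0)
		return;

	/* Publish four items and stick around to drain them (abdicate == 0). */
	error = ifmp_ring_enqueue(ring, pkts, 4, 16 /* budget */, 0);

	/* Later, once resources free up again, restart a stalled ring. */
	ifmp_ring_check_drainage(ring, 16);

	ifmp_ring_free(ring);
}
#endif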
486 
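/*
 * Kick a ring that was left STALLED or ABDICATED: if all queued work has been
 * published and (in the STALLED case) the driver reports it can make progress
 * again, take over as the consumer and drain with the given budget.
 */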
487 void
488 ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
489 {
490 	union ring_state os, ns;
491 
492 	os.state = r->state;
493 	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
494 	    os.pidx_head != os.pidx_tail ||			// Require queued work to be fully published
495 	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
496 		return;
497 
498 	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED or ABDICATED */
499 	ns.state = os.state;
500 	ns.flags = BUSY;
501 
502 
503 #ifdef NO_64BIT_ATOMICS
504 	mtx_lock(&r->lock);
505 	if (r->state != os.state) {
506 		mtx_unlock(&r->lock);
507 		return;
508 	}
509 	r->state = ns.state;
510 	drain_ring_locked(r, ns, os.flags, budget);
511 	mtx_unlock(&r->lock);
512 #else
513 	/*
514 	 * The acquire style atomic guarantees visibility of items associated
515 	 * with the pidx that we read here.
516 	 */
517 	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
518 		return;
519 
520 
521 	drain_ring_lockless(r, ns, os.flags, budget);
522 #endif
523 }
524 
525 void
526 ifmp_ring_reset_stats(struct ifmp_ring *r)
527 {
528 
529 	counter_u64_zero(r->enqueues);
530 	counter_u64_zero(r->drops);
531 	counter_u64_zero(r->starts);
532 	counter_u64_zero(r->stalls);
533 	counter_u64_zero(r->restarts);
534 	counter_u64_zero(r->abdications);
535 }
536 
537 int
538 ifmp_ring_is_idle(struct ifmp_ring *r)
539 {
540 	union ring_state s;
541 
542 	s.state = r->state;
543 	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
544 	    s.flags == IDLE)
545 		return (1);
546 
547 	return (0);
548 }
549 
550 int
551 ifmp_ring_is_stalled(struct ifmp_ring *r)
552 {
553 	union ring_state s;
554 
555 	s.state = r->state;
556 	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
557 		return (1);
558 
559 	return (0);
560 }
561