// SPDX-License-Identifier: GPL-2.0

/* bpf_fq is intended for testing the bpf qdisc infrastructure and not a direct
 * copy of sch_fq. bpf_fq implements the scheduling algorithm of sch_fq before
 * 29f834aa326e ("net_sched: sch_fq: add 3 bands and WRR scheduling") was
 * introduced. It gives each flow a fair chance to transmit packets in a
 * round-robin fashion. Note that for flow pacing, bpf_fq currently respects
 * skb->tstamp but not skb->sk->sk_pacing_rate. In addition, if there are
 * multiple bpf_fq instances, they will have a shared view of flows and
 * configuration since some key data structures, such as fq_prio_flows,
 * fq_nonprio_flows, and fq_bpf_data, are global.
 *
 * To use bpf_fq alone without running selftests, use the following commands.
 *
 * 1. Register bpf_fq with the kernel
 *     bpftool struct_ops register bpf_qdisc_fq.bpf.o /sys/fs/bpf
 * 2. Add bpf_fq to an interface
 *     tc qdisc add dev <interface name> root handle <handle> bpf_fq
 * 3. Delete bpf_fq attached to the interface
 *     tc qdisc delete dev <interface name> root
 * 4. Unregister bpf_fq
 *     bpftool struct_ops unregister name fq
 *
 * The qdisc name, bpf_fq, used in tc commands is defined by Qdisc_ops.id.
 * The struct_ops map name, fq, used in the bpftool command is the name of the
 * Qdisc_ops variable.
 *
 * SEC(".struct_ops")
 * struct Qdisc_ops fq = {
 *         ...
 *         .id        = "bpf_fq",
 * };
 */

#include <vmlinux.h>
#include <errno.h>
#include <bpf/bpf_helpers.h>
#include "bpf_experimental.h"
#include "bpf_qdisc_common.h"

char _license[] SEC("license") = "GPL";

#define NSEC_PER_USEC 1000L
#define NSEC_PER_SEC 1000000000L

#define NUM_QUEUE (1 << 20)

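/* Qdisc configuration and runtime state (quantum sizes, pacing horizon, flow
 * counters, cached ktime, throttling deadline). The single instance q below
 * lives in a global section and is therefore shared by all bpf_fq instances.
 */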
struct fq_bpf_data {
	u32 quantum;
	u32 initial_quantum;
	u32 flow_refill_delay;
	u32 flow_plimit;
	u64 horizon;
	u32 orphan_mask;
	u32 timer_slack;
	u64 time_next_delayed_flow;
	u64 unthrottle_latency_ns;
	u8 horizon_drop;
	u32 new_flow_cnt;
	u32 old_flow_cnt;
	u64 ktime_cache;
};

enum {
	CLS_RET_PRIO	= 0,
	CLS_RET_NONPRIO = 1,
	CLS_RET_ERR	= 2,
};

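/* A queued packet: the skb is held as a kptr and kept in a per-flow rbtree
 * ordered by tstamp, the time the packet becomes eligible to be sent.
 */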
struct skb_node {
	u64 tstamp;
	struct sk_buff __kptr * skb;
	struct bpf_rb_node node;
};

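/* Per-flow state: 'credit' implements deficit round-robin between flows,
 * 'queue' holds the flow's packets ordered by timestamp, and 'age' encodes
 * the flow state (see the comment above fq_flow_set_detached()).
 */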
struct fq_flow_node {
	int credit;
	u32 qlen;
	u64 age;
	u64 time_next_packet;
	struct bpf_list_node list_node;
	struct bpf_rb_node rb_node;
	struct bpf_rb_root queue __contains(skb_node, node);
	struct bpf_spin_lock lock;
	struct bpf_refcount refcount;
};

struct dequeue_nonprio_ctx {
	bool stop_iter;
	u64 expire;
	u64 now;
};

struct remove_flows_ctx {
	bool gc_only;
	u32 reset_cnt;
	u32 reset_max;
};

struct unset_throttled_flows_ctx {
	bool unset_all;
	u64 now;
};

struct fq_stashed_flow {
	struct fq_flow_node __kptr * flow;
};

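/* Flows are stashed in hash maps keyed by flow hash, with the flow node held
 * as a kptr in the map value. Non-priority flows live in fq_nonprio_flows;
 * fq_prio_flows has a single entry used for TC_PRIO_CONTROL traffic.
 */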
struct {
	__uint(type, BPF_MAP_TYPE_HASH);
	__type(key, __u64);
	__type(value, struct fq_stashed_flow);
	__uint(max_entries, NUM_QUEUE);
} fq_nonprio_flows SEC(".maps");

struct {
	__uint(type, BPF_MAP_TYPE_HASH);
	__type(key, __u64);
	__type(value, struct fq_stashed_flow);
	__uint(max_entries, 1);
} fq_prio_flows SEC(".maps");

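/* Global qdisc state: fq_delayed is an rbtree of throttled flows ordered by
 * time_next_packet, fq_new_flows and fq_old_flows are the round-robin lists
 * used for scheduling, and q holds the configuration and counters. Each lock
 * shares a private() section with the structure it protects.
 */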
private(A) struct bpf_spin_lock fq_delayed_lock;
private(A) struct bpf_rb_root fq_delayed __contains(fq_flow_node, rb_node);

private(B) struct bpf_spin_lock fq_new_flows_lock;
private(B) struct bpf_list_head fq_new_flows __contains(fq_flow_node, list_node);

private(C) struct bpf_spin_lock fq_old_flows_lock;
private(C) struct bpf_list_head fq_old_flows __contains(fq_flow_node, list_node);

private(D) struct fq_bpf_data q;

/* Wrapper for bpf_kptr_xchg that expects NULL dst */
static void bpf_kptr_xchg_back(void *map_val, void *ptr)
{
	void *ret;

	ret = bpf_kptr_xchg(map_val, ptr);
	if (ret)
		bpf_obj_drop(ret);
}

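/* rbtree comparators: packets within a flow are ordered by tstamp, and
 * throttled flows in fq_delayed are ordered by time_next_packet.
 */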
static bool skbn_tstamp_less(struct bpf_rb_node *a, const struct bpf_rb_node *b)
{
	struct skb_node *skbn_a;
	struct skb_node *skbn_b;

	skbn_a = container_of(a, struct skb_node, node);
	skbn_b = container_of(b, struct skb_node, node);

	return skbn_a->tstamp < skbn_b->tstamp;
}

static bool fn_time_next_packet_less(struct bpf_rb_node *a, const struct bpf_rb_node *b)
{
	struct fq_flow_node *flow_a;
	struct fq_flow_node *flow_b;

	flow_a = container_of(a, struct fq_flow_node, rb_node);
	flow_b = container_of(b, struct fq_flow_node, rb_node);

	return flow_a->time_next_packet < flow_b->time_next_packet;
}

static void
fq_flows_add_head(struct bpf_list_head *head, struct bpf_spin_lock *lock,
		  struct fq_flow_node *flow, u32 *flow_cnt)
{
	bpf_spin_lock(lock);
	bpf_list_push_front(head, &flow->list_node);
	bpf_spin_unlock(lock);
	*flow_cnt += 1;
}

static void
fq_flows_add_tail(struct bpf_list_head *head, struct bpf_spin_lock *lock,
		  struct fq_flow_node *flow, u32 *flow_cnt)
{
	bpf_spin_lock(lock);
	bpf_list_push_back(head, &flow->list_node);
	bpf_spin_unlock(lock);
	*flow_cnt += 1;
}

static void
fq_flows_remove_front(struct bpf_list_head *head, struct bpf_spin_lock *lock,
		      struct bpf_list_node **node, u32 *flow_cnt)
{
	bpf_spin_lock(lock);
	*node = bpf_list_pop_front(head);
	bpf_spin_unlock(lock);
	*flow_cnt -= 1;
}

static bool
fq_flows_is_empty(struct bpf_list_head *head, struct bpf_spin_lock *lock)
{
	struct bpf_list_node *node;

	bpf_spin_lock(lock);
	node = bpf_list_pop_front(head);
	if (node) {
		bpf_list_push_front(head, node);
		bpf_spin_unlock(lock);
		return false;
	}
	bpf_spin_unlock(lock);

	return true;
}

/* flow->age is used to denote the state of the flow (not-detached, detached, throttled)
 * as well as the timestamp when the flow is detached.
 *
 * 0: not-detached
 * 1 - (~0ULL-1): detached
 * ~0ULL: throttled
 */
static void fq_flow_set_detached(struct fq_flow_node *flow)
{
	flow->age = bpf_jiffies64();
}

static bool fq_flow_is_detached(struct fq_flow_node *flow)
{
	return flow->age != 0 && flow->age != ~0ULL;
}

static bool sk_listener(struct sock *sk)
{
	return (1 << sk->__sk_common.skc_state) & (TCPF_LISTEN | TCPF_NEW_SYN_RECV);
}

static void fq_gc(void);

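/* Allocate a new flow node and stash it in flow_map under the given hash.
 * If the map is full, garbage-collect detached flows and retry the update.
 */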
static int fq_new_flow(void *flow_map, struct fq_stashed_flow **sflow, u64 hash)
{
	struct fq_stashed_flow tmp = {};
	struct fq_flow_node *flow;
	int ret;

	flow = bpf_obj_new(typeof(*flow));
	if (!flow)
		return -ENOMEM;

	flow->credit = q.initial_quantum;
	flow->qlen = 0;
	flow->age = 1;
	flow->time_next_packet = 0;

	ret = bpf_map_update_elem(flow_map, &hash, &tmp, 0);
	if (ret == -ENOMEM || ret == -E2BIG) {
		fq_gc();
		bpf_map_update_elem(&fq_nonprio_flows, &hash, &tmp, 0);
	}

	*sflow = bpf_map_lookup_elem(flow_map, &hash);
	if (!*sflow) {
		bpf_obj_drop(flow);
		return -ENOMEM;
	}

	bpf_kptr_xchg_back(&(*sflow)->flow, flow);
	return 0;
}

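/* Map an skb to its flow: TC_PRIO_CONTROL packets go to the single priority
 * flow; other packets hash by socket, falling back to the skb hash (masked by
 * orphan_mask) for packets without a socket and for listener or closed
 * sockets. A missing non-priority flow is created on demand.
 */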
static int
fq_classify(struct sk_buff *skb, struct fq_stashed_flow **sflow)
{
	struct sock *sk = skb->sk;
	int ret = CLS_RET_NONPRIO;
	u64 hash = 0;

	if ((skb->priority & TC_PRIO_MAX) == TC_PRIO_CONTROL) {
		*sflow = bpf_map_lookup_elem(&fq_prio_flows, &hash);
		ret = CLS_RET_PRIO;
	} else {
		if (!sk || sk_listener(sk)) {
			hash = bpf_skb_get_hash(skb) & q.orphan_mask;
			/* Avoid collision with an existing flow hash, which
			 * only uses the lower 32 bits of hash, by setting the
			 * upper half of hash to 1.
			 */
			hash |= (1ULL << 32);
		} else if (sk->__sk_common.skc_state == TCP_CLOSE) {
			hash = bpf_skb_get_hash(skb) & q.orphan_mask;
			hash |= (1ULL << 32);
		} else {
			hash = sk->__sk_common.skc_hash;
		}
		*sflow = bpf_map_lookup_elem(&fq_nonprio_flows, &hash);
	}

	if (!*sflow)
		ret = fq_new_flow(&fq_nonprio_flows, sflow, hash) < 0 ?
		      CLS_RET_ERR : CLS_RET_NONPRIO;

	return ret;
}

static bool fq_packet_beyond_horizon(struct sk_buff *skb)
{
	return (s64)skb->tstamp > (s64)(q.ktime_cache + q.horizon);
}

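/* Enqueue: drop when the qdisc or per-flow limit is exceeded, or when the
 * packet is beyond the pacing horizon and horizon_drop is set. Otherwise,
 * classify the skb to a flow, re-attach a detached flow to the new-flows
 * list, and insert the packet into the flow's queue ordered by its time to
 * send.
 */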
SEC("struct_ops/bpf_fq_enqueue")
int BPF_PROG(bpf_fq_enqueue, struct sk_buff *skb, struct Qdisc *sch,
	     struct bpf_sk_buff_ptr *to_free)
{
	struct fq_flow_node *flow = NULL, *flow_copy;
	struct fq_stashed_flow *sflow;
	u64 time_to_send, jiffies;
	struct skb_node *skbn;
	int ret;

	if (sch->q.qlen >= sch->limit)
		goto drop;

	if (!skb->tstamp) {
		time_to_send = q.ktime_cache = bpf_ktime_get_ns();
	} else {
		if (fq_packet_beyond_horizon(skb)) {
			q.ktime_cache = bpf_ktime_get_ns();
			if (fq_packet_beyond_horizon(skb)) {
				if (q.horizon_drop)
					goto drop;

				skb->tstamp = q.ktime_cache + q.horizon;
			}
		}
		time_to_send = skb->tstamp;
	}

	ret = fq_classify(skb, &sflow);
	if (ret == CLS_RET_ERR)
		goto drop;

	flow = bpf_kptr_xchg(&sflow->flow, flow);
	if (!flow)
		goto drop;

	if (ret == CLS_RET_NONPRIO) {
		if (flow->qlen >= q.flow_plimit) {
			bpf_kptr_xchg_back(&sflow->flow, flow);
			goto drop;
		}

		if (fq_flow_is_detached(flow)) {
			flow_copy = bpf_refcount_acquire(flow);

			jiffies = bpf_jiffies64();
			if ((s64)(jiffies - (flow_copy->age + q.flow_refill_delay)) > 0) {
				if (flow_copy->credit < q.quantum)
					flow_copy->credit = q.quantum;
			}
			flow_copy->age = 0;
			fq_flows_add_tail(&fq_new_flows, &fq_new_flows_lock, flow_copy,
					  &q.new_flow_cnt);
		}
	}

	skbn = bpf_obj_new(typeof(*skbn));
	if (!skbn) {
		bpf_kptr_xchg_back(&sflow->flow, flow);
		goto drop;
	}

	skbn->tstamp = skb->tstamp = time_to_send;

	sch->qstats.backlog += qdisc_pkt_len(skb);

	skb = bpf_kptr_xchg(&skbn->skb, skb);
	if (skb)
		bpf_qdisc_skb_drop(skb, to_free);

	bpf_spin_lock(&flow->lock);
	bpf_rbtree_add(&flow->queue, &skbn->node, skbn_tstamp_less);
	bpf_spin_unlock(&flow->lock);

	flow->qlen++;
	bpf_kptr_xchg_back(&sflow->flow, flow);

	sch->q.qlen++;
	return NET_XMIT_SUCCESS;

drop:
	bpf_qdisc_skb_drop(skb, to_free);
	sch->qstats.drops++;
	return NET_XMIT_DROP;
}

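/* bpf_loop() callback: move throttled flows whose deadline has passed (or all
 * of them when ctx->unset_all is set) from fq_delayed back to the old-flows
 * list. Returns 1 to stop the loop once the remaining flows are still in the
 * future or the tree is empty.
 */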
static int fq_unset_throttled_flows(u32 index, struct unset_throttled_flows_ctx *ctx)
{
	struct bpf_rb_node *node = NULL;
	struct fq_flow_node *flow;

	bpf_spin_lock(&fq_delayed_lock);

	node = bpf_rbtree_first(&fq_delayed);
	if (!node) {
		bpf_spin_unlock(&fq_delayed_lock);
		return 1;
	}

	flow = container_of(node, struct fq_flow_node, rb_node);
	if (!ctx->unset_all && flow->time_next_packet > ctx->now) {
		q.time_next_delayed_flow = flow->time_next_packet;
		bpf_spin_unlock(&fq_delayed_lock);
		return 1;
	}

	node = bpf_rbtree_remove(&fq_delayed, &flow->rb_node);

	bpf_spin_unlock(&fq_delayed_lock);

	if (!node)
		return 1;

	flow = container_of(node, struct fq_flow_node, rb_node);
	flow->age = 0;
	fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow, &q.old_flow_cnt);

	return 0;
}

static void fq_flow_set_throttled(struct fq_flow_node *flow)
{
	flow->age = ~0ULL;

	if (q.time_next_delayed_flow > flow->time_next_packet)
		q.time_next_delayed_flow = flow->time_next_packet;

	bpf_spin_lock(&fq_delayed_lock);
	bpf_rbtree_add(&fq_delayed, &flow->rb_node, fn_time_next_packet_less);
	bpf_spin_unlock(&fq_delayed_lock);
}

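/* Update the unthrottle-latency estimate (EWMA with a 1/8 weight) and release
 * flows whose throttling deadline has expired.
 */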
static void fq_check_throttled(u64 now)
{
	struct unset_throttled_flows_ctx ctx = {
		.unset_all = false,
		.now = now,
	};
	unsigned long sample;

	if (q.time_next_delayed_flow > now)
		return;

	sample = (unsigned long)(now - q.time_next_delayed_flow);
	q.unthrottle_latency_ns -= q.unthrottle_latency_ns >> 3;
	q.unthrottle_latency_ns += sample >> 3;

	q.time_next_delayed_flow = ~0ULL;
	bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &ctx, 0);
}

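/* Called once per iteration of the dequeue loop: pick a flow from the
 * new-flows list (or the old-flows list when the former is empty), refill
 * credit and rotate flows that have exhausted it, throttle the flow if its
 * next packet is not yet due, and otherwise dequeue its earliest packet.
 * Sets ctx->stop_iter when the loop should end and ctx->expire when all
 * remaining flows are throttled.
 */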
static struct sk_buff*
fq_dequeue_nonprio_flows(u32 index, struct dequeue_nonprio_ctx *ctx)
{
	u64 time_next_packet, time_to_send;
	struct bpf_rb_node *rb_node;
	struct sk_buff *skb = NULL;
	struct bpf_list_head *head;
	struct bpf_list_node *node;
	struct bpf_spin_lock *lock;
	struct fq_flow_node *flow;
	struct skb_node *skbn;
	bool is_empty;
	u32 *cnt;

	if (q.new_flow_cnt) {
		head = &fq_new_flows;
		lock = &fq_new_flows_lock;
		cnt = &q.new_flow_cnt;
	} else if (q.old_flow_cnt) {
		head = &fq_old_flows;
		lock = &fq_old_flows_lock;
		cnt = &q.old_flow_cnt;
	} else {
		if (q.time_next_delayed_flow != ~0ULL)
			ctx->expire = q.time_next_delayed_flow;
		goto break_loop;
	}

	fq_flows_remove_front(head, lock, &node, cnt);
	if (!node)
		goto break_loop;

	flow = container_of(node, struct fq_flow_node, list_node);
	if (flow->credit <= 0) {
		flow->credit += q.quantum;
		fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow, &q.old_flow_cnt);
		return NULL;
	}

	bpf_spin_lock(&flow->lock);
	rb_node = bpf_rbtree_first(&flow->queue);
	if (!rb_node) {
		bpf_spin_unlock(&flow->lock);
		is_empty = fq_flows_is_empty(&fq_old_flows, &fq_old_flows_lock);
		if (head == &fq_new_flows && !is_empty) {
			fq_flows_add_tail(&fq_old_flows, &fq_old_flows_lock, flow, &q.old_flow_cnt);
		} else {
			fq_flow_set_detached(flow);
			bpf_obj_drop(flow);
		}
		return NULL;
	}

	skbn = container_of(rb_node, struct skb_node, node);
	time_to_send = skbn->tstamp;

	time_next_packet = (time_to_send > flow->time_next_packet) ?
		time_to_send : flow->time_next_packet;
	if (ctx->now < time_next_packet) {
		bpf_spin_unlock(&flow->lock);
		flow->time_next_packet = time_next_packet;
		fq_flow_set_throttled(flow);
		return NULL;
	}

	rb_node = bpf_rbtree_remove(&flow->queue, rb_node);
	bpf_spin_unlock(&flow->lock);

	if (!rb_node)
		goto add_flow_and_break;

	skbn = container_of(rb_node, struct skb_node, node);
	skb = bpf_kptr_xchg(&skbn->skb, skb);
	bpf_obj_drop(skbn);

	if (!skb)
		goto add_flow_and_break;

	flow->credit -= qdisc_skb_cb(skb)->pkt_len;
	flow->qlen--;

add_flow_and_break:
	fq_flows_add_head(head, lock, flow, cnt);

break_loop:
	ctx->stop_iter = true;
	return skb;
}

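/* Dequeue the earliest packet from the single priority flow, if any. */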
static struct sk_buff *fq_dequeue_prio(void)
{
	struct fq_flow_node *flow = NULL;
	struct fq_stashed_flow *sflow;
	struct bpf_rb_node *rb_node;
	struct sk_buff *skb = NULL;
	struct skb_node *skbn;
	u64 hash = 0;

	sflow = bpf_map_lookup_elem(&fq_prio_flows, &hash);
	if (!sflow)
		return NULL;

	flow = bpf_kptr_xchg(&sflow->flow, flow);
	if (!flow)
		return NULL;

	bpf_spin_lock(&flow->lock);
	rb_node = bpf_rbtree_first(&flow->queue);
	if (!rb_node) {
		bpf_spin_unlock(&flow->lock);
		goto out;
	}

	skbn = container_of(rb_node, struct skb_node, node);
	rb_node = bpf_rbtree_remove(&flow->queue, &skbn->node);
	bpf_spin_unlock(&flow->lock);

	if (!rb_node)
		goto out;

	skbn = container_of(rb_node, struct skb_node, node);
	skb = bpf_kptr_xchg(&skbn->skb, skb);
	bpf_obj_drop(skbn);

out:
	bpf_kptr_xchg_back(&sflow->flow, flow);

	return skb;
}

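/* Dequeue: serve the priority flow first, then refresh the cached ktime,
 * release expired throttled flows, and run the round-robin loop over
 * non-priority flows. If nothing is ready, arm the qdisc watchdog for the
 * next throttling deadline.
 */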
SEC("struct_ops/bpf_fq_dequeue")
struct sk_buff *BPF_PROG(bpf_fq_dequeue, struct Qdisc *sch)
{
	struct dequeue_nonprio_ctx cb_ctx = {};
	struct sk_buff *skb = NULL;
	int i;

	if (!sch->q.qlen)
		goto out;

	skb = fq_dequeue_prio();
	if (skb)
		goto dequeue;

	q.ktime_cache = cb_ctx.now = bpf_ktime_get_ns();
	fq_check_throttled(q.ktime_cache);
	bpf_for(i, 0, sch->limit) {
		skb = fq_dequeue_nonprio_flows(i, &cb_ctx);
		if (cb_ctx.stop_iter)
			break;
	};

	if (skb) {
dequeue:
		sch->q.qlen--;
		sch->qstats.backlog -= qdisc_pkt_len(skb);
		bpf_qdisc_bstats_update(sch, skb);
		return skb;
	}

	if (cb_ctx.expire)
		bpf_qdisc_watchdog_schedule(sch, cb_ctx.expire, q.timer_slack);
out:
	return NULL;
}

static int fq_remove_flows_in_list(u32 index, void *ctx)
{
	struct bpf_list_node *node;
	struct fq_flow_node *flow;

	bpf_spin_lock(&fq_new_flows_lock);
	node = bpf_list_pop_front(&fq_new_flows);
	bpf_spin_unlock(&fq_new_flows_lock);
	if (!node) {
		bpf_spin_lock(&fq_old_flows_lock);
		node = bpf_list_pop_front(&fq_old_flows);
		bpf_spin_unlock(&fq_old_flows_lock);
		if (!node)
			return 1;
	}

	flow = container_of(node, struct fq_flow_node, list_node);
	bpf_obj_drop(flow);

	return 0;
}

extern unsigned CONFIG_HZ __kconfig;

/* limit number of collected flows per round */
#define FQ_GC_MAX 8
#define FQ_GC_AGE (3*CONFIG_HZ)

static bool fq_gc_candidate(struct fq_flow_node *flow)
{
	u64 jiffies = bpf_jiffies64();

	return fq_flow_is_detached(flow) &&
	       ((s64)(jiffies - (flow->age + FQ_GC_AGE)) > 0);
}

static int
fq_remove_flows(struct bpf_map *flow_map, u64 *hash,
		struct fq_stashed_flow *sflow, struct remove_flows_ctx *ctx)
{
	if (sflow->flow &&
	    (!ctx->gc_only || fq_gc_candidate(sflow->flow))) {
		bpf_map_delete_elem(flow_map, hash);
		ctx->reset_cnt++;
	}

	return ctx->reset_cnt < ctx->reset_max ? 0 : 1;
}

static void fq_gc(void)
{
	struct remove_flows_ctx cb_ctx = {
		.gc_only = true,
		.reset_cnt = 0,
		.reset_max = FQ_GC_MAX,
	};

	bpf_for_each_map_elem(&fq_nonprio_flows, fq_remove_flows, &cb_ctx, 0);
}

SEC("struct_ops/bpf_fq_reset")
void BPF_PROG(bpf_fq_reset, struct Qdisc *sch)
{
	struct unset_throttled_flows_ctx utf_ctx = {
		.unset_all = true,
	};
	struct remove_flows_ctx rf_ctx = {
		.gc_only = false,
		.reset_cnt = 0,
		.reset_max = NUM_QUEUE,
	};
	struct fq_stashed_flow *sflow;
	u64 hash = 0;

	sch->q.qlen = 0;
	sch->qstats.backlog = 0;

	bpf_for_each_map_elem(&fq_nonprio_flows, fq_remove_flows, &rf_ctx, 0);

	rf_ctx.reset_cnt = 0;
	bpf_for_each_map_elem(&fq_prio_flows, fq_remove_flows, &rf_ctx, 0);
	fq_new_flow(&fq_prio_flows, &sflow, hash);

	bpf_loop(NUM_QUEUE, fq_remove_flows_in_list, NULL, 0);
	q.new_flow_cnt = 0;
	q.old_flow_cnt = 0;

	bpf_loop(NUM_QUEUE, fq_unset_throttled_flows, &utf_ctx, 0);
}

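/* Initialize the qdisc with defaults similar to sch_fq's (10000-packet limit,
 * quantum derived from the device MTU, a 10-second pacing horizon) and
 * pre-create the priority flow.
 */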
SEC("struct_ops/bpf_fq_init")
int BPF_PROG(bpf_fq_init, struct Qdisc *sch, struct nlattr *opt,
	     struct netlink_ext_ack *extack)
{
	struct net_device *dev = sch->dev_queue->dev;
	u32 psched_mtu = dev->mtu + dev->hard_header_len;
	struct fq_stashed_flow *sflow;
	u64 hash = 0;

	if (fq_new_flow(&fq_prio_flows, &sflow, hash) < 0)
		return -ENOMEM;

	sch->limit = 10000;
	q.initial_quantum = 10 * psched_mtu;
	q.quantum = 2 * psched_mtu;
	q.flow_refill_delay = 40;
	q.flow_plimit = 100;
	q.horizon = 10ULL * NSEC_PER_SEC;
	q.horizon_drop = 1;
	q.orphan_mask = 1024 - 1;
	q.timer_slack = 10 * NSEC_PER_USEC;
	q.time_next_delayed_flow = ~0ULL;
	q.unthrottle_latency_ns = 0ULL;
	q.new_flow_cnt = 0;
	q.old_flow_cnt = 0;

	return 0;
}

SEC("struct_ops")
void BPF_PROG(bpf_fq_destroy, struct Qdisc *sch)
{
}

SEC(".struct_ops")
struct Qdisc_ops fq = {
	.enqueue   = (void *)bpf_fq_enqueue,
	.dequeue   = (void *)bpf_fq_dequeue,
	.reset     = (void *)bpf_fq_reset,
	.init      = (void *)bpf_fq_init,
	.destroy   = (void *)bpf_fq_destroy,
	.id        = "bpf_fq",
};
757