xref: /linux/tools/testing/selftests/sched_ext/dequeue.c (revision 5bdb4078e1efba9650c03753616866192d680718)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2025 NVIDIA Corporation.
4  */
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <signal.h>
9 #include <time.h>
10 #include <bpf/bpf.h>
11 #include <scx/common.h>
12 #include <sys/wait.h>
13 #include <sched.h>
14 #include <pthread.h>
15 #include "scx_test.h"
16 #include "dequeue.bpf.skel.h"
17 
18 #define NUM_WORKERS 8
19 #define AFFINITY_HAMMER_MS 500
20 
/*
 * Worker body run in each forked child: alternates a short burst of CPU
 * work with a sleep, 1000 times, to create enqueue/dequeue events.
 *
 * @id: worker index; the per-iteration sleep is 1000us + id * 100us, so
 *      each worker sleeps for a different duration.
 *
 * Never returns: the process terminates with status 0 when the loop ends.
 */
static void worker_fn(int id)
{
	/*
	 * Unsigned on purpose: the running total grows far beyond INT_MAX
	 * over the 1000 outer iterations, and signed overflow is undefined
	 * behaviour. Unsigned arithmetic wraps with defined semantics. The
	 * value itself is never consumed; volatile forces the accesses to
	 * actually happen.
	 */
	volatile unsigned int sum = 0;
	int i;

	for (i = 0; i < 1000; i++) {
		volatile int j;

		/* Do some work to trigger scheduling events */
		for (j = 0; j < 10000; j++)
			sum += j;

		/* Sleep to trigger dequeue */
		usleep(1000 + (id * 100));
	}

	/*
	 * Use _exit() rather than exit(): this runs in a forked child that
	 * inherited a copy of the parent's stdio buffers, and exit() would
	 * flush them, duplicating any output the parent had pending at
	 * fork() time.
	 */
	_exit(0);
}
43 
44 /*
45  * This thread changes workers' affinity from outside so that some changes
46  * hit tasks while they are still in the scheduler's queue and trigger
47  * property-change dequeues.
48  */
affinity_hammer_fn(void * arg)49 static void *affinity_hammer_fn(void *arg)
50 {
51 	pid_t *pids = arg;
52 	cpu_set_t cpuset;
53 	int i = 0, n = NUM_WORKERS;
54 	struct timespec start, now;
55 
56 	clock_gettime(CLOCK_MONOTONIC, &start);
57 	while (1) {
58 		int w = i % n;
59 		int cpu = (i / n) % 4;
60 
61 		CPU_ZERO(&cpuset);
62 		CPU_SET(cpu, &cpuset);
63 		sched_setaffinity(pids[w], sizeof(cpuset), &cpuset);
64 		i++;
65 
66 		/* Check elapsed time every 256 iterations to limit gettime cost */
67 		if ((i & 255) == 0) {
68 			long long elapsed_ms;
69 
70 			clock_gettime(CLOCK_MONOTONIC, &now);
71 			elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL +
72 				     (now.tv_nsec - start.tv_nsec) / 1000000;
73 			if (elapsed_ms >= AFFINITY_HAMMER_MS)
74 				break;
75 		}
76 	}
77 	return NULL;
78 }
79 
run_scenario(struct dequeue * skel,u32 scenario,const char * scenario_name)80 static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario,
81 					 const char *scenario_name)
82 {
83 	struct bpf_link *link;
84 	pid_t pids[NUM_WORKERS];
85 	pthread_t hammer;
86 
87 	int i, status;
88 	u64 enq_start, deq_start,
89 	    dispatch_deq_start, change_deq_start, bpf_queue_full_start;
90 	u64 enq_delta, deq_delta,
91 	    dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta;
92 
93 	/* Set the test scenario */
94 	skel->bss->test_scenario = scenario;
95 
96 	/* Record starting counts */
97 	enq_start = skel->bss->enqueue_cnt;
98 	deq_start = skel->bss->dequeue_cnt;
99 	dispatch_deq_start = skel->bss->dispatch_dequeue_cnt;
100 	change_deq_start = skel->bss->change_dequeue_cnt;
101 	bpf_queue_full_start = skel->bss->bpf_queue_full;
102 
103 	link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops);
104 	SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name);
105 
106 	/* Fork worker processes to generate enqueue/dequeue events */
107 	for (i = 0; i < NUM_WORKERS; i++) {
108 		pids[i] = fork();
109 		SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i);
110 
111 		if (pids[i] == 0) {
112 			worker_fn(i);
113 			/* Should not reach here */
114 			exit(1);
115 		}
116 	}
117 
118 	/*
119 	 * Run an "affinity hammer" so that some property changes hit tasks
120 	 * while they are still in BPF custody (e.g., in user DSQ or BPF
121 	 * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues.
122 	 */
123 	SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0,
124 		    "Failed to create affinity hammer thread");
125 	pthread_join(hammer, NULL);
126 
127 	/* Wait for all workers to complete */
128 	for (i = 0; i < NUM_WORKERS; i++) {
129 		SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i],
130 			    "Failed to wait for worker %d", i);
131 		SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status);
132 	}
133 
134 	bpf_link__destroy(link);
135 
136 	SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG));
137 
138 	/* Calculate deltas */
139 	enq_delta = skel->bss->enqueue_cnt - enq_start;
140 	deq_delta = skel->bss->dequeue_cnt - deq_start;
141 	dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start;
142 	change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start;
143 	bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start;
144 
145 	printf("%s:\n", scenario_name);
146 	printf("  enqueues: %lu\n", (unsigned long)enq_delta);
147 	printf("  dequeues: %lu (dispatch: %lu, property_change: %lu)\n",
148 	       (unsigned long)deq_delta,
149 	       (unsigned long)dispatch_deq_delta,
150 	       (unsigned long)change_deq_delta);
151 	printf("  BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta);
152 
153 	/*
154 	 * Validate enqueue/dequeue lifecycle tracking.
155 	 *
156 	 * For scenarios 0, 1, 3, 4 (local and global DSQs from
157 	 * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues
158 	 * should be 0 because tasks bypass the BPF scheduler entirely:
159 	 * tasks never enter BPF scheduler's custody.
160 	 *
161 	 * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect
162 	 * both enqueues and dequeues.
163 	 *
164 	 * The BPF code does strict state machine validation with
165 	 * scx_bpf_error() to ensure the workflow semantics are correct.
166 	 *
167 	 * If we reach this point without errors, the semantics are
168 	 * validated correctly.
169 	 */
170 	if (scenario == 0 || scenario == 1 ||
171 	    scenario == 3 || scenario == 4) {
172 		/* Tasks bypass BPF scheduler completely */
173 		SCX_EQ(enq_delta, 0);
174 		SCX_EQ(deq_delta, 0);
175 		SCX_EQ(dispatch_deq_delta, 0);
176 		SCX_EQ(change_deq_delta, 0);
177 	} else {
178 		/*
179 		 * User DSQ from ops.enqueue() or ops.select_cpu(): tasks
180 		 * enter BPF scheduler's custody.
181 		 *
182 		 * Also validate 1:1 enqueue/dequeue pairing.
183 		 */
184 		SCX_GT(enq_delta, 0);
185 		SCX_GT(deq_delta, 0);
186 		SCX_EQ(enq_delta, deq_delta);
187 	}
188 
189 	return SCX_TEST_PASS;
190 }
191 
setup(void ** ctx)192 static enum scx_test_status setup(void **ctx)
193 {
194 	struct dequeue *skel;
195 
196 	skel = dequeue__open();
197 	SCX_FAIL_IF(!skel, "Failed to open skel");
198 	SCX_ENUM_INIT(skel);
199 	SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel");
200 
201 	*ctx = skel;
202 
203 	return SCX_TEST_PASS;
204 }
205 
run(void * ctx)206 static enum scx_test_status run(void *ctx)
207 {
208 	struct dequeue *skel = ctx;
209 	enum scx_test_status status;
210 
211 	status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()");
212 	if (status != SCX_TEST_PASS)
213 		return status;
214 
215 	status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()");
216 	if (status != SCX_TEST_PASS)
217 		return status;
218 
219 	status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()");
220 	if (status != SCX_TEST_PASS)
221 		return status;
222 
223 	status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()");
224 	if (status != SCX_TEST_PASS)
225 		return status;
226 
227 	status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()");
228 	if (status != SCX_TEST_PASS)
229 		return status;
230 
231 	status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()");
232 	if (status != SCX_TEST_PASS)
233 		return status;
234 
235 	status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()");
236 	if (status != SCX_TEST_PASS)
237 		return status;
238 
239 	printf("\n=== Summary ===\n");
240 	printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt);
241 	printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt);
242 	printf("  Dispatch dequeues: %lu (no flag, normal workflow)\n",
243 	       (unsigned long)skel->bss->dispatch_dequeue_cnt);
244 	printf("  Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n",
245 	       (unsigned long)skel->bss->change_dequeue_cnt);
246 	printf("  BPF queue full: %lu\n",
247 	       (unsigned long)skel->bss->bpf_queue_full);
248 	printf("\nAll scenarios passed - no state machine violations detected\n");
249 	printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n");
250 	printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n");
251 	printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n");
252 	printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n");
253 	printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n");
254 	printf("-> Validated: No duplicate enqueues or invalid state transitions\n");
255 
256 	return SCX_TEST_PASS;
257 }
258 
/*
 * Test teardown: destroys the skeleton that setup() stored in @ctx.
 */
static void cleanup(void *ctx)
{
	dequeue__destroy((struct dequeue *)ctx);
}
265 
266 struct scx_test dequeue_test = {
267 	.name = "dequeue",
268 	.description = "Verify ops.dequeue() semantics",
269 	.setup = setup,
270 	.run = run,
271 	.cleanup = cleanup,
272 };
273 
274 REGISTER_SCX_TEST(&dequeue_test)
275