1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Copyright (c) 2025 NVIDIA Corporation.
4 */
5 #define _GNU_SOURCE
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <signal.h>
9 #include <time.h>
10 #include <bpf/bpf.h>
11 #include <scx/common.h>
12 #include <sys/wait.h>
13 #include <sched.h>
14 #include <pthread.h>
15 #include "scx_test.h"
16 #include "dequeue.bpf.skel.h"
17
18 #define NUM_WORKERS 8
19 #define AFFINITY_HAMMER_MS 500
20
/*
 * Worker function that creates enqueue/dequeue events via CPU work and
 * sleep.  Runs in a forked child and never returns; it terminates the
 * child with exit(0).
 */
static void worker_fn(int id)
{
	volatile int acc = 0;
	int iter;

	for (iter = 0; iter < 1000; iter++) {
		volatile int k;

		/* Burn some CPU so the task gets enqueued and runs */
		for (k = 0; k < 10000; k++)
			acc += k;

		/* Block briefly so the task is dequeued again */
		usleep(1000 + (id * 100));
	}

	exit(0);
}
43
44 /*
45 * This thread changes workers' affinity from outside so that some changes
46 * hit tasks while they are still in the scheduler's queue and trigger
47 * property-change dequeues.
48 */
affinity_hammer_fn(void * arg)49 static void *affinity_hammer_fn(void *arg)
50 {
51 pid_t *pids = arg;
52 cpu_set_t cpuset;
53 int i = 0, n = NUM_WORKERS;
54 struct timespec start, now;
55
56 clock_gettime(CLOCK_MONOTONIC, &start);
57 while (1) {
58 int w = i % n;
59 int cpu = (i / n) % 4;
60
61 CPU_ZERO(&cpuset);
62 CPU_SET(cpu, &cpuset);
63 sched_setaffinity(pids[w], sizeof(cpuset), &cpuset);
64 i++;
65
66 /* Check elapsed time every 256 iterations to limit gettime cost */
67 if ((i & 255) == 0) {
68 long long elapsed_ms;
69
70 clock_gettime(CLOCK_MONOTONIC, &now);
71 elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL +
72 (now.tv_nsec - start.tv_nsec) / 1000000;
73 if (elapsed_ms >= AFFINITY_HAMMER_MS)
74 break;
75 }
76 }
77 return NULL;
78 }
79
/*
 * Exercise one dispatch scenario end to end: point the BPF side at
 * @scenario, attach the struct_ops scheduler, generate scheduling traffic
 * with forked workers plus an affinity hammer, then detach and validate
 * the enqueue/dequeue counters maintained by the BPF program.
 *
 * @skel:          loaded dequeue BPF skeleton
 * @scenario:      value written to skel->bss->test_scenario for the BPF side
 * @scenario_name: human-readable label used in output and failure messages
 *
 * Returns SCX_TEST_PASS on success; the SCX_FAIL_IF()/SCX_EQ()/SCX_GT()
 * macros presumably bail out of this function with a failure status on
 * violation (NOTE(review): macro semantics come from scx_test.h, not
 * visible here).
 */
static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario,
					 const char *scenario_name)
{
	struct bpf_link *link;
	pid_t pids[NUM_WORKERS];
	pthread_t hammer;

	int i, status;
	/* Counter snapshots taken before attach ... */
	u64 enq_start, deq_start,
	    dispatch_deq_start, change_deq_start, bpf_queue_full_start;
	/* ... and the per-scenario deltas computed after detach */
	u64 enq_delta, deq_delta,
	    dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta;

	/* Set the test scenario */
	skel->bss->test_scenario = scenario;

	/*
	 * Record starting counts: the counters are cumulative across
	 * scenarios, so deltas are what this scenario contributed.
	 */
	enq_start = skel->bss->enqueue_cnt;
	deq_start = skel->bss->dequeue_cnt;
	dispatch_deq_start = skel->bss->dispatch_dequeue_cnt;
	change_deq_start = skel->bss->change_dequeue_cnt;
	bpf_queue_full_start = skel->bss->bpf_queue_full;

	link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops);
	SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name);

	/* Fork worker processes to generate enqueue/dequeue events */
	for (i = 0; i < NUM_WORKERS; i++) {
		pids[i] = fork();
		SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i);

		if (pids[i] == 0) {
			worker_fn(i);
			/* Should not reach here: worker_fn() exits the child */
			exit(1);
		}
	}

	/*
	 * Run an "affinity hammer" so that some property changes hit tasks
	 * while they are still in BPF custody (e.g., in user DSQ or BPF
	 * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues.
	 */
	SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0,
		    "Failed to create affinity hammer thread");
	pthread_join(hammer, NULL);

	/*
	 * Wait for all workers to complete.  status is the raw wait
	 * status; workers exit(0), so any nonzero value means an abnormal
	 * exit or signal.
	 */
	for (i = 0; i < NUM_WORKERS; i++) {
		SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i],
			    "Failed to wait for worker %d", i);
		SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status);
	}

	bpf_link__destroy(link);

	/* A clean unregister means the BPF side hit no scx_bpf_error() */
	SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG));

	/* Calculate deltas */
	enq_delta = skel->bss->enqueue_cnt - enq_start;
	deq_delta = skel->bss->dequeue_cnt - deq_start;
	dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start;
	change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start;
	bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start;

	printf("%s:\n", scenario_name);
	printf("  enqueues: %lu\n", (unsigned long)enq_delta);
	printf("  dequeues: %lu (dispatch: %lu, property_change: %lu)\n",
	       (unsigned long)deq_delta,
	       (unsigned long)dispatch_deq_delta,
	       (unsigned long)change_deq_delta);
	printf("  BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta);

	/*
	 * Validate enqueue/dequeue lifecycle tracking.
	 *
	 * For scenarios 0, 1, 3, 4 (local and global DSQs from
	 * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues
	 * should be 0 because tasks bypass the BPF scheduler entirely:
	 * tasks never enter BPF scheduler's custody.
	 *
	 * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect
	 * both enqueues and dequeues.
	 *
	 * The BPF code does strict state machine validation with
	 * scx_bpf_error() to ensure the workflow semantics are correct.
	 *
	 * If we reach this point without errors, the semantics are
	 * validated correctly.
	 */
	if (scenario == 0 || scenario == 1 ||
	    scenario == 3 || scenario == 4) {
		/* Tasks bypass BPF scheduler completely */
		SCX_EQ(enq_delta, 0);
		SCX_EQ(deq_delta, 0);
		SCX_EQ(dispatch_deq_delta, 0);
		SCX_EQ(change_deq_delta, 0);
	} else {
		/*
		 * User DSQ from ops.enqueue() or ops.select_cpu(): tasks
		 * enter BPF scheduler's custody.
		 *
		 * Also validate 1:1 enqueue/dequeue pairing.
		 */
		SCX_GT(enq_delta, 0);
		SCX_GT(deq_delta, 0);
		SCX_EQ(enq_delta, deq_delta);
	}

	return SCX_TEST_PASS;
}
191
setup(void ** ctx)192 static enum scx_test_status setup(void **ctx)
193 {
194 struct dequeue *skel;
195
196 skel = dequeue__open();
197 SCX_FAIL_IF(!skel, "Failed to open skel");
198 SCX_ENUM_INIT(skel);
199 SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel");
200
201 *ctx = skel;
202
203 return SCX_TEST_PASS;
204 }
205
run(void * ctx)206 static enum scx_test_status run(void *ctx)
207 {
208 struct dequeue *skel = ctx;
209 enum scx_test_status status;
210
211 status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()");
212 if (status != SCX_TEST_PASS)
213 return status;
214
215 status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()");
216 if (status != SCX_TEST_PASS)
217 return status;
218
219 status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()");
220 if (status != SCX_TEST_PASS)
221 return status;
222
223 status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()");
224 if (status != SCX_TEST_PASS)
225 return status;
226
227 status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()");
228 if (status != SCX_TEST_PASS)
229 return status;
230
231 status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()");
232 if (status != SCX_TEST_PASS)
233 return status;
234
235 status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()");
236 if (status != SCX_TEST_PASS)
237 return status;
238
239 printf("\n=== Summary ===\n");
240 printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt);
241 printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt);
242 printf(" Dispatch dequeues: %lu (no flag, normal workflow)\n",
243 (unsigned long)skel->bss->dispatch_dequeue_cnt);
244 printf(" Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n",
245 (unsigned long)skel->bss->change_dequeue_cnt);
246 printf(" BPF queue full: %lu\n",
247 (unsigned long)skel->bss->bpf_queue_full);
248 printf("\nAll scenarios passed - no state machine violations detected\n");
249 printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n");
250 printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n");
251 printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n");
252 printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n");
253 printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n");
254 printf("-> Validated: No duplicate enqueues or invalid state transitions\n");
255
256 return SCX_TEST_PASS;
257 }
258
/*
 * Tear down the skeleton allocated by setup().
 */
static void cleanup(void *ctx)
{
	dequeue__destroy(ctx);
}
265
/* Test descriptor consumed by the scx selftest harness */
struct scx_test dequeue_test = {
	.name = "dequeue",
	.description = "Verify ops.dequeue() semantics",
	.setup = setup,
	.run = run,
	.cleanup = cleanup,
};

REGISTER_SCX_TEST(&dequeue_test)
275