// SPDX-License-Identifier: GPL-2.0 /* * Copyright (c) 2025 NVIDIA Corporation. */ #define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include "scx_test.h" #include "dequeue.bpf.skel.h" #define NUM_WORKERS 8 #define AFFINITY_HAMMER_MS 500 /* * Worker function that creates enqueue/dequeue events via CPU work and * sleep. */ static void worker_fn(int id) { int i; volatile int sum = 0; for (i = 0; i < 1000; i++) { volatile int j; /* Do some work to trigger scheduling events */ for (j = 0; j < 10000; j++) sum += j; /* Sleep to trigger dequeue */ usleep(1000 + (id * 100)); } exit(0); } /* * This thread changes workers' affinity from outside so that some changes * hit tasks while they are still in the scheduler's queue and trigger * property-change dequeues. */ static void *affinity_hammer_fn(void *arg) { pid_t *pids = arg; cpu_set_t cpuset; int i = 0, n = NUM_WORKERS; struct timespec start, now; clock_gettime(CLOCK_MONOTONIC, &start); while (1) { int w = i % n; int cpu = (i / n) % 4; CPU_ZERO(&cpuset); CPU_SET(cpu, &cpuset); sched_setaffinity(pids[w], sizeof(cpuset), &cpuset); i++; /* Check elapsed time every 256 iterations to limit gettime cost */ if ((i & 255) == 0) { long long elapsed_ms; clock_gettime(CLOCK_MONOTONIC, &now); elapsed_ms = (now.tv_sec - start.tv_sec) * 1000LL + (now.tv_nsec - start.tv_nsec) / 1000000; if (elapsed_ms >= AFFINITY_HAMMER_MS) break; } } return NULL; } static enum scx_test_status run_scenario(struct dequeue *skel, u32 scenario, const char *scenario_name) { struct bpf_link *link; pid_t pids[NUM_WORKERS]; pthread_t hammer; int i, status; u64 enq_start, deq_start, dispatch_deq_start, change_deq_start, bpf_queue_full_start; u64 enq_delta, deq_delta, dispatch_deq_delta, change_deq_delta, bpf_queue_full_delta; /* Set the test scenario */ skel->bss->test_scenario = scenario; /* Record starting counts */ enq_start = skel->bss->enqueue_cnt; deq_start = skel->bss->dequeue_cnt; dispatch_deq_start = skel->bss->dispatch_dequeue_cnt; change_deq_start = skel->bss->change_dequeue_cnt; bpf_queue_full_start = skel->bss->bpf_queue_full; link = bpf_map__attach_struct_ops(skel->maps.dequeue_ops); SCX_FAIL_IF(!link, "Failed to attach struct_ops for scenario %s", scenario_name); /* Fork worker processes to generate enqueue/dequeue events */ for (i = 0; i < NUM_WORKERS; i++) { pids[i] = fork(); SCX_FAIL_IF(pids[i] < 0, "Failed to fork worker %d", i); if (pids[i] == 0) { worker_fn(i); /* Should not reach here */ exit(1); } } /* * Run an "affinity hammer" so that some property changes hit tasks * while they are still in BPF custody (e.g., in user DSQ or BPF * queue), triggering SCX_DEQ_SCHED_CHANGE dequeues. */ SCX_FAIL_IF(pthread_create(&hammer, NULL, affinity_hammer_fn, pids) != 0, "Failed to create affinity hammer thread"); pthread_join(hammer, NULL); /* Wait for all workers to complete */ for (i = 0; i < NUM_WORKERS; i++) { SCX_FAIL_IF(waitpid(pids[i], &status, 0) != pids[i], "Failed to wait for worker %d", i); SCX_FAIL_IF(status != 0, "Worker %d exited with status %d", i, status); } bpf_link__destroy(link); SCX_EQ(skel->data->uei.kind, EXIT_KIND(SCX_EXIT_UNREG)); /* Calculate deltas */ enq_delta = skel->bss->enqueue_cnt - enq_start; deq_delta = skel->bss->dequeue_cnt - deq_start; dispatch_deq_delta = skel->bss->dispatch_dequeue_cnt - dispatch_deq_start; change_deq_delta = skel->bss->change_dequeue_cnt - change_deq_start; bpf_queue_full_delta = skel->bss->bpf_queue_full - bpf_queue_full_start; printf("%s:\n", scenario_name); printf(" enqueues: %lu\n", (unsigned long)enq_delta); printf(" dequeues: %lu (dispatch: %lu, property_change: %lu)\n", (unsigned long)deq_delta, (unsigned long)dispatch_deq_delta, (unsigned long)change_deq_delta); printf(" BPF queue full: %lu\n", (unsigned long)bpf_queue_full_delta); /* * Validate enqueue/dequeue lifecycle tracking. * * For scenarios 0, 1, 3, 4 (local and global DSQs from * ops.select_cpu() and ops.enqueue()), both enqueues and dequeues * should be 0 because tasks bypass the BPF scheduler entirely: * tasks never enter BPF scheduler's custody. * * For scenarios 2, 5, 6 (user DSQ or BPF internal queue) we expect * both enqueues and dequeues. * * The BPF code does strict state machine validation with * scx_bpf_error() to ensure the workflow semantics are correct. * * If we reach this point without errors, the semantics are * validated correctly. */ if (scenario == 0 || scenario == 1 || scenario == 3 || scenario == 4) { /* Tasks bypass BPF scheduler completely */ SCX_EQ(enq_delta, 0); SCX_EQ(deq_delta, 0); SCX_EQ(dispatch_deq_delta, 0); SCX_EQ(change_deq_delta, 0); } else { /* * User DSQ from ops.enqueue() or ops.select_cpu(): tasks * enter BPF scheduler's custody. * * Also validate 1:1 enqueue/dequeue pairing. */ SCX_GT(enq_delta, 0); SCX_GT(deq_delta, 0); SCX_EQ(enq_delta, deq_delta); } return SCX_TEST_PASS; } static enum scx_test_status setup(void **ctx) { struct dequeue *skel; skel = dequeue__open(); SCX_FAIL_IF(!skel, "Failed to open skel"); SCX_ENUM_INIT(skel); SCX_FAIL_IF(dequeue__load(skel), "Failed to load skel"); *ctx = skel; return SCX_TEST_PASS; } static enum scx_test_status run(void *ctx) { struct dequeue *skel = ctx; enum scx_test_status status; status = run_scenario(skel, 0, "Scenario 0: Local DSQ from ops.select_cpu()"); if (status != SCX_TEST_PASS) return status; status = run_scenario(skel, 1, "Scenario 1: Global DSQ from ops.select_cpu()"); if (status != SCX_TEST_PASS) return status; status = run_scenario(skel, 2, "Scenario 2: User DSQ from ops.select_cpu()"); if (status != SCX_TEST_PASS) return status; status = run_scenario(skel, 3, "Scenario 3: Local DSQ from ops.enqueue()"); if (status != SCX_TEST_PASS) return status; status = run_scenario(skel, 4, "Scenario 4: Global DSQ from ops.enqueue()"); if (status != SCX_TEST_PASS) return status; status = run_scenario(skel, 5, "Scenario 5: User DSQ from ops.enqueue()"); if (status != SCX_TEST_PASS) return status; status = run_scenario(skel, 6, "Scenario 6: BPF queue from ops.enqueue()"); if (status != SCX_TEST_PASS) return status; printf("\n=== Summary ===\n"); printf("Total enqueues: %lu\n", (unsigned long)skel->bss->enqueue_cnt); printf("Total dequeues: %lu\n", (unsigned long)skel->bss->dequeue_cnt); printf(" Dispatch dequeues: %lu (no flag, normal workflow)\n", (unsigned long)skel->bss->dispatch_dequeue_cnt); printf(" Property change dequeues: %lu (SCX_DEQ_SCHED_CHANGE flag)\n", (unsigned long)skel->bss->change_dequeue_cnt); printf(" BPF queue full: %lu\n", (unsigned long)skel->bss->bpf_queue_full); printf("\nAll scenarios passed - no state machine violations detected\n"); printf("-> Validated: Local DSQ dispatch bypasses BPF scheduler\n"); printf("-> Validated: Global DSQ dispatch bypasses BPF scheduler\n"); printf("-> Validated: User DSQ dispatch triggers ops.dequeue() callbacks\n"); printf("-> Validated: Dispatch dequeues have no flags (normal workflow)\n"); printf("-> Validated: Property change dequeues have SCX_DEQ_SCHED_CHANGE flag\n"); printf("-> Validated: No duplicate enqueues or invalid state transitions\n"); return SCX_TEST_PASS; } static void cleanup(void *ctx) { struct dequeue *skel = ctx; dequeue__destroy(skel); } struct scx_test dequeue_test = { .name = "dequeue", .description = "Verify ops.dequeue() semantics", .setup = setup, .run = run, .cleanup = cleanup, }; REGISTER_SCX_TEST(&dequeue_test)