diff --git a/queue_test.go b/queue_test.go index dbe5053..981111b 100644 --- a/queue_test.go +++ b/queue_test.go @@ -121,7 +121,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) { } }() for ctx.Err() == nil { - err = h.io_uring_wait_cqe(&cqe) + err = h.WaitCqe(&cqe) if err == syscall.EINTR { // ignore INTR continue @@ -145,7 +145,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) { _ = buf // fmt.Printf("%+#v %s", buf, buf) - h.io_uring_cqe_seen(cqe) // necessary + h.SeenCqe(cqe) // necessary wg.Done() } } @@ -172,7 +172,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) { for i := 0; i < tc.jobCount; i++ { var sqe *IoUringSqe for { // sqe could be nil if SQ is already full so we spin until we got one - sqe = h.io_uring_get_sqe() + sqe = h.GetSqe() if sqe != nil { break } @@ -205,7 +205,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) { go consumer(h, ctx, &wg) for i := 0; i < tc.jobCount; i++ { - sqe := h.io_uring_get_sqe() + sqe := h.GetSqe() if sqe == nil { // spin until we got one continue @@ -226,3 +226,44 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) { }) } } + +func TestRingQueueConcurrentEnqueue(t *testing.T) { + const entries = 64 + h := testNewIoUring(t, entries, 0) + defer h.Close() + + var wg sync.WaitGroup + var tabChecker sync.Map + wg.Add(entries) + for i := 0; i < entries; i++ { + go func(i int) { + defer wg.Done() + sqe := h.GetSqe() + PrepNop(sqe) + sqe.UserData.SetUint64(uint64(i)) + if _, exist := tabChecker.LoadOrStore(sqe, struct{}{}); exist { + panic("enqueue race detect") + } + }(i) + } + // Join results before submit + wg.Wait() + + // Ring full, this one should be nil. + require.Nil(t, h.GetSqe()) + + // Submit + submitted, err := h.Submit() + require.NoError(t, err) + println(submitted) + + var cqe *IoUringCqe + for i := 0; i < entries; i++ { + h.WaitCqe(&cqe) + if _, exist := tabChecker.LoadOrStore(cqe, struct{}{}); exist { + panic("cqe race detect") + } + h.SeenCqe(cqe) + } + +}