mirror of
https://github.com/ii64/gouring.git
synced 2025-04-01 03:41:44 +02:00
test: ring concurrent enqeue
Add test for ring concurrent enqueue.
This commit is contained in:
parent
b97c7aa767
commit
0ebe45c00d
1 changed files with 45 additions and 4 deletions
|
@ -121,7 +121,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
for ctx.Err() == nil {
|
||||
err = h.io_uring_wait_cqe(&cqe)
|
||||
err = h.WaitCqe(&cqe)
|
||||
if err == syscall.EINTR {
|
||||
// ignore INTR
|
||||
continue
|
||||
|
@ -145,7 +145,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
|
|||
_ = buf
|
||||
// fmt.Printf("%+#v %s", buf, buf)
|
||||
|
||||
h.io_uring_cqe_seen(cqe) // necessary
|
||||
h.SeenCqe(cqe) // necessary
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
|
@ -172,7 +172,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
|
|||
for i := 0; i < tc.jobCount; i++ {
|
||||
var sqe *IoUringSqe
|
||||
for { // sqe could be nil if SQ is already full so we spin until we got one
|
||||
sqe = h.io_uring_get_sqe()
|
||||
sqe = h.GetSqe()
|
||||
if sqe != nil {
|
||||
break
|
||||
}
|
||||
|
@ -205,7 +205,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
|
|||
go consumer(h, ctx, &wg)
|
||||
|
||||
for i := 0; i < tc.jobCount; i++ {
|
||||
sqe := h.io_uring_get_sqe()
|
||||
sqe := h.GetSqe()
|
||||
if sqe == nil {
|
||||
// spin until we got one
|
||||
continue
|
||||
|
@ -226,3 +226,44 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRingQueueConcurrentEnqueue(t *testing.T) {
|
||||
const entries = 64
|
||||
h := testNewIoUring(t, entries, 0)
|
||||
defer h.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var tabChecker sync.Map
|
||||
wg.Add(entries)
|
||||
for i := 0; i < entries; i++ {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
sqe := h.GetSqe()
|
||||
PrepNop(sqe)
|
||||
sqe.UserData.SetUint64(uint64(i))
|
||||
if _, exist := tabChecker.LoadOrStore(sqe, struct{}{}); exist {
|
||||
panic("enqueue race detect")
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
// Join results before submit
|
||||
wg.Wait()
|
||||
|
||||
// Ring full, this one should be nil.
|
||||
require.Nil(t, h.GetSqe())
|
||||
|
||||
// Submit
|
||||
submitted, err := h.Submit()
|
||||
require.NoError(t, err)
|
||||
println(submitted)
|
||||
|
||||
var cqe *IoUringCqe
|
||||
for i := 0; i < entries; i++ {
|
||||
h.WaitCqe(&cqe)
|
||||
if _, exist := tabChecker.LoadOrStore(cqe, struct{}{}); exist {
|
||||
panic("cqe race detect")
|
||||
}
|
||||
h.SeenCqe(cqe)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue