mirror of https://github.com/ii64/gouring.git synced 2025-04-26 05:42:48 +02:00

queue: fix cqe peek batch, and improve test

Signed-off-by: Nugraha <richiisei@gmail.com>
Xeffy Chen 2022-07-07 23:20:18 +07:00
parent 501014a053
commit f3ba7e7189
Signed by: Xeffy
GPG key ID: E41C08AD390E7C49
2 changed files with 55 additions and 37 deletions


@@ -124,9 +124,11 @@ again:
 count = ready
 }
 last = head + count
-var i uintptr
-for i = 0; head != last; i++ {
+var i uintptr = 0
+for head != last {
 cqes[i] = ioUringCqeArray_Index(ring.Cq.Cqes, uintptr((head&mask)<<uint32(shift)))
+i++
+head++
 }
 return count
 }
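The bug in the hunk above is that the old loop's condition watched head while only i was ever advanced, so head != last never became false and the same CQE slot kept being written into cqes. The rewritten loop moves both indices together. A minimal, self-contained sketch of that pattern on a plain slice-backed ring (peekBatch, ring, and dst are illustrative names, not gouring's API):

package main

import "fmt"

// peekBatch copies up to len(dst) ready entries out of a power-of-two ring
// without consuming them, mirroring the corrected loop above: both the
// destination index and the ring head advance each iteration, otherwise
// head != last never becomes false.
func peekBatch(ring []int, head, tail, mask uint32, dst []int) uint32 {
    count := tail - head // number of ready entries
    if count > uint32(len(dst)) {
        count = uint32(len(dst))
    }
    last := head + count
    var i uintptr
    for head != last {
        dst[i] = ring[head&mask] // masked index wraps around the ring
        i++
        head++
    }
    return count
}

func main() {
    ring := []int{10, 11, 12, 13}      // capacity 4, mask 3
    dst := make([]int, 3)
    n := peekBatch(ring, 5, 8, 3, dst) // head=5, tail=8 -> slots 1, 2, 3
    fmt.Println(n, dst[:n])            // 3 [11 12 13]
}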
@@ -362,7 +364,7 @@ func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
 }
 if next-head <= *sq.RingEntries {
-sqe = ioUringSqeArray_Index(sq.Sqes, uintptr(sq.SqeTail&*sq.RingMask<<shift))
+sqe = ioUringSqeArray_Index(sq.Sqes, uintptr((sq.SqeTail&*sq.RingMask)<<shift))
 sq.SqeTail = next
 return
 }
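One nuance about the parenthesization change above (and the matching parentheses in the peek loop): in Go, &, <<, and >> all share the multiplicative precedence level and group left to right, so sq.SqeTail&*sq.RingMask<<shift already evaluated as (sq.SqeTail&*sq.RingMask)<<shift; the added parentheses spell the intent out rather than change behavior. That differs from C, where << binds tighter than &. A tiny standalone check with throwaway values:

package main

import "fmt"

func main() {
    var head, mask, shift uint32 = 6, 3, 1

    // Go: & and << share precedence and associate left to right,
    // so the unparenthesized form already groups as (head & mask) << shift.
    fmt.Println(head&mask<<shift == (head&mask)<<shift) // true
    // C-style grouping head & (mask << shift) would give a different value here.
    fmt.Println((head&mask)<<shift, head&(mask<<shift)) // 4 6
}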


@@ -1,6 +1,7 @@
 package gouring
 import (
+"context"
 "fmt"
 "os"
 "sync"
@@ -12,12 +13,6 @@ import (
 "github.com/stretchr/testify/require"
 )
-func testNewIoUring(t *testing.T, entries uint32, flags uint32) *IoUring {
-h, err := New(entries, flags)
-assert.NoError(t, err)
-assert.NotNil(t, h)
-return h
-}
 func TestRingQueueGetSQE(t *testing.T) {
 h := testNewIoUring(t, 256, 0)
 defer h.Close()
@@ -31,18 +26,31 @@ func TestRingQueueGetSQE(t *testing.T) {
 }
 func TestRingQueueSubmitSingleConsumer(t *testing.T) {
-ts := []int{
-8,
-32,
-64,
-128 + 2,
-256,
+type opt struct {
+name string
+jobCount int
+entries uint32
+p IoUringParams
+}
+ts := []opt{
+{"def-8-256", 8, 256, IoUringParams{}},
+{"def-16-256", 16, 256, IoUringParams{}},
+{"def-32-256", 32, 256, IoUringParams{}},
+{"def-64-256", 64, 256, IoUringParams{}},
+{"def-128-256", 128, 256, IoUringParams{}},
+{"def-128+2-256", 128 + 2, 256, IoUringParams{}}, // passwd 128
+{"def-256-256", 256, 256, IoUringParams{}},
+{"sqpoll-127-256", 127, 256, IoUringParams{Flags: IORING_SETUP_SQPOLL, SqThreadCpu: 4, SqThreadIdle: 10_000}},
+{"sqpoll-128+2-256", 128 + 2, 256, IoUringParams{Flags: IORING_SETUP_SQPOLL, SqThreadCpu: 4, SqThreadIdle: 10_000}},
+{"sqpoll-256-256", 256, 256, IoUringParams{Flags: IORING_SETUP_SQPOLL, SqThreadCpu: 4, SqThreadIdle: 10_000}},
+// we can have other test for queue overflow.
 }
-for i := range ts {
-jobCount := ts[i]
+for _, tc := range ts {
-t.Run(fmt.Sprintf("jobsz-%d", jobCount), func(t *testing.T) {
+t.Run(tc.name, func(t *testing.T) {
 ftmp, err := os.CreateTemp(os.TempDir(), "test_iouring_queue_sc_*")
 require.NoError(t, err)
 defer ftmp.Close()
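About the new sqpoll cases: IORING_SETUP_SQPOLL asks the kernel to start a submission-queue polling thread, and SqThreadIdle is how long that thread keeps busy-polling (in milliseconds) before it goes to sleep; on older kernels SQPOLL also needs elevated privileges. Per io_uring_setup(2), SqThreadCpu only pins the poll thread to a CPU when IORING_SETUP_SQ_AFF is also set, so with SQPOLL alone the field is ignored. A hypothetical variant of these parameters that actually pins the thread, assuming the package exposes IORING_SETUP_SQ_AFF the way liburing does:

// Hypothetical: IORING_SETUP_SQ_AFF is assumed to exist in the package,
// mirroring the kernel setup flag of the same name.
p := IoUringParams{
    Flags:        IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF, // SQ_AFF makes SqThreadCpu take effect
    SqThreadCpu:  4,      // pin the SQ poll thread to CPU 4
    SqThreadIdle: 10_000, // idle time in milliseconds before the poll thread sleeps
}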
@@ -55,13 +63,13 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 },
 }
-consumer := func(h *IoUring, wg *sync.WaitGroup) {
+consumer := func(h *IoUring, ctx context.Context, wg *sync.WaitGroup) {
 var cqe *IoUringCqe
 var err error
-for {
+for ctx.Err() == nil {
 err = h.io_uring_wait_cqe(&cqe)
 if err == syscall.EINTR {
-// ignore interrupt
+// ignore INTR
 continue
 }
 if err != nil {
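The consumer now receives a context so the goroutine stops once the subtest cancels it. Note that ctx.Err() is only checked between waits: a goroutine blocked in io_uring_wait_cqe still has to return (a completion or EINTR) before it notices the cancellation. The generic shape of that loop, as a self-contained sketch with the blocking wait abstracted behind a function value (the helper names are illustrative, not gouring's API):

package main

import (
    "context"
    "fmt"
    "syscall"
)

// consumeUntilCancelled keeps waiting for completions until the context is
// cancelled, retrying on EINTR, mirroring the test's consumer loop. wait
// stands in for a blocking io_uring_wait_cqe-style call.
func consumeUntilCancelled(ctx context.Context, wait func() error, handle func()) error {
    for ctx.Err() == nil {
        if err := wait(); err != nil {
            if err == syscall.EINTR {
                continue // interrupted wait; try again
            }
            return err
        }
        handle() // process one completion
    }
    return ctx.Err()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    n := 0
    err := consumeUntilCancelled(ctx,
        func() error {
            if n == 3 {
                cancel() // pretend the producer is done after three completions
            }
            return nil
        },
        func() { n++ },
    )
    fmt.Println(n, err) // 4 context canceled (cancellation is seen on the next loop check)
}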
@@ -85,26 +93,32 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 }
 }
-submit := func(t *testing.T, h *IoUring, expectedSubmitCount int) {
+submit := func(t *testing.T, opt *IoUringParams, h *IoUring, expectedSubmitCount int) {
 submitted, err := h.io_uringn_submit()
 assert.NoError(t, err)
-assert.Equal(t, expectedSubmitCount, submitted)
+if opt.Flags&IORING_SETUP_SQPOLL == 0 {
+assert.Equal(t, expectedSubmitCount, submitted)
+}
 }
 t.Run("submit_single", func(t *testing.T) {
 var wg sync.WaitGroup
-h := testNewIoUring(t, 256, 0)
+h := testNewIoUringWithParam(t, 256, &tc.p)
 defer h.Close()
-wg.Add(jobCount)
-go consumer(h, &wg)
+wg.Add(tc.jobCount)
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+go consumer(h, ctx, &wg)
-for i := 0; i < jobCount; i++ {
-sqe := h.io_uring_get_sqe()
-if sqe == nil {
-// spin until we got one
-continue
+for i := 0; i < tc.jobCount; i++ {
+var sqe *IoUringSqe
+for { // sqe could be nil if SQ is already full so we spin until we got one
+sqe = h.io_uring_get_sqe()
+if sqe != nil {
+break
+}
 }
 bufptr := bufPool.Get().(*[]byte)
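On the relaxed assertion in the submit helper above: with IORING_SETUP_SQPOLL the kernel poll thread drains the submission queue on its own, so the count reported back by a submit call need not equal the number of SQEs this particular caller just queued; only non-SQPOLL rings get the exact-count check. The gate reduces to a one-line predicate (illustrative helper, same logic as the diff):

// expectExactSubmitCount reports whether the test should assert an exact
// submitted count; under SQPOLL the SQ is consumed asynchronously.
func expectExactSubmitCount(p *IoUringParams) bool {
    return p.Flags&IORING_SETUP_SQPOLL == 0
}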
@@ -115,7 +129,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 sqe.UserData = uint64(uintptr(unsafe.Pointer(bufptr)))
 // submit
-submit(t, h, 1)
+submit(t, &tc.p, h, 1)
 }
 wg.Wait()
 })
@@ -123,13 +137,15 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 t.Run("submit_bulk", func(t *testing.T) {
 var wg sync.WaitGroup
-h := testNewIoUring(t, 256, 0)
+h := testNewIoUringWithParam(t, 256, &tc.p)
 defer h.Close()
-wg.Add(jobCount)
-go consumer(h, &wg)
+wg.Add(tc.jobCount)
+ctx, cancel := context.WithCancel(context.Background())
+defer cancel()
+go consumer(h, ctx, &wg)
-for i := 0; i < jobCount; i++ {
+for i := 0; i < tc.jobCount; i++ {
 sqe := h.io_uring_get_sqe()
 if sqe == nil {
 // spin until we got one
@@ -144,7 +160,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 sqe.UserData = uint64(uintptr(unsafe.Pointer(bufptr)))
 }
-submit(t, h, jobCount)
+submit(t, &tc.p, h, tc.jobCount)
 wg.Wait()
 })