From 06809873b53ccda93577778c06122948659ba37b Mon Sep 17 00:00:00 2001 From: MastahSenpai <26342994+ii64@users.noreply.github.com> Date: Tue, 25 Jan 2022 02:24:01 +0700 Subject: [PATCH] fix: queue sqpoll mode returns zero submission Signed-off-by: MastahSenpai <26342994+ii64@users.noreply.github.com> --- queue/helper.go | 1 + queue/queue.go | 5 +++- queue/queue_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 queue/helper.go diff --git a/queue/helper.go b/queue/helper.go new file mode 100644 index 0000000..c969bad --- /dev/null +++ b/queue/helper.go @@ -0,0 +1 @@ +package queue diff --git a/queue/queue.go b/queue/queue.go index 9538764..fd14a29 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -88,6 +88,8 @@ func (q *Queue) sqFallback(d uint32) { func (q *Queue) sqFlush() uint32 { khead := atomic.LoadUint32(q.sq.Head()) ktail := atomic.LoadUint32(q.sq.Tail()) + + // if sq head equals sq tail if q.sqeHead == q.sqeTail { return ktail - khead } @@ -106,7 +108,7 @@ func (q *Queue) isNeedEnter(flags *uint32) bool { return true } if q.sq.IsNeedWakeup() { - *flags |= gouring.IORING_SQ_NEED_WAKEUP + *flags |= gouring.IORING_ENTER_SQ_WAKEUP return true } return false @@ -125,6 +127,7 @@ func (q *Queue) SubmitAndWait(waitNr uint) (ret int, err error) { var flags uint32 if !q.isNeedEnter(&flags) || submitted == 0 { + ret = int(submitted) return } diff --git a/queue/queue_test.go b/queue/queue_test.go index a835e94..529d107 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -26,6 +26,66 @@ func mkdata(i int) []byte { return []byte("queue pls" + strings.Repeat("!", i) + fmt.Sprintf("%d", i) + "\n") } +func TestQueueSQPoll(t *testing.T) { + ring, err := gouring.New(256, &gouring.IOUringParams{ + Flags: gouring.IORING_SETUP_SQPOLL, + SQThreadIdle: 70 * 1000, + }) + assert.NoError(t, err, "create ring") + defer func() { + err := ring.Close() + assert.NoError(t, err, "close ring") + }() + + N := 64 + 64 + var wg sync.WaitGroup + btests := [][]byte{} + for i := 0; i < N; i++ { + btests = append(btests, mkdata(i)) + } + wg.Add(N) + + // create new queue + q := New(ring) + defer func() { + err := q.Close() + assert.NoError(t, err, "close queue") + }() + + // + + go func() { + for i, b := range btests { + sqe := q.GetSQEntry() + sqe.UserData = uint64(i) + // sqe.Flags = gouring.IOSQE_IO_DRAIN + write(sqe, syscall.Stdout, b) + if (i+1)%2 == 0 { + n, err := q.Submit() + assert.NoError(t, err, "queue submit") + // assert.Equal(t, 2, n, "submit count mismatch") // may varies due to kernel thread consumng submission queue process time + fmt.Printf("submitted %d\n", n) + } + } + }() + go func() { + q.Run(true, func(cqe *gouring.CQEntry) (err error) { + defer wg.Done() + fmt.Printf("cqe: %+#v\n", cqe) + assert.Condition(t, func() (success bool) { + return cqe.UserData < uint64(len(btests)) + }, "userdata is set with the btest index") + assert.Conditionf(t, func() (success bool) { + return len(btests[cqe.UserData]) == int(cqe.Res) + }, "OP_WRITE result mismatch: %+#v", cqe) + + return nil + }) + }() + + wg.Wait() +} + func TestQueue(t *testing.T) { ring, err := gouring.New(256, nil) assert.NoError(t, err, "create ring") @@ -60,7 +120,7 @@ func TestQueue(t *testing.T) { if (i+1)%2 == 0 { n, err := q.Submit() assert.NoError(t, err, "queue submit") - assert.Equal(t, n, 2, "submit count mismatch") + assert.Equal(t, 2, n, "submit count mismatch") fmt.Printf("submitted %d\n", n) } }