mirror of
https://github.com/ii64/gouring.git
synced 2025-01-11 12:31:18 +01:00
fix: queue sqpoll mode returns zero submission
Signed-off-by: MastahSenpai <26342994+ii64@users.noreply.github.com>
This commit is contained in:
parent
c87f2013c0
commit
06809873b5
3 changed files with 66 additions and 2 deletions
1
queue/helper.go
Normal file
1
queue/helper.go
Normal file
|
@ -0,0 +1 @@
|
||||||
|
package queue
|
|
@ -88,6 +88,8 @@ func (q *Queue) sqFallback(d uint32) {
|
||||||
func (q *Queue) sqFlush() uint32 {
|
func (q *Queue) sqFlush() uint32 {
|
||||||
khead := atomic.LoadUint32(q.sq.Head())
|
khead := atomic.LoadUint32(q.sq.Head())
|
||||||
ktail := atomic.LoadUint32(q.sq.Tail())
|
ktail := atomic.LoadUint32(q.sq.Tail())
|
||||||
|
|
||||||
|
// if sq head equals sq tail
|
||||||
if q.sqeHead == q.sqeTail {
|
if q.sqeHead == q.sqeTail {
|
||||||
return ktail - khead
|
return ktail - khead
|
||||||
}
|
}
|
||||||
|
@ -106,7 +108,7 @@ func (q *Queue) isNeedEnter(flags *uint32) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if q.sq.IsNeedWakeup() {
|
if q.sq.IsNeedWakeup() {
|
||||||
*flags |= gouring.IORING_SQ_NEED_WAKEUP
|
*flags |= gouring.IORING_ENTER_SQ_WAKEUP
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
@ -125,6 +127,7 @@ func (q *Queue) SubmitAndWait(waitNr uint) (ret int, err error) {
|
||||||
|
|
||||||
var flags uint32
|
var flags uint32
|
||||||
if !q.isNeedEnter(&flags) || submitted == 0 {
|
if !q.isNeedEnter(&flags) || submitted == 0 {
|
||||||
|
ret = int(submitted)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,66 @@ func mkdata(i int) []byte {
|
||||||
return []byte("queue pls" + strings.Repeat("!", i) + fmt.Sprintf("%d", i) + "\n")
|
return []byte("queue pls" + strings.Repeat("!", i) + fmt.Sprintf("%d", i) + "\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestQueueSQPoll(t *testing.T) {
|
||||||
|
ring, err := gouring.New(256, &gouring.IOUringParams{
|
||||||
|
Flags: gouring.IORING_SETUP_SQPOLL,
|
||||||
|
SQThreadIdle: 70 * 1000,
|
||||||
|
})
|
||||||
|
assert.NoError(t, err, "create ring")
|
||||||
|
defer func() {
|
||||||
|
err := ring.Close()
|
||||||
|
assert.NoError(t, err, "close ring")
|
||||||
|
}()
|
||||||
|
|
||||||
|
N := 64 + 64
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
btests := [][]byte{}
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
btests = append(btests, mkdata(i))
|
||||||
|
}
|
||||||
|
wg.Add(N)
|
||||||
|
|
||||||
|
// create new queue
|
||||||
|
q := New(ring)
|
||||||
|
defer func() {
|
||||||
|
err := q.Close()
|
||||||
|
assert.NoError(t, err, "close queue")
|
||||||
|
}()
|
||||||
|
|
||||||
|
//
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for i, b := range btests {
|
||||||
|
sqe := q.GetSQEntry()
|
||||||
|
sqe.UserData = uint64(i)
|
||||||
|
// sqe.Flags = gouring.IOSQE_IO_DRAIN
|
||||||
|
write(sqe, syscall.Stdout, b)
|
||||||
|
if (i+1)%2 == 0 {
|
||||||
|
n, err := q.Submit()
|
||||||
|
assert.NoError(t, err, "queue submit")
|
||||||
|
// assert.Equal(t, 2, n, "submit count mismatch") // may varies due to kernel thread consumng submission queue process time
|
||||||
|
fmt.Printf("submitted %d\n", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
q.Run(true, func(cqe *gouring.CQEntry) (err error) {
|
||||||
|
defer wg.Done()
|
||||||
|
fmt.Printf("cqe: %+#v\n", cqe)
|
||||||
|
assert.Condition(t, func() (success bool) {
|
||||||
|
return cqe.UserData < uint64(len(btests))
|
||||||
|
}, "userdata is set with the btest index")
|
||||||
|
assert.Conditionf(t, func() (success bool) {
|
||||||
|
return len(btests[cqe.UserData]) == int(cqe.Res)
|
||||||
|
}, "OP_WRITE result mismatch: %+#v", cqe)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func TestQueue(t *testing.T) {
|
func TestQueue(t *testing.T) {
|
||||||
ring, err := gouring.New(256, nil)
|
ring, err := gouring.New(256, nil)
|
||||||
assert.NoError(t, err, "create ring")
|
assert.NoError(t, err, "create ring")
|
||||||
|
@ -60,7 +120,7 @@ func TestQueue(t *testing.T) {
|
||||||
if (i+1)%2 == 0 {
|
if (i+1)%2 == 0 {
|
||||||
n, err := q.Submit()
|
n, err := q.Submit()
|
||||||
assert.NoError(t, err, "queue submit")
|
assert.NoError(t, err, "queue submit")
|
||||||
assert.Equal(t, n, 2, "submit count mismatch")
|
assert.Equal(t, 2, n, "submit count mismatch")
|
||||||
fmt.Printf("submitted %d\n", n)
|
fmt.Printf("submitted %d\n", n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue