mirror of
https://github.com/ii64/gouring.git
synced 2025-04-26 05:42:48 +02:00
feat(queue/perf): impl locks polling
Signed-off-by: MastahSenpai <26342994+ii64@users.noreply.github.com>
This commit is contained in:
parent
d9f93fdd62
commit
57604628c2
2 changed files with 68 additions and 0 deletions
|
@ -40,6 +40,30 @@ func (q *QueueLocks) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) {
|
|||
return q.Queue.GetCQEntry(wait)
|
||||
}
|
||||
|
||||
func (q *QueueLocks) GetCQEntryWait(wait bool, waitNr uint) (cqe *gouring.CQEntry, err error) {
|
||||
q.cqMx.Lock()
|
||||
defer q.cqMx.Unlock()
|
||||
return q.Queue.GetCQEntryWait(wait, waitNr)
|
||||
}
|
||||
|
||||
func (q *QueueLocks) RunPoll(wait bool, waitNr uint, f QueueCQEHandler) (err error) {
|
||||
for q.precheck() == nil {
|
||||
cqe, err := q.GetCQEntryWait(wait, waitNr)
|
||||
if cqe == nil || err != nil {
|
||||
if err == ErrQueueClosed {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
err = f(cqe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (q *QueueLocks) Run(wait bool, f QueueCQEHandler) (err error) {
|
||||
for q.precheck() == nil {
|
||||
cqe, err := q.GetCQEntry(wait)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ii64/gouring"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -42,6 +43,7 @@ func TestQueue(t *testing.T) {
|
|||
|
||||
// create new queue
|
||||
q := New(ring)
|
||||
defer q.Close()
|
||||
go func() {
|
||||
for i, b := range btests {
|
||||
sqe := q.GetSQEntry()
|
||||
|
@ -73,3 +75,45 @@ func TestQueue(t *testing.T) {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestQueuePolling(t *testing.T) {
|
||||
ring, err := gouring.New(64, &gouring.IOUringParams{})
|
||||
assert.NoError(t, err, "create ring")
|
||||
defer func() {
|
||||
err := ring.Close()
|
||||
assert.NoError(t, err, "close ring")
|
||||
}()
|
||||
|
||||
q := New(ring)
|
||||
defer q.Close()
|
||||
|
||||
var tb = []byte("write me on stdout\n")
|
||||
var tu uint64 = 0xfafa
|
||||
chDone := make(chan struct{}, 1)
|
||||
|
||||
go q.RunPoll(true, 1, func(cqe *gouring.CQEntry) (err error) {
|
||||
if cqe.Res < 0 {
|
||||
t.Error(syscall.Errno(cqe.Res * -1))
|
||||
}
|
||||
assert.Equal(t, uint64(tu), uint64(cqe.UserData), "mismatch userdata")
|
||||
assert.Equal(t, uint64(len(tb)), uint64(cqe.Res), "mismatch written size")
|
||||
chDone <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
|
||||
t.Log("wait 3 second...")
|
||||
sqe := q.GetSQEntry()
|
||||
sqe.UserData = tu
|
||||
write(sqe, syscall.Stdout, tb)
|
||||
|
||||
n, err := q.Submit()
|
||||
assert.NoError(t, err, "submit")
|
||||
assert.Equal(t, 1, n, "submitted count")
|
||||
|
||||
select {
|
||||
case <-chDone:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Error("timeout wait")
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue