From d9f93fdd626579408a1a60617d037d36fedf2594 Mon Sep 17 00:00:00 2001 From: MastahSenpai <26342994+ii64@users.noreply.github.com> Date: Mon, 27 Dec 2021 13:05:48 +0700 Subject: [PATCH] fix(queue/perf): cpu usage Signed-off-by: MastahSenpai <26342994+ii64@users.noreply.github.com> --- queue/queue.go | 39 ++++++++++++++++++++++++++++++++++++--- queue/queue_lock.go | 6 ++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 08eda32..19538df 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -113,6 +113,12 @@ func (q *Queue) isNeedEnter(flags *uint32) bool { } func (q *Queue) Submit() (ret int, err error) { + // q.sMx.Lock() + // defer q.sMx.Unlock() + return q.SubmitAndWait(0) +} + +func (q *Queue) SubmitAndWait(waitNr uint) (ret int, err error) { // q.sMx.Lock() // defer q.sMx.Unlock() submitted := q.sqFlush() @@ -126,7 +132,7 @@ func (q *Queue) Submit() (ret int, err error) { flags |= gouring.IORING_ENTER_GETEVENTS } - ret, err = q.ring.Enter(uint(submitted), 0, flags, nil) + ret, err = q.ring.Enter(uint(submitted), waitNr, flags, nil) return } @@ -147,6 +153,10 @@ func (q *Queue) cqAdvance(d uint32) { } func (q *Queue) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) { + return q.GetCQEntryWait(wait, 0) +} + +func (q *Queue) GetCQEntryWait(wait bool, waitNr uint) (cqe *gouring.CQEntry, err error) { // q.cqMx.Lock() // defer q.cqMx.Unlock() if err = q.precheck(); err != nil { @@ -162,10 +172,15 @@ func (q *Queue) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) { if !wait && !q.sq.IsCQOverflow() { err = syscall.EAGAIN return + } else if waitNr > 0 { + _, err = q.ring.Enter(0, waitNr, gouring.IORING_ENTER_GETEVENTS, nil) + if err != nil { + return + } } if q.sq.IsCQOverflow() { - _, err = q.ring.Enter(0, 0, gouring.IORING_ENTER_GETEVENTS, nil) + _, err = q.ring.Enter(0, waitNr, gouring.IORING_ENTER_GETEVENTS, nil) if err != nil { return } @@ -176,7 +191,6 @@ func (q *Queue) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) { runtime.Gosched() continue } - // implement interrupt } } @@ -185,6 +199,25 @@ func (q *Queue) Err() error { return q.err } +func (q *Queue) RunPoll(wait bool, waitNr uint, f QueueCQEHandler) (err error) { + for q.precheck() == nil { + cqe, err := q.GetCQEntryWait(wait, waitNr) + if cqe == nil || err != nil { + q.err = err + if err == ErrQueueClosed { + return err + } + continue + } + + err = f(cqe) + if err != nil { + return err + } + } + return nil +} + func (q *Queue) Run(wait bool, f QueueCQEHandler) (err error) { for q.precheck() == nil { cqe, err := q.GetCQEntry(wait) diff --git a/queue/queue_lock.go b/queue/queue_lock.go index 2551f7b..b01eefb 100644 --- a/queue/queue_lock.go +++ b/queue/queue_lock.go @@ -26,6 +26,12 @@ func (q *QueueLocks) Submit() (ret int, err error) { return q.Queue.Submit() } +func (q *QueueLocks) SubmitAndWait(waitNr uint) (ret int, err error) { + q.sMx.Lock() + defer q.sMx.Unlock() + return q.Queue.SubmitAndWait(waitNr) +} + // func (q *QueueLocks) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) {