1
0
Fork 0
mirror of https://github.com/ii64/gouring.git synced 2025-04-01 03:41:44 +02:00

fix(queue/perf): cpu usage

Signed-off-by: MastahSenpai <26342994+ii64@users.noreply.github.com>
This commit is contained in:
MastahSenpai 2021-12-27 13:05:48 +07:00
parent 64b72d47c8
commit d9f93fdd62
Signed by untrusted user who does not match committer: Xeffy
GPG key ID: E41C08AD390E7C49
2 changed files with 42 additions and 3 deletions

View file

@ -113,6 +113,12 @@ func (q *Queue) isNeedEnter(flags *uint32) bool {
}
func (q *Queue) Submit() (ret int, err error) {
// q.sMx.Lock()
// defer q.sMx.Unlock()
return q.SubmitAndWait(0)
}
func (q *Queue) SubmitAndWait(waitNr uint) (ret int, err error) {
// q.sMx.Lock()
// defer q.sMx.Unlock()
submitted := q.sqFlush()
@ -126,7 +132,7 @@ func (q *Queue) Submit() (ret int, err error) {
flags |= gouring.IORING_ENTER_GETEVENTS
}
ret, err = q.ring.Enter(uint(submitted), 0, flags, nil)
ret, err = q.ring.Enter(uint(submitted), waitNr, flags, nil)
return
}
@ -147,6 +153,10 @@ func (q *Queue) cqAdvance(d uint32) {
}
func (q *Queue) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) {
return q.GetCQEntryWait(wait, 0)
}
func (q *Queue) GetCQEntryWait(wait bool, waitNr uint) (cqe *gouring.CQEntry, err error) {
// q.cqMx.Lock()
// defer q.cqMx.Unlock()
if err = q.precheck(); err != nil {
@ -162,10 +172,15 @@ func (q *Queue) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) {
if !wait && !q.sq.IsCQOverflow() {
err = syscall.EAGAIN
return
} else if waitNr > 0 {
_, err = q.ring.Enter(0, waitNr, gouring.IORING_ENTER_GETEVENTS, nil)
if err != nil {
return
}
}
if q.sq.IsCQOverflow() {
_, err = q.ring.Enter(0, 0, gouring.IORING_ENTER_GETEVENTS, nil)
_, err = q.ring.Enter(0, waitNr, gouring.IORING_ENTER_GETEVENTS, nil)
if err != nil {
return
}
@ -176,7 +191,6 @@ func (q *Queue) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) {
runtime.Gosched()
continue
}
// implement interrupt
}
}
@ -185,6 +199,25 @@ func (q *Queue) Err() error {
return q.err
}
func (q *Queue) RunPoll(wait bool, waitNr uint, f QueueCQEHandler) (err error) {
for q.precheck() == nil {
cqe, err := q.GetCQEntryWait(wait, waitNr)
if cqe == nil || err != nil {
q.err = err
if err == ErrQueueClosed {
return err
}
continue
}
err = f(cqe)
if err != nil {
return err
}
}
return nil
}
func (q *Queue) Run(wait bool, f QueueCQEHandler) (err error) {
for q.precheck() == nil {
cqe, err := q.GetCQEntry(wait)

View file

@ -26,6 +26,12 @@ func (q *QueueLocks) Submit() (ret int, err error) {
return q.Queue.Submit()
}
func (q *QueueLocks) SubmitAndWait(waitNr uint) (ret int, err error) {
q.sMx.Lock()
defer q.sMx.Unlock()
return q.Queue.SubmitAndWait(waitNr)
}
//
func (q *QueueLocks) GetCQEntry(wait bool) (cqe *gouring.CQEntry, err error) {