diff --git a/queue/queue.go b/queue/queue.go index fd14a29..7c14876 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -20,8 +20,8 @@ type QueueCQEHandler func(cqe *gouring.CQEntry) (err error) type Queue struct { ring *gouring.Ring - sq *gouring.SQRing - cq *gouring.CQRing + sq gouring.SQRing + cq gouring.CQRing sqeHead uint32 sqeTail uint32 @@ -59,11 +59,12 @@ func (q *Queue) precheck() error { // +// SQEntry ptr returned is passed by value func (q *Queue) _getSQEntry() *gouring.SQEntry { - head := atomic.LoadUint32(q.sq.Head()) + head := atomic.LoadUint32(q.sq.Head) next := q.sqeTail + 1 - if (next - head) <= atomic.LoadUint32(q.sq.RingEntries()) { - sqe := q.sq.Get(q.sqeTail & (*q.sq.RingMask())) + if (next - head) <= atomic.LoadUint32(q.sq.RingEntries) { + sqe := &q.sq.Event[q.sqeTail&(*q.sq.RingMask)] q.sqeTail = next sqe.Reset() return sqe @@ -77,7 +78,8 @@ func (q *Queue) GetSQEntry() (sqe *gouring.SQEntry) { if sqe != nil { return } - // runtime.Gosched() + + runtime.Gosched() } } @@ -86,8 +88,8 @@ func (q *Queue) sqFallback(d uint32) { } func (q *Queue) sqFlush() uint32 { - khead := atomic.LoadUint32(q.sq.Head()) - ktail := atomic.LoadUint32(q.sq.Tail()) + khead := atomic.LoadUint32(q.sq.Head) + ktail := atomic.LoadUint32(q.sq.Tail) // if sq head equals sq tail if q.sqeHead == q.sqeTail { @@ -95,11 +97,11 @@ func (q *Queue) sqFlush() uint32 { } for toSubmit := q.sqeTail - q.sqeHead; toSubmit > 0; toSubmit-- { - *q.sq.Array().Get(ktail & (*q.sq.RingMask())) = q.sqeHead & (*q.sq.RingMask()) + *q.sq.Array.Get(ktail & (*q.sq.RingMask)) = q.sqeHead & (*q.sq.RingMask) ktail++ q.sqeHead++ } - atomic.StoreUint32(q.sq.Tail(), ktail) + atomic.StoreUint32(q.sq.Tail, ktail) return ktail - khead } @@ -142,16 +144,16 @@ func (q *Queue) SubmitAndWait(waitNr uint) (ret int, err error) { // func (q *Queue) cqPeek() (cqe *gouring.CQEntry) { - khead := atomic.LoadUint32(q.cq.Head()) - if khead != atomic.LoadUint32(q.cq.Tail()) { - cqe = q.cq.Get(khead & atomic.LoadUint32(q.cq.RingMask())) + khead := atomic.LoadUint32(q.cq.Head) + if khead != atomic.LoadUint32(q.cq.Tail) { + cqe = &q.cq.Event[khead&atomic.LoadUint32(q.cq.RingMask)] } return } func (q *Queue) cqAdvance(d uint32) { if d != 0 { - atomic.AddUint32(q.cq.Head(), d) // mark readed + atomic.AddUint32(q.cq.Head, d) // mark readed } } @@ -165,7 +167,7 @@ func (q *Queue) GetCQEntryWait(wait bool, waitNr uint) (cqe *gouring.CQEntry, er if err = q.precheck(); err != nil { return } - var tryPeeks int + // var tryPeeks int for { if cqe = q.cqPeek(); cqe != nil { q.cqAdvance(1) @@ -190,10 +192,10 @@ func (q *Queue) GetCQEntryWait(wait bool, waitNr uint) (cqe *gouring.CQEntry, er continue } - if tryPeeks++; tryPeeks < 3 { - runtime.Gosched() - continue - } + // if tryPeeks++; tryPeeks < 3 { + runtime.Gosched() + // continue + // } // implement interrupt } }