Mirror of https://github.com/ii64/gouring.git, synced 2025-03-23 23:29:29 +01:00
all: use of unsafe pointer for ring, and adjustments
There are other adjustments and additions:
* Function names spelled CQE/SQE are renamed to Cqe/Sqe.
* The SQ entry union setters assign their value directly instead of going through the util helpers, reducing function complexity and leaving inline budget for other work.
* UserData now has its own type, with accessors for uint64, uintptr, and unsafe.Pointer.
* TODO: decide whether to use `Syscall` or `RawSyscall`.
* Exposed `SeenCqe`.
* Ignore the *.test file extension.
* Added an inline-cost tool.

Signed-off-by: Nugraha <richiisei@gmail.com>
parent 56090b50c8
commit 7eedf9754b
18 changed files with 465 additions and 192 deletions
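For orientation before the per-file diffs, a minimal sketch of how the renamed surface reads after this commit (API names as defined in uring.go below; error handling elided; illustrative only, not part of the commit):

package main

import "github.com/ii64/gouring"

func main() {
	ring, _ := gouring.New(8, 0) // 8-entry ring, no setup flags
	defer ring.Close()

	sqe := ring.GetSqe() // renamed from GetSQE
	gouring.PrepNop(sqe)
	sqe.UserData.SetUint64(42) // UserData is now a dedicated type

	ring.Submit()

	var cqe *gouring.IoUringCqe
	ring.WaitCqe(&cqe)           // renamed from WaitCQE
	_ = cqe.UserData.GetUint64() // 42
	ring.SeenCqe(cqe)            // newly exposed
}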
.gitignore (vendored): 3 changes

@@ -1,3 +1,4 @@
 .vscode/
 *.cpu
-*.mem
+*.mem
+*.test
Makefile (new file): 5 changes

@@ -0,0 +1,5 @@
+test:
+	go test -c -gcflags=all=-d=checkptr .
+	./gouring.test
+clean:
+	rm -rf *.test
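The -d=checkptr debug flag enables the runtime's unsafe.Pointer validity checks (alignment and allocation bounds) in the compiled test binary, presumably as the safety net for the unsafe-pointer rework this commit performs; clean removes the *.test binary that the new .gitignore entry also covers.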
hdr.go: 73 changes

@@ -7,49 +7,52 @@
 */
 package gouring
 
+import "unsafe"
+
 /*
  * IO submission data structure (Submission Queue Entry)
  */
-type IoUringSqe_Union1 union64
+type IoUringSqe_Union1 uint64
 
-func (u *IoUringSqe_Union1) SetOffset(v uint64) { (*union64)(u).PutUint64(v) }
-func (u *IoUringSqe_Union1) SetAddr2(v uint64)  { (*union64)(u).PutUint64(v) }
+func (u *IoUringSqe_Union1) SetOffset(v uint64) { *u = IoUringSqe_Union1(v) }
+func (u *IoUringSqe_Union1) SetAddr2(v uint64)  { *u = IoUringSqe_Union1(v) }
 
-type IoUringSqe_Union2 union64
+type IoUringSqe_Union2 uint64
 
-func (u *IoUringSqe_Union2) SetAddr(v uint64)           { (*union64)(u).PutUint64(v) }
-func (u *IoUringSqe_Union2) SetSpliceOffsetIn(v uint64) { (*union64)(u).PutUint64(v) }
+func (u *IoUringSqe_Union2) SetAddr_Value(v uint64)     { *u = IoUringSqe_Union2(v) }
+func (u *IoUringSqe_Union2) SetAddr(v unsafe.Pointer)   { *u = IoUringSqe_Union2((uintptr)(v)) }
+func (u *IoUringSqe_Union2) SetSpliceOffsetIn(v uint64) { *u = IoUringSqe_Union2(v) }
 
-type IoUringSqe_Union3 union32
+type IoUringSqe_Union3 uint32
 
-func (u *IoUringSqe_Union3) SetRwFlags(v uint32)        { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetPollEvents(v uint16)     { (*union32)(u).PutUint16(v) }
-func (u *IoUringSqe_Union3) SetPoll32Events(v uint32)   { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetSyncRangeFlags(v uint32) { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetMsgFlags(v uint32)       { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetTimeoutFlags(v uint32)   { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetAcceptFlags(v uint32)    { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetCancelFlags(v uint32)    { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetOpenFlags(v uint32)      { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetStatxFlags(v uint32)     { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetFadviseAdvice(v uint32)  { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetSpliceFlags(v uint32)    { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetRenameFlags(v uint32)    { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetUnlinkFlags(v uint32)    { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetHardlinkFlags(v uint32)  { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetXattrFlags(v uint32)     { (*union32)(u).PutUint32(v) }
-func (u *IoUringSqe_Union3) SetOpFlags(v uint32)        { (*union32)(u).PutUint32(v) }    //generic
-func (u *IoUringSqe_Union3) GetOpFlags() uint32         { return (*union32)(u).Uint32() } //generic
+func (u *IoUringSqe_Union3) SetRwFlags(v uint32)        { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetPollEvents(v uint16)     { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetPoll32Events(v uint32)   { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetSyncRangeFlags(v uint32) { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetMsgFlags(v uint32)       { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetTimeoutFlags(v uint32)   { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetAcceptFlags(v uint32)    { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetCancelFlags(v uint32)    { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetOpenFlags(v uint32)      { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetStatxFlags(v uint32)     { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetFadviseAdvice(v uint32)  { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetSpliceFlags(v uint32)    { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetRenameFlags(v uint32)    { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetUnlinkFlags(v uint32)    { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetHardlinkFlags(v uint32)  { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetXattrFlags(v uint32)     { *u = IoUringSqe_Union3(v) }
+func (u *IoUringSqe_Union3) SetOpFlags(v uint32)        { *u = IoUringSqe_Union3(v) } //generic
+func (u IoUringSqe_Union3) GetOpFlags() uint32          { return uint32(u) } //generic
 
-type IoUringSqe_Union4 union16
+type IoUringSqe_Union4 uint16
 
-func (u *IoUringSqe_Union4) SetBufIndex(v uint16) { (*union16)(u).PutUint16(v) }
-func (u *IoUringSqe_Union4) SetBufGroup(v uint16) { (*union16)(u).PutUint16(v) }
+func (u *IoUringSqe_Union4) SetBufIndex(v uint16) { *u = IoUringSqe_Union4(v) }
+func (u *IoUringSqe_Union4) SetBufGroup(v uint16) { *u = IoUringSqe_Union4(v) }
 
-type IoUringSqe_Union5 union32
+type IoUringSqe_Union5 uint32
 
-func (u *IoUringSqe_Union5) SetSpliceFdIn(v int32) { (*union32)(u).PutInt32(v) }
-func (u *IoUringSqe_Union5) SetFileIndex(v uint32) { (*union32)(u).PutUint32(v) }
+func (u *IoUringSqe_Union5) SetSpliceFdIn(v int32) { *u = IoUringSqe_Union5(v) }
+func (u *IoUringSqe_Union5) SetFileIndex(v uint32) { *u = IoUringSqe_Union5(v) }
 
 type IoUringSqe struct {
 	Opcode IoUringOp /* type of operation for this sqe */

@@ -92,7 +95,7 @@ type IoUringSqe struct {
 	// };
 	IoUringSqe_Union3
 
-	UserData uint64 /* data to be passed back at completion time */
+	UserData UserData /* data to be passed back at completion time */
 
 	/* pack this to avoid bogus arm OABI complaints */
 	// union {

@@ -190,6 +193,8 @@ const IORING_SETUP_CQE32 = (1 << 11) /* CQEs are 32 byte */
 
 type IoUringOp = uint8
 
+//go:generate stringerx -type=IoUringOp
+
 const (
 	IORING_OP_NOP IoUringOp = iota
 	IORING_OP_READV

@@ -318,8 +323,8 @@ const IORING_ACCEPT_MULTISHOT = (1 << 0)
 * IO completion data structure (Completion Queue Entry)
 */
 type IoUringCqe struct {
-	UserData uint64 /* sqe->data submission passed back */
-	Res      int32  /* result code for this event */
+	UserData UserData /* sqe->data submission passed back */
+	Res      int32    /* result code for this event */
 	Flags    uint32
 
 	/*
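With the unions reduced to plain integer types, each setter is a single store and inlines trivially. A short sketch of direct use, assuming package-internal access (exampleSetters and buf are illustrative only):

func exampleSetters(buf []byte) {
	var sqe IoUringSqe
	sqe.SetOffset(0)                     // IoUringSqe_Union1
	sqe.SetAddr(unsafe.Pointer(&buf[0])) // IoUringSqe_Union2, pointer form
	sqe.SetAddr_Value(0)                 // IoUringSqe_Union2, raw integer form
	sqe.SetOpFlags(0)                    // IoUringSqe_Union3, generic accessor
	sqe.SetBufIndex(0)                   // IoUringSqe_Union4
	sqe.SetFileIndex(0)                  // IoUringSqe_Union5
	_ = sqe
}

Normally these are only called through the Prep* helpers in prep.go below.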
@@ -13,10 +13,10 @@ type IoUring struct {
 	Sq    IoUringSq
 	Cq    IoUringCq
 	Flags uint32
-	RingFd int32
+	RingFd int
 
 	Features    uint32
-	EnterRingFd int32
+	EnterRingFd int
 	IntFlags    uint8
 
 	pad [3]uint8

@@ -24,12 +24,12 @@ type IoUring struct {
 }
 
 type IoUringSq struct {
-	Head        *uint32
-	Tail        *uint32
-	RingMask    *uint32
-	RingEntries *uint32
-	Flags       *uint32
-	Dropped     *uint32
+	head        unsafe.Pointer // *uint32
+	tail        unsafe.Pointer // *uint32
+	ringMask    unsafe.Pointer // *uint32
+	ringEntries unsafe.Pointer // *uint32
+	flags       unsafe.Pointer // *uint32
+	dropped     unsafe.Pointer // *uint32
 
 	Array uint32Array     //ptr arith
 	Sqes  ioUringSqeArray //ptr arith

@@ -38,23 +38,37 @@ type IoUringSq struct {
 	SqeTail uint32
 
 	RingSz  uint32
-	RingPtr uintptr
+	RingPtr unsafe.Pointer
 
 	pad [4]uint32
 }
 
+func (sq *IoUringSq) _Head() *uint32        { return (*uint32)(sq.head) }
+func (sq *IoUringSq) _Tail() *uint32        { return (*uint32)(sq.tail) }
+func (sq *IoUringSq) _RingMask() *uint32    { return (*uint32)(sq.ringMask) }
+func (sq *IoUringSq) _RingEntries() *uint32 { return (*uint32)(sq.ringEntries) }
+func (sq *IoUringSq) _Flags() *uint32       { return (*uint32)(sq.flags) }
+func (sq *IoUringSq) _Dropped() *uint32     { return (*uint32)(sq.dropped) }
+
 type IoUringCq struct {
-	Head        *uint32
-	Tail        *uint32
-	RingMask    *uint32
-	RingEntries *uint32
-	Flags       *uint32
-	Overflow    *uint32
+	head        unsafe.Pointer // *uint32
+	tail        unsafe.Pointer // *uint32
+	ringMask    unsafe.Pointer // *uint32
+	ringEntries unsafe.Pointer // *uint32
+	flags       unsafe.Pointer // *uint32
+	overflow    unsafe.Pointer // *uint32
 
 	Cqes ioUringCqeArray //ptr arith
 
 	RingSz  uint32
-	RingPtr uintptr
+	RingPtr unsafe.Pointer
 
 	pad [4]uint32
 }
+
+func (cq *IoUringCq) _Head() *uint32        { return (*uint32)(cq.head) }
+func (cq *IoUringCq) _Tail() *uint32        { return (*uint32)(cq.tail) }
+func (cq *IoUringCq) _RingMask() *uint32    { return (*uint32)(cq.ringMask) }
+func (cq *IoUringCq) _RingEntries() *uint32 { return (*uint32)(cq.ringEntries) }
+func (cq *IoUringCq) _Flags() *uint32       { return (*uint32)(cq.flags) }
+func (cq *IoUringCq) _Overflow() *uint32    { return (*uint32)(cq.overflow) }
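The SQ/CQ index fields are now unexported unsafe.Pointers into the mmap'd, kernel-shared region, and the _Head/_Tail-style accessors cast back to *uint32 on demand. A minimal package-internal sketch of how these accessors combine (sqSpace is hypothetical, not part of the commit):

func sqSpace(ring *IoUring) uint32 {
	head := atomic.LoadUint32(ring.Sq._Head()) // kernel-updated consumer index
	return *ring.Sq._RingEntries() - (ring.Sq.SqeTail - head)
}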
inline_cost.py (new file): 63 changes

@@ -0,0 +1,63 @@
+#!/bin/python3
+import re
+import subprocess
+from io import BytesIO
+from typing import Tuple, List
+
+from sys import (
+    stdout as sys_stdout,
+    stderr as sys_stderr
+)
+
+# https://dave.cheney.net/2020/05/02/mid-stack-inlining-in-go
+
+gcflags = [
+    "-m=2",
+]
+
+re_cost = re.compile(rb"with cost (\d+) as:")
+re_cost2 = re.compile(rb"cost (\d+) exceeds")
+
+def main():
+    h = subprocess.Popen(
+        args=["go","build","-gcflags="+" ".join(gcflags),"."],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE)
+    ret: Tuple[str, str] = h.communicate()
+    stdout, stderr = ret
+
+    lines: List[bytes] = stderr.split(b"\n")
+    inlined_lines: List[bytes] = [line for line in lines if b"inline" in line]
+
+    can_inline: List[Tuple[bytes, int]] = []
+    cannot_inline: List[Tuple[bytes, int]] = []
+    for line in inlined_lines:
+        if b"can inline" in line:
+            inline_cost = int(re_cost.findall(line)[0])
+            can_inline += [ (line, inline_cost) ]
+        elif b"cannot inline" in line:
+            cur_cost = 0
+            try:
+                cur_cost = int(re_cost2.findall(line)[0])
+            except: pass
+            cannot_inline += [ (line, cur_cost) ]
+        else:
+            sys_stderr.write(b"[UNK] ")
+            sys_stderr.write(line)
+            sys_stderr.write(b"\n")
+
+    # sort by cost
+    # can_inline = sorted(can_inline, key=lambda v: v[1])
+    # cannot_inline = sorted(cannot_inline, key=lambda v: v[1])
+
+    for item in can_inline:
+        print( (str(item[1]).encode() +b"\t"+ item[0]) .decode() )
+
+    print("============")
+
+    for item in cannot_inline:
+        print( (str(item[1]).encode() +b"\t"+ item[0]) .decode() )
+
+if __name__ == "__main__":
+    main()
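The tool is presumably run from the repository root as python3 inline_cost.py. It rebuilds the package with go build -gcflags=-m=2, then prints one line per function with its inline cost: inlinable functions before the ============ separator, non-inlinable ones after it. This is the feedback loop behind the commit message's point about leaving inline budget for other work; the Go compiler's inlining threshold is a fixed cost budget of 80 per function.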
prep.go: 49 changes

@@ -11,13 +11,13 @@ func PrepRW(op IoUringOp, sqe *IoUringSqe, fd int,
 	sqe.Flags = 0
 	sqe.IoPrio = 0
 	sqe.Fd = int32(fd)
-	sqe.SetOffset(offset)
-	sqe.SetAddr(uint64(uintptr(addr)))
+	sqe.IoUringSqe_Union1 = IoUringSqe_Union1(offset)                    // union1
+	sqe.IoUringSqe_Union2 = *(*IoUringSqe_Union2)(unsafe.Pointer(&addr)) // union2
 	sqe.Len = uint32(len)
-	sqe.SetOpFlags(0)
-	sqe.SetBufIndex(0)
+	sqe.IoUringSqe_Union3 = 0 // sqe.SetOpFlags(0) // union3
+	sqe.IoUringSqe_Union4 = 0 // sqe.SetBufIndex(0) // union4
 	sqe.Personality = 0
-	sqe.SetFileIndex(0)
+	sqe.IoUringSqe_Union5 = 0 // sqe.SetFileIndex(0) // union5
 	sqe.Addr3 = 0
 	sqe.__pad2[0] = 0
 }

@@ -31,6 +31,20 @@ func PrepTimeout(sqe *IoUringSqe, ts *syscall.Timespec, count uint32, flags uint
 	sqe.SetTimeoutFlags(flags)
 }
 
+func PrepTimeoutRemove(sqe *IoUringSqe, userDaata uint64, flags uint32) {
+	PrepRW(IORING_OP_TIMEOUT_REMOVE, sqe, -1, nil, 0, 0)
+	sqe.SetAddr_Value(userDaata)
+	sqe.SetTimeoutFlags(flags)
+}
+
+func PrepTimeoutUpdate(sqe *IoUringSqe, ts *syscall.Timespec, userData uint64, flags uint32) {
+	PrepRW(IORING_OP_TIMEOUT_REMOVE, sqe, -1, nil, 0, 0)
+	sqe.SetAddr_Value(userData)
+	sqe.SetTimeoutFlags(flags | IORING_TIMEOUT_UPDATE)
+}
+
+// ** "Syscall" OP
+
 func PrepRead(sqe *IoUringSqe, fd int, buf *byte, nb int, offset uint64) {
 	PrepRW(IORING_OP_READ, sqe, fd, unsafe.Pointer(buf), nb, offset)
 }

@@ -61,8 +75,29 @@ func PrepWritev2(sqe *IoUringSqe, fd int,
 	sqe.SetRwFlags(flags)
 }
 
-func PrepAccept(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags int) {
-	*rsaSz = syscall.SizeofSockaddrAny
+func PrepAccept(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint) {
+	// *rsaSz = syscall.SizeofSockaddrAny // leave this out to caller?
 	PrepRW(IORING_OP_ACCEPT, sqe, fd, unsafe.Pointer(rsa), 0, uint64(uintptr(unsafe.Pointer(rsaSz))))
 	sqe.SetAcceptFlags(uint32(flags))
 }
 
+func PrepClose(sqe *IoUringSqe, fd int) {
+	PrepRW(IORING_OP_CLOSE, sqe, fd, nil, 0, 0)
+}
+
+func PrepRecvmsg(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint) {
+	PrepRW(IORING_OP_RECVMSG, sqe, fd, unsafe.Pointer(msg), 1, 0)
+	sqe.SetMsgFlags(uint32(flags))
+}
+
+func PrepSendmsg(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint) {
+	PrepRW(IORING_OP_SENDMSG, sqe, fd, unsafe.Pointer(msg), 1, 0)
+	sqe.SetMsgFlags(uint32(flags))
+}
+
+// ** Multishot
+
+func PrepMultishotAccept(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint) {
+	PrepAccept(sqe, fd, rsa, rsaSz, flags)
+	sqe.IoPrio |= IORING_ACCEPT_MULTISHOT
+}
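The old PrepAccept wrote *rsaSz itself; the new one leaves that commented out, so the caller now owns initializing the sockaddr size slot. A hedged caller-side sketch (queueAccept and listenFd are illustrative; rsa and rsaSz must stay alive until the completion arrives):

func queueAccept(ring *gouring.IoUring, listenFd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr) error {
	*rsaSz = syscall.SizeofSockaddrAny // caller-side init now (see the comment in PrepAccept)
	sqe := ring.GetSqe()
	gouring.PrepAccept(sqe, listenFd, rsa, rsaSz, 0)
	sqe.UserData.SetUint64(1)
	_, err := ring.Submit()
	return err
}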
queue.go: 45 changes

@@ -1,6 +1,7 @@
 package gouring
 
 import (
+	"runtime"
 	"sync/atomic"
 	"syscall"
 	"unsafe"

@@ -20,7 +21,7 @@ func (ring *IoUring) sq_ring_needs_enter(flags *uint32) bool {
 
 	// FIXME: io_uring_smp_mb
 
-	if atomic.LoadUint32(ring.Sq.Flags)&IORING_SQ_NEED_WAKEUP != 0 {
+	if atomic.LoadUint32(ring.Sq._Flags())&IORING_SQ_NEED_WAKEUP != 0 {
 		*flags |= IORING_ENTER_SQ_WAKEUP
 		return true
 	}

@@ -28,7 +29,7 @@ func (ring *IoUring) sq_ring_needs_enter(flags *uint32) bool {
 }
 
 func (ring *IoUring) cq_ring_needs_flush() bool {
-	return atomic.LoadUint32(ring.Sq.Flags)&(IORING_SQ_CQ_OVERFLOW|IORING_SQ_TASKRUN) != 0
+	return atomic.LoadUint32(ring.Sq._Flags())&(IORING_SQ_CQ_OVERFLOW|IORING_SQ_TASKRUN) != 0
 }
 
 func (ring *IoUring) cq_ring_needs_enter() bool {

@@ -80,7 +81,7 @@ func (ring *IoUring) _io_uring_get_cqe(cqePtr **IoUringCqe, data *get_data) (err
 		if err != nil {
 			break
 		}
-		data.submit = data.submit - uint32(ret)
+		data.submit -= uint32(ret)
 		if cqe != nil {
 			break
 		}

@@ -117,8 +118,8 @@ func (ring *IoUring) io_uring_peek_batch_cqe(cqes []*IoUringCqe, count uint32) u
 again:
 	ready = ring.io_uring_cq_ready()
 	if ready > 0 {
-		var head = *ring.Cq.Head
-		var mask = *ring.Cq.RingMask
+		var head = *ring.Cq._Head()
+		var mask = *ring.Cq._RingMask()
 		var last uint32
 		if count > ready {
 			count = ready

@@ -157,8 +158,8 @@ done:
 */
 func (ring *IoUring) __io_uring_flush_sq() uint32 {
 	sq := &ring.Sq
-	var mask = *sq.RingMask
-	var ktail = *sq.Tail
+	var mask = *sq._RingMask()
+	var ktail = *sq._Tail()
 	var toSubmit = sq.SqeTail - sq.SqeHead
 
 	if toSubmit < 1 {

@@ -178,7 +179,7 @@ func (ring *IoUring) __io_uring_flush_sq() uint32 {
 	 * Ensure that the kernel sees the SQE updates before it sees the tail
 	 * update.
 	 */
-	atomic.StoreUint32(sq.Tail, ktail)
+	atomic.StoreUint32(sq._Tail(), ktail)
 
 out:
 	/*

@@ -192,7 +193,7 @@ out:
 	 * we can submit. The point is, we need to be able to deal with this
 	 * situation regardless of any perceived atomicity.
 	 */
-	return ktail - *sq.Head
+	return ktail - *sq._Head()
 }
 
 /*

@@ -245,7 +246,7 @@ func (ring *IoUring) __io_uring_submit_timeout(waitNr uint32, ts *syscall.Timesp
 	}
 
 	PrepTimeout(sqe, ts, waitNr, 0)
-	sqe.UserData = LIBURING_UDATA_TIMEOUT
+	sqe.UserData.SetUint64(LIBURING_UDATA_TIMEOUT)
 	ret = int(ring.__io_uring_flush_sq())
 	return
 }

@@ -355,7 +356,7 @@ func (ring *IoUring) io_uring_get_sqe() *IoUringSqe {
 */
 func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
 	sq := &ring.Sq
-	var head = atomic.LoadUint32(sq.Head)
+	var head = atomic.LoadUint32(sq._Head())
 	var next = sq.SqeTail + 1
 	var shift uint32 = 0
 

@@ -363,8 +364,8 @@ func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
 		shift = 1
 	}
 
-	if next-head <= *sq.RingEntries {
-		sqe = ioUringSqeArray_Index(sq.Sqes, uintptr((sq.SqeTail&*sq.RingMask)<<shift))
+	if next-head <= *sq._RingEntries() {
+		sqe = ioUringSqeArray_Index(sq.Sqes, uintptr((sq.SqeTail&*sq._RingMask())<<shift))
 		sq.SqeTail = next
 		return
 	}

@@ -374,14 +375,15 @@ func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
 }
 
 func (ring *IoUring) io_uring_cq_ready() uint32 {
-	return atomic.LoadUint32(ring.Cq.Tail) - *ring.Cq.Head
+	return atomic.LoadUint32(ring.Cq._Tail()) - *ring.Cq._Head()
 }
 
 func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) error {
 	var cqe *IoUringCqe
 	var err int32 = 0
 	var avail int
-	var mask = *ring.Cq.RingMask
+	var mask = *ring.Cq._RingMask()
+
 	var shift uint32 = 0
 
 	if ring.Flags&IORING_SETUP_CQE32 != 0 {

@@ -389,9 +391,10 @@ func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) e
 	}
 
 	for {
-		var tail = atomic.LoadUint32(ring.Cq.Tail)
-		var head = *ring.Cq.Head
+		var tail = atomic.LoadUint32(ring.Cq._Tail())
+		var head = *ring.Cq._Head()
+
 		cqe = nil
 		avail = int(tail - head)
 		if avail < 1 {
 			break

@@ -399,12 +402,14 @@ func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) e
 
 		cqe = ioUringCqeArray_Index(ring.Cq.Cqes, uintptr((head&mask)<<shift))
 		if ring.Features&IORING_FEAT_EXT_ARG == 0 &&
-			cqe.UserData == LIBURING_UDATA_TIMEOUT {
+			cqe.UserData.GetUint64() == LIBURING_UDATA_TIMEOUT {
 			if cqe.Res < 0 {
 				err = cqe.Res
 			}
 			ring.io_uring_cq_advance(1)
-			if err != 0 {
+			if err == 0 {
+				// yields G
+				runtime.Gosched()
 				continue
 			}
 			cqe = nil

@@ -425,7 +430,7 @@ func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) e
 
 func (ring *IoUring) io_uring_cq_advance(nr uint32) {
 	if nr > 0 {
-		atomic.StoreUint32(ring.Cq.Head, *ring.Cq.Head+nr)
+		atomic.StoreUint32(ring.Cq._Head(), *ring.Cq._Head()+nr)
 	}
 }
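A sketch of the consumer loop these internals support, mirroring the test file below (consume and handle are illustrative names):

func consume(ring *gouring.IoUring, handle func(*gouring.IoUringCqe)) {
	var cqe *gouring.IoUringCqe
	for {
		err := ring.WaitCqe(&cqe)
		if err == syscall.EINTR {
			continue // interrupted by a signal, retry
		} else if err != nil {
			panic(err)
		}
		handle(cqe)       // inspect cqe.Res / cqe.UserData
		ring.SeenCqe(cqe) // mark consumed so the kernel can reuse the slot
	}
}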
queue_test.go: 105 changes

@@ -4,11 +4,14 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"reflect"
+	"runtime"
 	"sync"
 	"syscall"
 	"testing"
 	"unsafe"
 
 	"github.com/davecgh/go-spew/spew"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

@@ -25,6 +28,54 @@ func TestRingQueueGetSQE(t *testing.T) {
 	fmt.Printf("%+#v\n", sqe)
 }
 
+// func TestRingSqpollOnly(t *testing.T) {
+// 	h := testNewIoUringWithParams(t, 256, &IoUringParams{
+// 		Flags:        IORING_SETUP_SQPOLL,
+// 		SqThreadCpu:  10, // ms
+// 		SqThreadIdle: 10_000,
+// 	})
+// 	for i := 0; i < 10; i++ {
+// 		sqe := h.GetSqe()
+// 		PrepNop(sqe)
+// 	}
+// 	h.Submit()
+// 	var cqe *IoUringCqe
+
+// 	for {
+// 		h.WaitCqe(&cqe)
+// 		spew.Dump(cqe)
+// 		h.SeenCqe(cqe)
+// 	}
+// }
+
+func TestRingQueueOrderRetrieval(t *testing.T) {
+	const entries = 256
+	h := testNewIoUring(t, entries, 0)
+	defer h.Close()
+
+	var i uint64
+	for i = 0; i < entries; i++ {
+		sqe := h.GetSqe()
+		PrepNop(sqe)
+		sqe.UserData.SetUint64(i)
+		sqe.Flags |= IOSQE_IO_LINK // ordered
+	}
+
+	submitted, err := h.SubmitAndWait(entries)
+	require.NoError(t, err)
+	require.Equal(t, int(entries), submitted)
+
+	var cqe *IoUringCqe
+	for i = 0; i < entries; i++ {
+		err = h.WaitCqe(&cqe)
+		require.NoError(t, err)
+		require.NotNil(t, cqe)
+		require.Equal(t, i, cqe.UserData.GetUint64())
+		h.SeenCqe(cqe)
+	}
+
+}
+
 func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 	type opt struct {
 		name string

@@ -34,12 +85,16 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 		p IoUringParams
 	}
 	ts := []opt{
 		{"def-1-256", 1, 256, IoUringParams{}},
-		{"def-128-256", 256, 256, IoUringParams{}},       // passed 128
+		{"def-128-256", 256, 256, IoUringParams{}}, // passed 128
+		{"def-8-256", 8, 256, IoUringParams{}},
+		{"def-16-256", 16, 256, IoUringParams{}},
+		{"def-32-256", 32, 256, IoUringParams{}},
+		{"def-64-256", 64, 256, IoUringParams{}},
+		{"def-128-256", 128, 256, IoUringParams{}},
-		{"def-128+2-256", 128 + 2, 256, IoUringParams{}}, // passwd 128
+		{"def-128+1-256", 128 + 1, 256, IoUringParams{}}, // passed 128
+		{"def-128+2-256", 128 + 2, 256, IoUringParams{}}, // passed 128
 		{"def-256-256", 256, 256, IoUringParams{}},
 
 		{"sqpoll-127-256", 127, 256, IoUringParams{Flags: IORING_SETUP_SQPOLL, SqThreadCpu: 4, SqThreadIdle: 10_000}},

@@ -56,16 +111,15 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 			defer ftmp.Close()
 			fdTemp := ftmp.Fd()
 
-			bufPool := sync.Pool{
-				New: func() any {
-					x := make([]byte, 0, 32)
-					return &x
-				},
-			}
-
 			consumer := func(h *IoUring, ctx context.Context, wg *sync.WaitGroup) {
 				var cqe *IoUringCqe
 				var err error
+				defer func() {
+					rec := recover()
+					if rec != nil {
+						spew.Dump(cqe)
+					}
+				}()
 				for ctx.Err() == nil {
 					err = h.io_uring_wait_cqe(&cqe)
 					if err == syscall.EINTR {

@@ -82,12 +136,15 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 					if int(cqe.Res) < len("data ") {
 						panic(fmt.Sprintf("write less that it should"))
 					}
-					if (cqe.UserData>>(8<<2))&0xff == 0x00 {
+					if (cqe.UserData.GetUintptr()>>(8<<2))&0xff == 0x00 {
 						panic(fmt.Sprintf("cqe userdata should contain canonical address got %+#v", cqe.UserData))
 					}
 
 					// put back buf
-					bufPool.Put((*[]byte)(unsafe.Pointer(uintptr(cqe.UserData))))
+					bufPtr := (*[]byte)(cqe.UserData.GetUnsafe())
+					buf := *bufPtr // deref check
+					_ = buf
+					// fmt.Printf("%+#v %s", buf, buf)
 
 					h.io_uring_cqe_seen(cqe) // necessary
 					wg.Done()

@@ -104,7 +161,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 			t.Run("submit_single", func(t *testing.T) {
 				var wg sync.WaitGroup
 
-				h := testNewIoUringWithParam(t, 256, &tc.p)
+				h := testNewIoUringWithParams(t, 256, &tc.p)
 				defer h.Close()
 
 				wg.Add(tc.jobCount)

@@ -121,23 +178,25 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 						}
 					}
 
-					bufptr := bufPool.Get().(*[]byte)
-					buf := (*bufptr)[:0]
-					buf = append(buf, []byte(fmt.Sprintf("data %d\n", i))...)
+					var buf = new([]byte)
+					*buf = append(*buf, []byte(fmt.Sprintf("data %d\n", i))...)
+					reflect.ValueOf(buf) // escape the `buf`
 
-					PrepWrite(sqe, int(fdTemp), &buf[0], len(buf), 0)
-					sqe.UserData = uint64(uintptr(unsafe.Pointer(bufptr)))
+					PrepWrite(sqe, int(fdTemp), &(*buf)[0], len((*buf)), 0)
+					runtime.KeepAlive(buf)
+					sqe.UserData.SetUnsafe(unsafe.Pointer(buf))
 
 					// submit
 					submit(t, &tc.p, h, 1)
 				}
+				runtime.GC()
 				wg.Wait()
 			})
 
 			t.Run("submit_bulk", func(t *testing.T) {
 				var wg sync.WaitGroup
 
-				h := testNewIoUringWithParam(t, 256, &tc.p)
+				h := testNewIoUringWithParams(t, 256, &tc.p)
 				defer h.Close()
 
 				wg.Add(tc.jobCount)

@@ -152,15 +211,15 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
 						continue
 					}
 
-					bufptr := bufPool.Get().(*[]byte)
-					buf := (*bufptr)[:0]
-					buf = append(buf, []byte(fmt.Sprintf("data %d\n", i))...)
+					buf := new([]byte)
+					*buf = append(*buf, []byte(fmt.Sprintf("data %d\n", i))...)
 
-					PrepWrite(sqe, int(fdTemp), &buf[0], len(buf), 0)
-					sqe.UserData = uint64(uintptr(unsafe.Pointer(bufptr)))
+					PrepWrite(sqe, int(fdTemp), &(*buf)[0], len((*buf)), 0)
+					sqe.UserData.SetUnsafe(unsafe.Pointer(buf))
 				}
 
 				submit(t, &tc.p, h, tc.jobCount)
+				runtime.GC()
 				wg.Wait()
 			})
@@ -243,7 +243,7 @@ func (ring *IoUring) io_uring_register_ring_fd() (int, error) {
 	if err != nil {
 		return 0, err
 	}
-	ring.EnterRingFd = int32(up.Offset)
+	ring.EnterRingFd = int(up.Offset)
 	ring.IntFlags |= INT_FLAG_REG_RING
 	return ret, nil
 }
setup.go: 48 changes

@@ -12,7 +12,7 @@ import (
 // }
 
 func io_uring_queue_init_params(entries uint32, ring *IoUring, p *IoUringParams) error {
-	fd, err := io_uring_setup(entries, p)
+	fd, err := io_uring_setup(uintptr(entries), p)
 	if err != nil {
 		return err
 	}

@@ -31,7 +31,7 @@ func (ring *IoUring) io_uring_queue_exit() {
 	if ring.Flags&IORING_SETUP_SQE128 != 0 {
 		sqeSize += 64
 	}
-	munmap(uintptr(unsafe.Pointer(sq.Sqes)), sqeSize*uintptr(*sq.RingEntries))
+	munmap(unsafe.Pointer(sq.Sqes), sqeSize*uintptr(*sq._RingEntries()))
 	io_uring_unmap_rings(sq, cq)
 	/*
 	 * Not strictly required, but frees up the slot we used now rather

@@ -49,13 +49,12 @@ func io_uring_queue_mmap(fd int, p *IoUringParams, ring *IoUring) error {
 		return err
 	}
 	ring.Flags = p.Flags
-	ring.RingFd, ring.EnterRingFd = int32(fd), int32(fd)
+	ring.RingFd, ring.EnterRingFd = fd, fd
 	ring.IntFlags = 0
 	return nil
 }
 
 func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err error) {
-
 	size := SizeofIoUringCqe
 	if p.Flags&IORING_SETUP_CQE32 != 0 {
 		size += SizeofIoUringCqe

@@ -71,7 +70,7 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
 		// cq.RingSz = sq.RingSz
 	}
 	// alloc sq ring
-	sq.RingPtr, err = mmap(0, uintptr(sq.RingSz),
+	sq.RingPtr, err = mmap(nil, uintptr(sq.RingSz),
 		syscall.PROT_READ|syscall.PROT_WRITE,
 		syscall.MAP_SHARED|syscall.MAP_POPULATE,
 		fd, IORING_OFF_SQ_RING)

@@ -83,7 +82,7 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
 		cq.RingPtr = sq.RingPtr
 	} else {
 		// alloc cq ring
-		cq.RingPtr, err = mmap(0, uintptr(cq.RingSz),
+		cq.RingPtr, err = mmap(nil, uintptr(cq.RingSz),
 			syscall.PROT_READ|syscall.PROT_WRITE,
 			syscall.MAP_SHARED|syscall.MAP_POPULATE,
 			fd, IORING_OFF_CQ_RING)

@@ -95,20 +94,20 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
 	}
 
 	//sq
-	sq.Head = (*uint32)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.Head)))
-	sq.Tail = (*uint32)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.Tail)))
-	sq.RingMask = (*uint32)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.RingMask)))
-	sq.RingEntries = (*uint32)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.RingEntries)))
-	sq.Flags = (*uint32)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.Flags)))
-	sq.Dropped = (*uint32)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.Dropped)))
-	sq.Array = (uint32Array)(unsafe.Pointer(sq.RingPtr + uintptr(p.SqOff.Array)))
+	sq.head = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Head)))
+	sq.tail = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Tail)))
+	sq.ringMask = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.RingMask)))
+	sq.ringEntries = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.RingEntries)))
+	sq.flags = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Flags)))
+	sq.dropped = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Dropped)))
+	sq.Array = (uint32Array)(unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Array)))
 
 	size = SizeofIoUringSqe
 	if p.Flags&IORING_SETUP_SQE128 != 0 {
 		size += 64
 	}
-	var sqeAddr uintptr
-	sqeAddr, err = mmap(0, size*uintptr(p.SqEntries),
+	var sqeAddr unsafe.Pointer
+	sqeAddr, err = mmap(nil, size*uintptr(p.SqEntries),
 		syscall.PROT_READ|syscall.PROT_WRITE,
 		syscall.MAP_SHARED|syscall.MAP_POPULATE,
 		fd, IORING_OFF_SQES)

@@ -117,25 +116,24 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
 		io_uring_unmap_rings(sq, cq)
 		return
 	}
-	sq.Sqes = (ioUringSqeArray)(unsafe.Pointer(sqeAddr))
+	sq.Sqes = (ioUringSqeArray)(sqeAddr)
 
 	//cq
-	cq.Head = (*uint32)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.Head)))
-	cq.Tail = (*uint32)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.Tail)))
-	cq.RingMask = (*uint32)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.RingMask)))
-	cq.RingEntries = (*uint32)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.RingEntries)))
-	cq.Overflow = (*uint32)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.Overflow)))
-	cq.Cqes = (ioUringCqeArray)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.Cqes)))
+	cq.head = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Head)))
+	cq.tail = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Tail)))
+	cq.ringMask = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.RingMask)))
+	cq.ringEntries = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.RingEntries)))
+	cq.overflow = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Overflow)))
+	cq.Cqes = (ioUringCqeArray)(unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Cqes)))
 	if p.CqOff.Flags != 0 {
-		cq.Flags = (*uint32)(unsafe.Pointer(cq.RingPtr + uintptr(p.CqOff.Flags)))
+		cq.flags = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Flags)))
 	}
 
 	return nil
 }
 
 func io_uring_unmap_rings(sq *IoUringSq, cq *IoUringCq) error {
 	munmap(sq.RingPtr, uintptr(sq.RingSz))
-	if cq.RingPtr != 0 && cq.RingPtr != sq.RingPtr {
+	if cq.RingPtr != nil && cq.RingPtr != sq.RingPtr {
 		munmap(cq.RingPtr, uintptr(cq.RingSz))
 	}
 	return nil
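A note on the pattern above: each field is derived as unsafe.Pointer(uintptr(RingPtr) + uintptr(offset)), rebasing the kernel-reported ring offsets onto the mmap'd base within a single expression. Keeping RingPtr itself as unsafe.Pointer instead of the old uintptr means the base is never stored as a bare integer; presumably that is what lets the checkptr build from the Makefile pass over this code.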
syscall.go: 30 changes

@@ -5,29 +5,23 @@ import (
 	"unsafe"
 )
 
-const (
-	// uring syscall no.
-
-	SYS_IO_URING_SETUP    = 425
-	SYS_IO_URING_ENTER    = 426
-	SYS_IO_URING_REGISTER = 427
-)
-
-func io_uring_setup(entries uint32, params *IoUringParams) (ret int, err error) {
-	r1, _, e1 := syscall.RawSyscall(SYS_IO_URING_SETUP, uintptr(entries), uintptr(unsafe.Pointer(params)), 0)
+func io_uring_setup(entries uintptr, params *IoUringParams) (ret int, err error) {
+	r1, _, e1 := syscall.Syscall(SYS_IO_URING_SETUP, entries, uintptr(unsafe.Pointer(params)), 0)
 	ret = int(r1)
-	if e1 != 0 {
+	if e1 < 0 {
 		err = e1
 	}
 	return
 }
 
-func io_uring_enter(fd int32, toSubmit uint32, minComplete uint32, flags uint32, sig *Sigset_t) (ret int, err error) {
+func io_uring_enter(fd int, toSubmit uint32, minComplete uint32, flags uint32, sig *Sigset_t) (ret int, err error) {
 	return io_uring_enter2(fd, toSubmit, minComplete, flags, sig, NSIG/8)
 }
 
-func io_uring_enter2(fd int32, toSubmit uint32, minComplete uint32, flags uint32, sig *Sigset_t, sz int32) (ret int, err error) {
-	r1, _, e1 := syscall.RawSyscall6(SYS_IO_URING_ENTER,
+// TODO: decide to use Syscall or RawSyscall
+
+func io_uring_enter2(fd int, toSubmit uint32, minComplete uint32, flags uint32, sig *Sigset_t, sz int32) (ret int, err error) {
+	r1, _, e1 := syscall.Syscall6(SYS_IO_URING_ENTER,
 		uintptr(fd),
 		uintptr(toSubmit), uintptr(minComplete),
 		uintptr(flags), uintptr(unsafe.Pointer(sig)), uintptr(sz))

@@ -38,8 +32,8 @@ func io_uring_enter2(fd int, toSubmit uint32, minComplete uint32, flags uint32,
 	return
 }
 
-func io_uring_register(fd int32, opcode uint32, arg unsafe.Pointer, nrArgs uintptr) (ret int, err error) {
-	r1, _, e1 := syscall.RawSyscall6(SYS_IO_URING_REGISTER, uintptr(fd), uintptr(opcode), uintptr(arg), uintptr(nrArgs), 0, 0)
+func io_uring_register(fd int, opcode uint32, arg unsafe.Pointer, nrArgs uintptr) (ret int, err error) {
+	r1, _, e1 := syscall.Syscall6(SYS_IO_URING_REGISTER, uintptr(fd), uintptr(opcode), uintptr(arg), uintptr(nrArgs), 0, 0)
 	ret = int(r1)
 	if e1 != 0 {
 		err = e1

@@ -48,10 +42,10 @@ func io_uring_register(fd int, opcode uint32, arg unsafe.Pointer, nrArgs uintptr
 }
 
 //go:linkname mmap syscall.mmap
-func mmap(addr uintptr, length uintptr, prot int, flags int, fd int, offset int64) (xaddr uintptr, err error)
+func mmap(addr unsafe.Pointer, length uintptr, prot int, flags int, fd int, offset int64) (xaddr unsafe.Pointer, err error)
 
 //go:linkname munmap syscall.munmap
-func munmap(addr uintptr, length uintptr) (err error)
+func munmap(addr unsafe.Pointer, length uintptr) (err error)
 
 //
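Two things worth flagging here. First, the TODO: syscall.Syscall goes through the runtime's entersyscall path, so a blocked io_uring_enter lets the scheduler release the P to other goroutines, while RawSyscall skips that bookkeeping for lower overhead but can stall the scheduler if the call blocks; that trade-off is presumably why the choice is left open. Second, the new check in io_uring_setup reads `if e1 < 0`, but syscall.Errno is an unsigned type, so this branch can never fire and setup errors would be silently dropped; `if e1 != 0`, as still used in io_uring_register, looks like the intended form.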
syscall_nr_amd64.go (new file): 9 changes

@@ -0,0 +1,9 @@
+package gouring
+
+const (
+	// uring syscall no.
+
+	SYS_IO_URING_SETUP    = 425
+	SYS_IO_URING_ENTER    = 426
+	SYS_IO_URING_REGISTER = 427
+)
uring.go: 11 changes

@@ -13,6 +13,9 @@ func New(entries uint32, flags uint32) (*IoUring, error) {
 
 func NewWithParams(entries uint32, params *IoUringParams) (*IoUring, error) {
 	ring := &IoUring{}
+	if params == nil {
+		params = new(IoUringParams)
+	}
 	err := io_uring_queue_init_params(entries, ring, params)
 	if err != nil {
 		return nil, err

@@ -24,14 +27,18 @@ func (h *IoUring) Close() {
 	h.io_uring_queue_exit()
 }
 
-func (h *IoUring) GetSQE() *IoUringSqe {
+func (h *IoUring) GetSqe() *IoUringSqe {
 	return h.io_uring_get_sqe()
 }
 
-func (h *IoUring) WaitCQE(cqePtr **IoUringCqe) error {
+func (h *IoUring) WaitCqe(cqePtr **IoUringCqe) error {
 	return h.io_uring_wait_cqe(cqePtr)
 }
 
+func (h *IoUring) SeenCqe(cqe *IoUringCqe) {
+	h.io_uring_cqe_seen(cqe)
+}
+
 func (h *IoUring) Submit() (int, error) {
 	return h.io_uringn_submit()
 }
@@ -26,7 +26,7 @@ func BenchmarkQueueNop(b *testing.B) {
 			if ctx.Err() != nil {
 				return
 			}
-			err = h.WaitCQE(&cqe)
+			err = h.WaitCqe(&cqe)
 			if err == syscall.EINTR {
 				continue // ignore INTR
 			} else if err != nil {

@@ -35,12 +35,14 @@ func BenchmarkQueueNop(b *testing.B) {
 			if cqe.Res < 0 {
 				panic(syscall.Errno(-cqe.Res))
 			}
+
+			h.SeenCqe(cqe)
 		}
 	}
 
 	for _, tc := range ts {
 		b.Run(tc.name, func(b *testing.B) {
-			h := testNewIoUringWithParam(b, tc.entries, &tc.p)
+			h := testNewIoUringWithParams(b, tc.entries, &tc.p)
 			defer h.Close()
 			var (
 				j uint32

@@ -57,14 +59,14 @@ func BenchmarkQueueNop(b *testing.B) {
 			for j = 0; j < tc.entries; j++ {
 				for {
 					// sqe could be nil if SQ is already full so we spin until we got one
-					sqe = h.GetSQE()
+					sqe = h.GetSqe()
 					if sqe != nil {
 						break
 					}
 					runtime.Gosched()
 				}
 				PrepNop(sqe)
-				sqe.UserData = uint64(i + int(j))
+				sqe.UserData.SetUint64(uint64(i + int(j)))
 			}
 			submitted, err = h.Submit()
 			if err != nil {
@@ -21,7 +21,7 @@ func testNewIoUring(t genericTestingT, entries uint32, flags uint32) *IoUring {
 	return h
 }
 
-func testNewIoUringWithParam(t genericTestingT, entries uint32, p *IoUringParams) *IoUring {
+func testNewIoUringWithParams(t genericTestingT, entries uint32, p *IoUringParams) *IoUring {
 	h, err := NewWithParams(entries, p)
 	require.NoError(t, err)
 	require.NotNil(t, h)

@@ -45,7 +45,7 @@ func TestRingWrapper(t *testing.T) {
 	}
 	var off uint64 = 0
 	for _, bs := range whatToWrite {
-		sqe := h.GetSQE()
+		sqe := h.GetSqe()
 		PrepWrite(sqe, int(fd), &bs[0], len(bs), off)
 		sqe.Flags = IOSQE_IO_LINK
 		off = off + uint64(len(bs))
@@ -1,21 +1,88 @@
 package gouring
 
-import "unsafe"
+import (
+	"unsafe"
+)
 
-type uint32Array *uint32
+type uint32Array = unsafe.Pointer // *uint32
 
 func uint32Array_Index(u uint32Array, i uintptr) *uint32 {
 	return (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + SizeofUint32*i))
 }
 
-type ioUringSqeArray *IoUringSqe
+type ioUringSqeArray = unsafe.Pointer // *IoUringSqe
 
 func ioUringSqeArray_Index(u ioUringSqeArray, i uintptr) *IoUringSqe {
 	return (*IoUringSqe)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + SizeofIoUringSqe*i))
 }
 
-type ioUringCqeArray *IoUringCqe
+//
+
+type ioUringCqeArray = unsafe.Pointer // *IoUringCqe
 
 func ioUringCqeArray_Index(u ioUringCqeArray, i uintptr) *IoUringCqe {
 	return (*IoUringCqe)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + SizeofIoUringCqe*i))
 }
 
+//
+
+type UserData [8]byte // uint64
+
+func (u *UserData) SetUint64(v uint64) {
+	putUintptr(unsafe.Pointer(u), uintptr(v))
+}
+func (u *UserData) SetUintptr(v uintptr) {
+	putUintptr(unsafe.Pointer(u), v)
+}
+func (u *UserData) SetUnsafe(ptr unsafe.Pointer) {
+	putUnsafe(unsafe.Pointer(u), ptr)
+}
+
+func (u UserData) GetUnsafe() unsafe.Pointer {
+	return *(*unsafe.Pointer)(unsafe.Pointer(&u))
+}
+func (u UserData) GetUintptr() uintptr {
+	return uintptr(u.GetUnsafe())
+}
+func (u UserData) GetUint64() uint64 {
+	return uint64(u.GetUintptr())
+}
+func (u UserData) IsZero() bool {
+	return u.GetUintptr() == 0
+}
+
+// ---
+
+func putUnsafe(ptr unsafe.Pointer, v unsafe.Pointer) {
+	*(*unsafe.Pointer)(ptr) = v
+}
+
+func putUintptr(ptr unsafe.Pointer, v uintptr) {
+	*(*uintptr)(ptr) = v
+}
+func putUint64(ptr unsafe.Pointer, v uint64) {
+	*(*uint64)(ptr) = v
+}
+func putUint32(ptr unsafe.Pointer, v uint32) {
+	*(*uint32)(ptr) = v
+}
+func putUint16(ptr unsafe.Pointer, v uint16) {
+	*(*uint16)(ptr) = v
+}
+func putUint8(ptr unsafe.Pointer, v uint8) {
+	*(*uint8)(ptr) = v
+}
+
+func putInt64(ptr unsafe.Pointer, v int64) {
+	*(*int64)(ptr) = v
+}
+func putInt32(ptr unsafe.Pointer, v int32) {
+	*(*int32)(ptr) = v
+}
+func putInt16(ptr unsafe.Pointer, v int16) {
+	*(*int16)(ptr) = v
+}
+func putInt8(ptr unsafe.Pointer, v int8) {
+	*(*int8)(ptr) = v
+}
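One caveat with the UserData layout defined here (in what is evidently util_ptr_arith.go, given its test file below): SetUint64 round-trips through uintptr, so on a 32-bit platform the upper 32 bits of the value would be silently dropped. Since the syscall numbers live in syscall_nr_amd64.go, amd64 is presumably the only supported target at this point, but the truncation is worth keeping in mind if more GOARCHes are added.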
util_ptr_arith_test.go (new file): 44 changes

@@ -0,0 +1,44 @@
+package gouring
+
+import (
+	"encoding/binary"
+	"fmt"
+	"testing"
+	"unsafe"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestUserdata(t *testing.T) {
+	type test struct {
+		v   any
+		exp uint64
+	}
+	ts := []test{
+		{uint64(0), 0},
+		{uint64(0xff), 0xff},
+		{uint64(0xfffefd), 0xfffefd},
+		{uintptr(0xcafeba), 0xcafeba},
+		{unsafe.Pointer(nil), 0},
+	}
+	bo := binary.LittleEndian
+	for _, tc := range ts {
+		var u UserData
+		switch v := tc.v.(type) {
+		case uint64:
+			u.SetUint64(v)
+		case uintptr:
+			u.SetUintptr(v)
+		case unsafe.Pointer:
+			u.SetUnsafe(v)
+		default:
+			panic(fmt.Sprintf("unhandled type: %T", v))
+		}
+
+		assert.Equal(t, tc.exp, u.GetUint64())
+
+		var exp [8]byte
+		bo.PutUint64(exp[:], tc.exp)
+		assert.Equal(t, exp[:], u[:])
+	}
+}
@@ -50,38 +50,3 @@ func (u *union16) Uint64() uint64 { return *(*uint64)(unsafe.Pointer(u)) }
 func (u *union16) Uint32() uint32 { return *(*uint32)(unsafe.Pointer(u)) }
 func (u *union16) Uint16() uint16 { return *(*uint16)(unsafe.Pointer(u)) }
 func (u *union16) Uint8() uint8   { return *(*uint8)(unsafe.Pointer(u)) }
-
-//
-
-func putUnsafe(ptr unsafe.Pointer, v unsafe.Pointer) {
-	*(*unsafe.Pointer)(ptr) = v
-}
-
-func putUintptr(ptr unsafe.Pointer, v uintptr) {
-	*(*uintptr)(ptr) = v
-}
-func putUint64(ptr unsafe.Pointer, v uint64) {
-	*(*uint64)(ptr) = v
-}
-func putUint32(ptr unsafe.Pointer, v uint32) {
-	*(*uint32)(ptr) = v
-}
-func putUint16(ptr unsafe.Pointer, v uint16) {
-	*(*uint16)(ptr) = v
-}
-func putUint8(ptr unsafe.Pointer, v uint8) {
-	*(*uint8)(ptr) = v
-}
-
-func putInt64(ptr unsafe.Pointer, v int64) {
-	*(*int64)(ptr) = v
-}
-func putInt32(ptr unsafe.Pointer, v int32) {
-	*(*int32)(ptr) = v
-}
-func putInt16(ptr unsafe.Pointer, v int16) {
-	*(*int16)(ptr) = v
-}
-func putInt8(ptr unsafe.Pointer, v int8) {
-	*(*int8)(ptr) = v
-}