mirror of https://github.com/ii64/gouring.git synced 2025-04-01 03:41:44 +02:00
commit 2c9c47f890
Xeffy Chen 2023-05-01 07:17:07 -07:00, committed by GitHub
No known key found for this signature in database; GPG key ID: 4AEE18F83AFDEB23
30 changed files with 3645 additions and 1459 deletions

3
.gitignore vendored

@ -1,4 +1,5 @@
.vscode/
*.cpu
*.mem
*.test
*.test
*.code-workspace

File diff suppressed because it is too large

(binary image changed: 32 KiB before, 44 KiB after)

File diff suppressed because it is too large

(binary image changed: 62 KiB before, 63 KiB after)

13
atomic.go Normal file

@ -0,0 +1,13 @@
package gouring
import _ "unsafe"
var io_uring_smp_mb = io_uring_smp_mb_fallback
func io_uring_smp_mb_fallback()
func io_uring_smp_mb_mfence()
func init() {
// temporary
io_uring_smp_mb = io_uring_smp_mb_mfence
}
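// A hedged sketch (not part of this commit): liburing issues a full barrier
// like this between publishing the new SQ tail and re-reading the kernel's
// SQ flags under IORING_SETUP_SQPOLL, so a concurrently-set
// IORING_SQ_NEED_WAKEUP flag is not missed. sqNeedsWakeup is an illustrative
// package-internal helper; it assumes the IORING_SQ_NEED_WAKEUP constant.
func sqNeedsWakeup(ring *IoUring) bool {
// the SQ tail store above must be globally visible before kflags is read
io_uring_smp_mb()
return *ring.Sq._KFlags()&IORING_SQ_NEED_WAKEUP != 0
}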

12
atomic_amd64.s Normal file

@ -0,0 +1,12 @@
#include "go_asm.h"
#include "funcdata.h"
#include "textflag.h"
TEXT ·io_uring_smp_mb_fallback(SB), NOSPLIT, $0
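// a LOCK-prefixed read-modify-write on the stack acts as a full memory
// barrier; on many x86 CPUs it is cheaper than MFENCE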
LOCK
ORQ $0, 0(SP)
RET
TEXT ·io_uring_smp_mb_mfence(SB), NOSPLIT, $0
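// MFENCE orders all earlier loads and stores before any later ones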
MFENCE
RET

15
examples/go.mod Normal file

@ -0,0 +1,15 @@
module github.com/ii64/gouring/examples
go 1.20
replace github.com/ii64/gouring => ../
require (
github.com/ii64/gouring v0.0.0-00010101000000-000000000000
golang.org/x/sys v0.7.0
)
require (
github.com/alphadose/haxmap v1.2.0 // indirect
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
)

10
examples/go.sum Normal file

@ -0,0 +1,10 @@
github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM=
github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=

174
examples/nvme/nvme.go Normal file

@ -0,0 +1,174 @@
package main
import (
"fmt"
"syscall"
"unsafe"
uring "github.com/ii64/gouring"
nvme "github.com/ii64/gouring/nvme"
"golang.org/x/sys/unix"
)
// NOTICE NOTICE NOTICE NOTICE NOTICE
//
// This example is performing **READ** access to NVMe via low-level control device.
//
// NOTICE NOTICE NOTICE NOTICE NOTICE
var (
// hardcoded device path
// devicePath = "/dev/nvme0n1"
devicePath = "/dev/ng0n1"
nsid uint32
lbaSize uint32
lbaShift int
BS uint64 = 8192
)
func DoNvmeGetInfo(devPath string) error {
fd, err := unix.Open(devPath, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer func() {
if err := unix.Close(fd); err != nil {
panic(err)
}
}()
var (
ns nvme.NvmeIdNs
cmd nvme.NvmePassthruCmd
)
nsidRet, err := sys_ioctl(fd, uintptr(nvme.NVME_IOCTL_ID()), 0)
if err != nil {
return err
}
nsid = uint32(nsidRet)
cmd = nvme.NvmePassthruCmd{
Opcode: nvme.NVME_ADMIN_IDENTIFY,
Nsid: nsid,
Addr: uint64(uintptr(unsafe.Pointer(&ns))),
DataLen: nvme.NVME_IDENTIFY_DATA_SIZE,
Cdw10: nvme.NVME_IDENTIFY_CNS_NS,
Cdw11: nvme.NVME_CSI_NVM << nvme.NVME_IDENTIFY_CSI_SHIFT,
TimeoutMs: nvme.NVME_DEFAULT_IOCTL_TIMEOUT,
}
_, err = sys_ioctl(fd, uintptr(nvme.NVME_IOCTL_ADMIN_CMD()), uintptr(unsafe.Pointer(&cmd)))
if err != nil {
return err
}
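// FLBAS bits 3:0 select the active entry of the LBA format array; LBAF.Ds is
// the LBA data size as a power of two (per the NVMe Identify Namespace layout)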
lbaSize = 1 << ns.Lbaf[(ns.Flbas&0x0F)].Ds
lbaShift = ilog2(uint32(lbaSize))
return nil
}
func DoIoUring(devPath string) error {
ring, err := uring.New(64,
uring.IORING_SETUP_IOPOLL|
uring.IORING_SETUP_SQE128|uring.IORING_SETUP_CQE32)
if err != nil {
return err
}
defer ring.Close()
fd, err := unix.Open(devicePath, unix.O_RDONLY, 0) // mode 0, since the file is opened O_RDONLY
if err != nil {
panic(err)
}
defer unix.Close(fd)
var bufs [10][0x1000]byte
var sqe *uring.IoUringSqe
sqe = ring.GetSqe()
buf := bufs[1]
bufSz := len(buf)
uring.PrepRead(sqe, fd, &buf[0], bufSz, 0)
sqe.SetCmdOp(uint32(nvme.NVME_URING_CMD_IO()))
sqe.Opcode = uring.IORING_OP_URING_CMD
var off uint64 = 0
var i uint32 = 1
sqe.UserData.SetUint64(uint64(off<<32) | uint64(i)) // temp
var slba uint64 = off >> lbaShift
var nlb uint64 = BS>>lbaShift - 1
// zero and init
cmd := nvme.NvmeUringCmd{
Opcode: nvme.NVME_CMD_READ,
// cdw10 and cdw11 represent starting lba
Cdw10: uint32(slba & 0xffff_ffff),
Cdw11: uint32(slba >> 32),
// represent number of lba's for read/write
Cdw12: uint32(nlb),
Nsid: nsid,
Addr: uint64(uintptr(unsafe.Pointer(&buf[0]))),
DataLen: uint32(bufSz),
}
cmdPtr := (*nvme.NvmeUringCmd)(sqe.GetCmd())
*cmdPtr = cmd // copy
fmt.Printf("CMD %+#v\n", cmdPtr)
submitted, err := ring.SubmitAndWait(1)
if err != nil {
return err
}
fmt.Println("submitted", submitted)
var cqe *uring.IoUringCqe
// for i := 0; i < 2; i++ {
if err := ring.WaitCqe(&cqe); err != nil {
return err
}
fmt.Printf("CQE:\t%+#v\n", cqe)
cqeExtra := (*[2]uint64)(cqe.GetBigCqe())
fmt.Printf("CQE Extra:\t%+#v\n", cqeExtra)
fmt.Printf("Buffer: %+#v\n", buf)
fmt.Printf("=========\n")
ring.SeenCqe(cqe)
// }
return nil
}
func main() {
err := DoNvmeGetInfo(devicePath)
if err != nil {
panic(err)
}
fmt.Printf("lbaSize: %d lbaShift: %d\n", lbaSize, lbaShift)
if err := DoIoUring(devicePath); err != nil {
panic(err)
}
}
func sys_ioctl(fd int, a1, a2 uintptr) (int, error) {
r1, _, err := syscall.Syscall(syscall.SYS_IOCTL,
uintptr(fd), a1, a2)
if err != 0 {
return 0, err
}
return int(r1), nil
}
func ilog2(i uint32) int {
log := -1
for i > 0 {
i >>= 1
log++
}
return log
}
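As a worked check of the slba/nlb math above (assuming a 4 KiB LBA format, i.e. Ds = 12): lbaShift = 12, so an 8192-byte read at offset 0 yields slba = 0 and nlb = (8192 >> 12) - 1 = 1; NVMe's NLB field is zero-based, so 1 means two logical blocks.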


@ -0,0 +1,132 @@
package main
import (
"bytes"
"fmt"
"reflect"
"runtime"
"syscall"
"time"
"unsafe"
uring "github.com/ii64/gouring"
"golang.org/x/sys/unix"
)
func main() {
ring, err := uring.New(64, 0)
if err != nil {
panic(err)
}
defer ring.Close()
fd, err := unix.MemfdCreate("mymemfd", unix.MFD_CLOEXEC)
if err != nil {
panic(err)
}
defer unix.Close(fd)
const BSIZE = 512
unix.Ftruncate(fd, BSIZE)
addr, err := mmap(nil, BSIZE, syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC, syscall.MAP_SHARED, fd, 0)
if err != nil {
panic(err)
}
defer munmap(addr, BSIZE)
var rbuf []byte
sh := (*reflect.SliceHeader)(unsafe.Pointer(&rbuf))
sh.Data = uintptr(addr)
sh.Cap = BSIZE
sh.Len = BSIZE
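// note: on Go 1.17+, rbuf = unsafe.Slice((*byte)(addr), BSIZE) is the simpler
// equivalent of this reflect.SliceHeader fixup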
tnow := func() string { return fmt.Sprintf("CLOCK:%d\n", time.Now().UnixMilli()) }
go func() {
flen := len(tnow())
// monitor written bytes
for {
// copy
payload := string(rbuf[:flen])
fmt.Printf("> %q\n", payload)
time.Sleep(time.Millisecond * 50)
}
}()
var buf [BSIZE]byte
refresh := func() int {
b := []byte(tnow())
copy(buf[:], b)
return len(b)
}
qWrite := func() {
sqe := ring.GetSqe()
uring.PrepWrite(sqe, fd, &buf[0], refresh(), 0)
sqe.UserData.SetUint64(0xaaaaaaaa)
}
qRead := func() {
sqe := ring.GetSqe()
uring.PrepRead(sqe, fd, &buf[0], len(buf), 0)
sqe.UserData.SetUint64(0xbbbbbbbb)
}
qWrite()
submitted, err := ring.SubmitAndWait(1)
if err != nil {
panic(err)
}
println("submitted:", submitted)
var cqe *uring.IoUringCqe
for {
err = ring.WaitCqe(&cqe)
switch err {
case syscall.EINTR, syscall.EAGAIN, syscall.ETIME:
runtime.Gosched()
continue
case nil:
goto cont
default:
panic(err)
}
cont:
switch cqe.UserData {
case 0xaaaaaaaa:
qRead()
case 0xbbbbbbbb:
// verify
if !bytes.Equal(buf[:], rbuf) {
panic("check failed")
}
qWrite()
}
ring.SeenCqe(cqe)
submitted, err := ring.Submit()
if err != nil {
panic(err)
} else {
_ = submitted
// println("submitted:", submitted)
}
}
}
//go:linkname mmap syscall.mmap
func mmap(addr unsafe.Pointer, length uintptr, prot int, flags int, fd int, offset int64) (xaddr unsafe.Pointer, err error)
//go:linkname munmap syscall.munmap
func munmap(addr unsafe.Pointer, length uintptr) (err error)
func msync(addr unsafe.Pointer, length uintptr, flags uintptr) error {
r1, _, e1 := syscall.Syscall(syscall.SYS_MSYNC, uintptr(addr), length, flags)
if e1 != 0 {
return syscall.Errno(e1)
}
_ = r1
return nil
}


@ -0,0 +1,84 @@
package main
import (
"bytes"
"fmt"
"log"
"syscall"
uring "github.com/ii64/gouring"
"golang.org/x/sys/unix"
)
func main() {
ring, err := uring.New(256, 0)
if err != nil {
log.Fatal(err)
}
defer ring.Close()
fd, err := unix.Open("/tmp/test", unix.O_RDWR|unix.O_CREAT, 0677)
if err != nil {
panic(err)
}
sqe := ring.GetSqe()
b := []byte("hello from io_uring!\n")
uring.PrepWrite(sqe, fd, &b[0], len(b), 0)
sqe.UserData.SetUint64(0x0001)
sqe.Flags |= uring.IOSQE_IO_LINK
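// IOSQE_IO_LINK chains this SQE to the next one: linked SQEs run in order,
// and if one fails the rest of the chain completes with -ECANCELED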
sqe = ring.GetSqe()
uring.PrepWrite(sqe, syscall.Stdout, &b[0], len(b), 0)
sqe.UserData.SetUint64(0x0002)
sqe.Flags |= uring.IOSQE_IO_LINK
sqe = ring.GetSqe()
var buf = make([]byte, len(b))
uring.PrepRead(sqe, fd, &buf[0], len(buf), 0)
sqe.UserData.SetUint64(0x0003)
sqe.Flags |= uring.IOSQE_IO_LINK
sqe = ring.GetSqe()
uring.PrepClose(sqe, fd)
sqe.UserData.SetUint64(0x0004)
const N = 4
submitted, err := ring.SubmitAndWait(N)
if err != nil { /*...*/
log.Fatal(err)
}
println(submitted) // 4 (all four linked SQEs are flushed)
var cqe *uring.IoUringCqe
for i := 1; i <= N; i++ {
err = ring.WaitCqe(&cqe)
if err != nil {
log.Fatal(err)
} // check also EINTR
if cqe == nil {
log.Fatal("CQE is NULL!")
}
log.Printf("CQE: %+#v\n", cqe)
if uring.UserData(i) != cqe.UserData {
panic("UNORDERED")
}
if cqe.Res < 0 {
panic(syscall.Errno(-cqe.Res))
}
if i == 0x0003 {
retvb := buf[:cqe.Res]
fmt.Printf("retv buf %+#v\n", retvb)
if !bytes.Equal(b, retvb) {
panic("RET BUF NOT EQ")
}
}
ring.SeenCqe(cqe)
}
_ = cqe.UserData
_ = cqe.Res
_ = cqe.Flags
}


@ -0,0 +1,438 @@
package lib
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/alphadose/haxmap"
uring "github.com/ii64/gouring"
)
type Context interface {
Read()
Write(buf []byte)
Close()
SetContext(ctx context.Context)
Context() context.Context
}
type EventHandler interface {
OnAccept(ctx Context, sa syscall.Sockaddr)
OnRead(ctx Context, buf []byte)
OnWrite(ctx Context, nb int)
OnClose(ctx Context)
}
type eventContext struct {
evloop *Eventloop
ud *myUserdata
}
func (e *eventContext) SetContext(ctx context.Context) {
e.ud.ctx = ctx
}
func (e *eventContext) Context() context.Context {
return e.ud.ctx
}
func (e eventContext) Read() {
key, lud := e.evloop.allocUserdata()
sqe := e.evloop.queueRead(e.ud.fd, key)
lud.init(sqe.Opcode)
e.ud.copyTo(lud)
sqe.UserData = key
}
func (e eventContext) Write(b []byte) {
key, lud := e.evloop.allocUserdata()
sqe := e.evloop.queueWrite(e.ud.fd, key, b)
lud.init(sqe.Opcode)
e.ud.copyTo(lud)
sqe.UserData = key
}
func (e eventContext) Close() {
key, lud := e.evloop.allocUserdata()
sqe := e.evloop.queueClose(e.ud.fd, key)
lud.init(sqe.Opcode)
e.ud.copyTo(lud)
sqe.UserData = key
}
type myUserdata struct {
ctx context.Context
rsa syscall.RawSockaddrAny
rsaSz uintptr
fd int
flag uint64
bid int // buffer id
op uring.IoUringOp
}
func (ud *myUserdata) init(op uring.IoUringOp) {
ud.op = op
ud.rsaSz = unsafe.Sizeof(ud.rsa)
}
func (ud *myUserdata) copyTo(dst *myUserdata) {
oldOp := dst.op
*dst = *ud
dst.op = oldOp
}
type Eventloop struct {
ring *uring.IoUring
fd int
bufSize, bufCount int
buffers []byte
handler EventHandler
sema *uint64
userdata *haxmap.Map[uring.UserData, *myUserdata]
bufGroup uint16
}
func New(ent uint32, listenFd int, handler EventHandler) *Eventloop {
ring, err := uring.NewWithParams(ent, &uring.IoUringParams{
Flags: uring.IORING_SETUP_SQPOLL,
SqThreadIdle: 2000,
})
if err != nil {
panic(err)
}
bufSize := 0x1000
bufCount := 2048
var bufGroup uint16 = 0xffff
evloop := &Eventloop{
fd: listenFd,
ring: ring,
bufSize: bufSize,
bufCount: bufCount,
bufGroup: bufGroup,
buffers: make([]byte, bufCount*bufSize),
userdata: haxmap.New[uring.UserData, *myUserdata](),
handler: handler,
sema: new(uint64),
}
if err := evloop.init(); err != nil {
panic(err)
}
go evloop.runDebugInterface()
return evloop
}
func (e *Eventloop) runDebugInterface() {
mux := http.NewServeMux()
mux.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) {
dec := json.NewDecoder(r.Body)
defer r.Body.Close()
type SubmitRequest struct {
EventName string `json:"en"`
Fd int `json:"fd"`
Data any `json:"d"`
}
var req SubmitRequest
if err := dec.Decode(&req); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Printf("debug iface error: %s\n", err)
return
}
cx := &eventContext{
evloop: e,
ud: &myUserdata{
ctx: r.Context(),
fd: req.Fd,
},
}
switch strings.ToUpper(strings.Trim(req.EventName, " ")) {
case "WRITE":
cx.Write([]byte("DEBUG!\n"))
case "READ":
cx.Read()
case "CLOSE":
cx.Close()
}
if _, err := e.ring.Submit(); err != nil {
goto errInvalid
}
w.WriteHeader(http.StatusOK)
return
errInvalid:
w.WriteHeader(http.StatusInternalServerError)
})
srv := &http.Server{
Addr: ":19110",
Handler: mux,
}
if err := srv.ListenAndServe(); err != nil {
panic(err)
}
}
var udPool = sync.Pool{
New: func() any {
return new(myUserdata)
},
}
func (e *Eventloop) allocUserdata() (key uring.UserData, val *myUserdata) {
// val = udPool.Get().(*myUserdata)
val = new(myUserdata)
key.SetUnsafe(unsafe.Pointer(val))
e.userdata.Set(key, val)
return
}
func (e *Eventloop) freeUserdata(key uring.UserData) {
// v, exist := e.userdata.Get(key)
// if exist {
// udPool.Put(v)
e.userdata.Del(key)
// }
}
func (e *Eventloop) getBuf(bid int) []byte {
start := e.bufSize * bid
end := start + e.bufSize
return e.buffers[start:end]
}
func (e *Eventloop) init() error {
// queue accept mshot
sqe := e.ring.GetSqe()
key, ud := e.allocUserdata()
uring.PrepAcceptMultishot(sqe, e.fd, &ud.rsa, &ud.rsaSz, 0)
ud.init(sqe.Opcode)
sqe.UserData = key
// queue init provide buffers
sqe = e.ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&e.buffers[0]), e.bufSize, e.bufCount, e.bufGroup, 0)
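// this hands bufCount buffers of bufSize bytes each to the kernel under group
// bufGroup; reads submitted with IOSQE_BUFFER_SELECT pick one and return its
// ID in the upper 16 bits of cqe.Flags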
// wait for init provide buffers
submitted, err := e.ring.SubmitAndWait(1)
if err != nil {
return err
}
if submitted != 2 {
panic("MUST submit 2 sqes")
}
var cqe *uring.IoUringCqe
if err = e.ring.WaitCqe(&cqe); err != nil {
return err
}
if cqe.Res < 0 {
err = syscall.Errno(-cqe.Res)
return err
}
e.ring.SeenCqe(cqe)
return nil
}
func (e *Eventloop) queueProvideBuffer(bid int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:PRV_BUF\n")
sqe := e.ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&e.getBuf(bid)[0]), e.bufSize, 1, e.bufGroup, bid)
sqe.UserData = ud
sqe.Flags |= uring.IOSQE_IO_LINK
return sqe
}
func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:READ fd=%d\n", fd)
sqe := e.ring.GetSqe()
uring.PrepRead(sqe, fd, nil, e.bufSize, 0)
sqe.Flags |= uring.IOSQE_BUFFER_SELECT
sqe.Flags |= uring.IOSQE_IO_LINK
sqe.SetBufGroup(e.bufGroup)
sqe.UserData = ud
return sqe
}
func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:WRITE fd=%d\n", fd)
sqe := e.ring.GetSqe()
uring.PrepWrite(sqe, fd, &buf[0], len(buf), 0)
sqe.Flags |= uring.IOSQE_IO_LINK
sqe.UserData = ud
return sqe
}
func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:CLOSE fd=%d\n", fd)
sqe := e.ring.GetSqe()
uring.PrepClose(sqe, fd)
sqe.Flags |= uring.IOSQE_IO_LINK
sqe.UserData = ud
return sqe
}
func (e *Eventloop) queueGraceShutdownNop() *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:GRACE_SHUTDOWN\n")
sqe := e.ring.GetSqe()
uring.PrepNop(sqe)
sqe.Flags |= uring.IOSQE_IO_LINK
key, ud := e.allocUserdata()
ud.init(sqe.Opcode)
ud.fd = e.fd
ud.flag = 0xDEAD_DEAD_DEAD_DEAD
sqe.UserData = key
return sqe
}
func (e *Eventloop) Run() {
var cqe *uring.IoUringCqe
var err error
var i = 0
var pokeCounter uint64 = 0
qEventPoke := func() {
sqe := e.ring.GetSqe()
uring.PrepNop(sqe)
sqe.UserData = 0xBEEF_BEEF_BEEF_BEEF
atomic.AddUint64(&pokeCounter, 1)
if _, err := e.ring.Submit(); err != nil {
panic(err)
}
}
if false {
go func() {
for {
fmt.Printf("[MON] POKE COUNT: %d\n", pokeCounter)
time.Sleep(time.Second * 2)
}
}()
qEventPoke()
}
for {
if atomic.CompareAndSwapUint64(e.sema, 1, 0) {
break
}
if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR || err == syscall.EAGAIN || err == syscall.ETIME {
fmt.Printf("[DEBUG][%d] CQE WAIT err=%+#v\n", i, err)
runtime.Gosched() // relax, do other thing while waiting for IO
continue
} else if err != nil {
panic(err)
}
if cqe.UserData == 0xBEEF_BEEF_BEEF_BEEF {
e.ring.SeenCqe(cqe)
qEventPoke()
continue
}
ctx := &eventContext{
evloop: e,
}
ud, ok := e.userdata.Get(cqe.UserData)
if !ok {
goto skip_no_submit
}
ctx.ud = ud
switch ud.op {
case uring.IORING_OP_NOP:
switch {
case e.fd == ud.fd && ud.flag == 0xDEAD_DEAD_DEAD_DEAD:
break
}
case uring.IORING_OP_ACCEPT:
fmt.Printf("[DEBUG][%d] EV:ACCEPT\n", i)
var sa syscall.Sockaddr
sa, err = anyToSockaddr(&ud.rsa)
if err != nil {
panic(err)
}
fd := cqe.Res
if fd < 0 {
goto skip_no_submit
}
ud.fd = int(fd)
e.handler.OnAccept(ctx, sa)
case uring.IORING_OP_READ:
fmt.Printf("[DEBUG][%d] EV:READ\n", i)
if cqe.Flags&uring.IORING_CQE_F_BUFFER == 0 {
panic("MUST PROVIDE BUFFER")
}
nb := cqe.Res
bid := uint16(cqe.Flags >> 16)
if cqe.Res <= 0 {
e.queueClose(ud.fd, cqe.UserData)
} else {
e.handler.OnRead(ctx, e.getBuf(int(bid))[:nb])
}
e.queueProvideBuffer(int(bid), 0)
case uring.IORING_OP_WRITE:
fmt.Printf("[DEBUG][%d] EV:WRITE\n", i)
e.handler.OnWrite(ctx, int(cqe.Res))
case uring.IORING_OP_CLOSE:
fmt.Printf("[DEBUG][%d] EV:CLOSE\n", i)
e.handler.OnClose(ctx)
}
if ud.op != uring.IORING_OP_ACCEPT { // don't remove mshot UD
e.freeUserdata(cqe.UserData)
}
e.reportSubmit(-1)
if submitted, err := e.ring.Submit(); err != nil {
panic(err)
} else {
_ = submitted
e.reportSubmit(submitted)
}
skip_no_submit:
e.ring.SeenCqe(cqe)
i++
}
}
func (e *Eventloop) reportSubmit(submitted int) {
sq := &e.ring.Sq
p := (*[2]unsafe.Pointer)(unsafe.Pointer(sq))
khead := p[0]
ktail := p[1]
fmt.Printf("submitted: %d ht=(%d,%d) kht=(%p,%p)\n", submitted,
sq.SqeHead, sq.SqeTail,
khead, ktail)
}
func (e *Eventloop) Stop() error {
// mark spin to stop
atomic.StoreUint64(e.sema, 1)
// if the spin waiting for an event
// submit NOP with flag
e.queueGraceShutdownNop()
if _, err := e.ring.Submit(); err != nil {
return err
}
return nil
}
func (e *Eventloop) Close() {
e.ring.Close()
}
//go:linkname anyToSockaddr syscall.anyToSockaddr
func anyToSockaddr(rsa *syscall.RawSockaddrAny) (syscall.Sockaddr, error)
//go:linkname sockaddrToTCP net.sockaddrToTCP
func sockaddrToTCP(sa syscall.Sockaddr) net.Addr


@ -0,0 +1,153 @@
package main
import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/ii64/gouring/examples/simple-eventloop/lib"
"golang.org/x/sys/unix"
)
type myEchoServer struct{}
func (h myEchoServer) OnAccept(ctx lib.Context, sa syscall.Sockaddr) {
fmt.Printf("accept: %+#v\n", sa)
ctx.SetContext(context.Background())
ctx.Read()
}
func (h myEchoServer) OnRead(ctx lib.Context, b []byte) {
sctx := ctx.Context()
fmt.Printf("read ctx %+#v %+#v\n", sctx, b)
ctx.Write(b)
}
func (h myEchoServer) OnWrite(ctx lib.Context, nb int) {
ctx.Read()
}
func (h myEchoServer) OnClose(ctx lib.Context) {
}
type myHTTP11Server struct{}
func (h myHTTP11Server) OnAccept(ctx lib.Context, sa syscall.Sockaddr) {
ctx.Read()
}
func (h myHTTP11Server) OnRead(ctx lib.Context, b []byte) {
statusCode := http.StatusOK
if !bytes.HasPrefix(b, []byte("GET /")) {
statusCode = 400
}
statusText := http.StatusText(statusCode)
header := []byte(fmt.Sprintf("HTTP/1.1 %d %s\r\nServer: gouring-simple-evloop\r\nConnection: closed\r\nContent-Length: %d\r\n\r\n",
statusCode, statusText,
len(b)))
buf := make([]byte, len(header)+len(b))
copy(buf[0:], header)
copy(buf[len(header):], b)
ctx.Write(buf)
}
func (h myHTTP11Server) OnWrite(ctx lib.Context, nb int) {
ctx.Close()
}
func (h myHTTP11Server) OnClose(ctx lib.Context) {
}
func runServer(wg *sync.WaitGroup, ctx context.Context, addr string, handler lib.EventHandler) {
defer wg.Done()
lis, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
defer lis.Close()
file, err := lis.(*net.TCPListener).File()
if err != nil {
panic(err)
}
defer file.Close()
fd := file.Fd()
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
evloop := lib.New(32, int(fd), handler)
defer evloop.Close()
go func() {
<-ctx.Done()
if err := evloop.Stop(); err != nil {
panic(err)
}
}()
evloop.Run()
}
func runClientEcho(ctx context.Context, id, serverAddr string) {
var c net.Conn
var err error
for {
if c, err = net.Dial("tcp", serverAddr); err == nil {
break
}
time.Sleep(time.Second)
}
defer c.Close()
var buf [512]byte
var nb int
i := 0
for ctx.Err() == nil {
c.SetReadDeadline(time.Now().Add(time.Second * 4))
var wnb int
payload := []byte(fmt.Sprintf("ECHO[%s]:%d", id, time.Now().UnixMilli()))
if wnb, err = c.Write(payload); err != nil {
fmt.Printf("CLIENT[%s] seq=%d WRITE err=%q\n", id, i, err)
// panic(err)
continue
}
if nb, err = c.Read(buf[:]); err != nil {
fmt.Printf("CLIENT[%s] seq=%d READ err=%q\n", id, i, err)
// panic(err)
continue
} else if wnb != nb {
panic("message size not equal")
}
b := buf[:nb]
if !bytes.Equal(payload, b) {
panic("message not equal")
}
fmt.Printf("CLIENT[%s] seq=%d: OK\n", id, i)
i++
}
}
func main() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go runServer(&wg, ctx, "0.0.0.0:11338", myEchoServer{})
// go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{})
for i := 0; i < 1; i++ {
go runClientEcho(ctx, strconv.Itoa(i), "0.0.0.0:11338")
}
<-sig
cancel()
wg.Wait()
}


@ -0,0 +1,246 @@
package main
// Based on https://github.com/frevib/io_uring-echo-server/blob/a42497e4a7b1452329f6b2aa2cbcc25c2e422391/io_uring_echo_server.c
import (
"fmt"
"net"
"os"
"runtime"
"syscall"
"unsafe"
uring "github.com/ii64/gouring"
"golang.org/x/sys/unix"
)
const (
OP_ACCEPT = 1 << 0 // uring.IORING_OP_ACCEPT
OP_READ = 1 << 1 // uring.IORING_OP_READ
OP_WRITE = 1 << 2 // uring.IORING_OP_WRITE
OP_PRBUF = 1 << 3 // uring.IORING_OP_PROVIDE_BUFFERS
)
type MyUserdata struct {
Fd uint32
Flags uint16
BufID uint16
}
func UnpackUD(p uring.UserData) MyUserdata {
return *(*MyUserdata)(unsafe.Pointer(&p))
}
func (ud MyUserdata) PackUD() uring.UserData {
return *(*uring.UserData)(unsafe.Pointer(&ud))
}
func _SizeChecker() {
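// compile-time assertion: the constant array index below is only in range
// (i.e. 0) when MyUserdata packs into exactly one 8-byte UserData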
var x [1]struct{}
_ = x[unsafe.Sizeof(MyUserdata{})-unsafe.Sizeof(uring.UserData(0))]
}
func runServer() (err error) {
var ring *uring.IoUring
ring, err = uring.New(64, 0)
if err != nil {
return
}
defer ring.Close()
probe := ring.GetProbeRing()
fmt.Printf("probe: %+#v\n", probe)
var ln net.Listener
ln, err = net.Listen("tcp", "0.0.0.0:11337")
if err != nil {
return err
}
defer ln.Close()
var file *os.File
if file, err = ln.(*net.TCPListener).File(); err != nil {
return
}
defer file.Close()
fd := int(file.Fd())
if err = unix.SetNonblock(fd, false); err != nil {
return
}
var (
rsa syscall.RawSockaddrAny
rsaSz uintptr
)
rsaSz = unsafe.Sizeof(rsa)
const BUF_GID = 0xdead
const BUF_SIZE = 0x1000
const BUF_COUNT = 2048
UD_ACCEPT_MSHOT := MyUserdata{
Fd: uint32(fd),
Flags: OP_ACCEPT,
BufID: ^uint16(0),
}.PackUD()
var sqe *uring.IoUringSqe
var bufs [BUF_COUNT][BUF_SIZE]byte
var submitted int
// Q accept multishot
sqe = ring.GetSqe()
uring.PrepAcceptMultishot(sqe, fd, &rsa, &rsaSz, 0)
sqe.UserData = UD_ACCEPT_MSHOT
// Q init provide buffers
sqe = ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&bufs[0][0]), BUF_SIZE, BUF_COUNT, BUF_GID, 0)
queueRead := func(fd int, buf *byte, nb int) *uring.IoUringSqe {
sqe := ring.GetSqe()
uring.PrepRead(sqe, fd, buf, nb, 0)
return sqe
}
queueWrite := func(fd int, buf *byte, nb int) *uring.IoUringSqe {
sqe := ring.GetSqe()
uring.PrepWrite(sqe, fd, buf, nb, 0)
return sqe
}
queueProvideBuf := func(index uint16) *uring.IoUringSqe {
sqe := ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&bufs[index]), BUF_SIZE, 1, BUF_GID, int(index))
return sqe
}
_ = queueRead
_ = queueWrite
_ = queueProvideBuf
// wait 1 for provide buf
if submitted, err = ring.SubmitAndWait(1); err != nil {
return
}
fmt.Printf("Submitted: %d\n", submitted)
var cqe *uring.IoUringCqe
err = ring.WaitCqe(&cqe) // init provide buffer result
if err != nil {
panic(err)
} else if cqe.Res < 0 {
panic(syscall.Errno(-cqe.Res))
}
ring.SeenCqe(cqe)
for {
err = ring.WaitCqe(&cqe)
if err == syscall.EINTR {
runtime.Gosched()
continue
} else if err != nil {
return
}
ud := UnpackUD(cqe.UserData)
fmt.Printf("cqe=%+#v ud=%+#v\n", cqe, ud)
switch {
case cqe.Res == -int32(syscall.ENOBUFS):
panic("RAN OUT OF BUFFER!")
case ud.Flags == OP_PRBUF:
if cqe.Res < 0 {
panic(syscall.Errno(-cqe.Res))
}
case ud.Flags == OP_ACCEPT:
var sa syscall.Sockaddr
sa, err = anyToSockaddr(&rsa)
if err != nil {
panic(err)
}
fd := cqe.Res
fmt.Printf("CQE=%+#v rsaSz=%d sa=%+#v\n", cqe, rsaSz, sa)
if fd < 0 {
goto skip_no_submit
}
// Read from client socket
sqe = queueRead(int(fd), nil, BUF_SIZE) // read at most one provided buffer
sqe.Flags |= 0 |
uring.IOSQE_BUFFER_SELECT
sqe.SetBufGroup(BUF_GID)
sqe.UserData = MyUserdata{
Fd: uint32(fd),
Flags: OP_READ,
BufID: ^uint16(0),
}.PackUD()
case ud.Flags == OP_READ:
if cqe.Flags&uring.IORING_CQE_F_BUFFER == 0 {
panic("MUST PROVIDE BUFFER")
}
nb := cqe.Res
bid := uint16(cqe.Flags >> 16)
if cqe.Res <= 0 {
// read failed, re-add the buffer
sqe = queueProvideBuf(bid)
sqe.UserData = MyUserdata{
Fd: ud.Fd,
Flags: OP_PRBUF,
BufID: ^uint16(0),
}.PackUD()
// connection closed or error
syscall.Close(int(ud.Fd))
} else {
// bytes have been read into bufs, now add write to socket sqe
sqe = queueWrite(int(ud.Fd), &bufs[bid][0], int(nb))
sqe.UserData = MyUserdata{
Fd: ud.Fd,
Flags: OP_WRITE,
BufID: bid,
}.PackUD()
}
case ud.Flags == OP_WRITE:
// write has been completed, first re-add the buffer
sqe = queueProvideBuf(ud.BufID)
sqe.UserData = MyUserdata{
Fd: ud.Fd,
Flags: OP_PRBUF,
BufID: ^uint16(0),
}.PackUD()
// Read from client socket
sqe = queueRead(int(ud.Fd), nil, BUF_SIZE) // read at most one provided buffer
sqe.Flags |= 0 |
uring.IOSQE_BUFFER_SELECT
sqe.SetBufGroup(BUF_GID)
sqe.UserData = MyUserdata{
Fd: ud.Fd,
Flags: OP_READ,
BufID: ^uint16(0),
}.PackUD()
}
// skip:
if submitted, err = ring.Submit(); err != nil {
panic(err)
} else {
println("submitted", submitted)
}
skip_no_submit:
ring.SeenCqe(cqe)
}
}
func main() {
if err := runServer(); err != nil {
panic(err)
}
}
//go:linkname anyToSockaddr syscall.anyToSockaddr
func anyToSockaddr(rsa *syscall.RawSockaddrAny) (syscall.Sockaddr, error)
//go:linkname sockaddrToTCP net.sockaddrToTCP
func sockaddrToTCP(sa syscall.Sockaddr) net.Addr

1
go.mod

@ -7,5 +7,6 @@ require github.com/stretchr/testify v1.7.0
require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.1.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)

2
go.sum

@ -5,6 +5,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=

4
go.work Normal file

@ -0,0 +1,4 @@
go 1.20
use ./examples
use ./

200
hdr.go

@ -7,15 +7,26 @@
*/
package gouring
import "unsafe"
import (
"syscall"
"unsafe"
)
/*
* IO submission data structure (Submission Queue Entry)
*/
type IoUringSqe_Union1 uint64
func (u *IoUringSqe_Union1) SetOffset(v uint64) { *u = IoUringSqe_Union1(v) }
func (u *IoUringSqe_Union1) SetAddr2(v uint64) { *u = IoUringSqe_Union1(v) }
func (u *IoUringSqe_Union1) SetOffset(v uint64) { *u = IoUringSqe_Union1(v) }
func (u *IoUringSqe_Union1) SetOffset_RawPtr(v unsafe.Pointer) { *u = IoUringSqe_Union1((uintptr)(v)) }
func (u *IoUringSqe_Union1) SetAddr2(v uint64) { *u = IoUringSqe_Union1(v) }
func (u *IoUringSqe_Union1) SetAddr2_RawPtr(v unsafe.Pointer) { *u = IoUringSqe_Union1((uintptr)(v)) }
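// SetCmdOp below overlays the kernel's struct { __u32 cmd_op; __u32 __pad1; }
// view onto this 8-byte union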
func (u *IoUringSqe_Union1) SetCmdOp(v uint32) {
(*struct {
CmdOp uint32
__pad1 uint32
})(unsafe.Pointer(u)).CmdOp = v
}
type IoUringSqe_Union2 uint64
@ -26,6 +37,7 @@ func (u *IoUringSqe_Union2) SetSpliceOffsetIn(v uint64) { *u = IoUringSqe_Union2
type IoUringSqe_Union3 uint32
func (u *IoUringSqe_Union3) SetRwFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetFsyncFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetPollEvents(v uint16) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetPoll32Events(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetSyncRangeFlags(v uint32) { *u = IoUringSqe_Union3(v) }
@ -41,6 +53,8 @@ func (u *IoUringSqe_Union3) SetRenameFlags(v uint32) { *u = IoUringSqe_Union3
func (u *IoUringSqe_Union3) SetUnlinkFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetHardlinkFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetXattrFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetMsgRingFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetUringCmdFlags(v uint32) { *u = IoUringSqe_Union3(v) }
func (u *IoUringSqe_Union3) SetOpFlags(v uint32) { *u = IoUringSqe_Union3(v) } //generic
func (u IoUringSqe_Union3) GetOpFlags() uint32 { return uint32(u) } //generic
@ -53,6 +67,21 @@ type IoUringSqe_Union5 uint32
func (u *IoUringSqe_Union5) SetSpliceFdIn(v int32) { *u = IoUringSqe_Union5(v) }
func (u *IoUringSqe_Union5) SetFileIndex(v uint32) { *u = IoUringSqe_Union5(v) }
func (u *IoUringSqe_Union5) SetAddrLen(v uint16) {
s := (*[2]uint16)(unsafe.Pointer(u))
s[0] = v // addr_len
// s[1] = 0 // __pad3[1]
}
type IoUringSqe_Union6 [2]uint64
func (u *IoUringSqe_Union6) SetAddr3(v uint64) { u[0] = v }
/*
* If the ring is initialized with IORING_SETUP_SQE128, then
* this field is used for 80 bytes of arbitrary command data
*/
func (u *IoUringSqe_Union6) GetCmd() unsafe.Pointer { return unsafe.Pointer(u) }
type IoUringSqe struct {
Opcode IoUringOp /* type of operation for this sqe */
@ -63,6 +92,10 @@ type IoUringSqe struct {
// union {
// __u64 off; /* offset into file */
// __u64 addr2;
// struct {
// __u32 cmd_op;
// __u32 __pad1;
// };
// };
IoUringSqe_Union1
@ -92,6 +125,8 @@ type IoUringSqe struct {
// __u32 unlink_flags;
// __u32 hardlink_flags;
// __u32 xattr_flags;
// __u32 msg_ring_flags;
// __u32 uring_cmd_flags;
// };
IoUringSqe_Union3
@ -109,14 +144,28 @@ type IoUringSqe struct {
/* personality to use, if used */
Personality uint16
// union {
// __s32 splice_fd_in;
// __u32 file_index;
// };
// union {
// __s32 splice_fd_in;
// __u32 file_index;
// struct {
// __u16 addr_len;
// __u16 __pad3[1];
// };
// };
IoUringSqe_Union5
Addr3 uint64
__pad2 [1]uint64
// union {
// struct {
// __u64 addr3;
// __u64 __pad2[1];
// };
// /*
// * If the ring is initialized with IORING_SETUP_SQE128, then
// * this field is used for 80 bytes of arbitrary command data
// */
// __u8 cmd[0];
// };
IoUringSqe_Union6
}
/*
@ -187,14 +236,22 @@ const IORING_SETUP_COOP_TASKRUN = (1 << 8)
* IORING_SQ_TASKRUN in the sq ring flags. Not valid with COOP_TASKRUN.
*/
const IORING_SETUP_TASKRUN_FLAG = (1 << 9)
const IORING_SETUP_SQE128 = (1 << 10) /* SQEs are 128 byte */
const IORING_SETUP_CQE32 = (1 << 11) /* CQEs are 32 byte */
/*
* Only one task is allowed to submit requests
*/
const IORING_SETUP_SINGLE_ISSUER = (1 << 12)
/*
* Defer running task work to get events.
* Rather than running bits of task work whenever the task transitions
* try to do it just before it is needed.
*/
const IORING_SETUP_DEFER_TASKRUN = (1 << 13)
type IoUringOp = uint8
//go:generate stringerx -type=IoUringOp
const (
IORING_OP_NOP IoUringOp = iota
IORING_OP_READV
@ -243,11 +300,20 @@ const (
IORING_OP_GETXATTR
IORING_OP_SOCKET
IORING_OP_URING_CMD
IORING_OP_SEND_ZC
IORING_OP_SENDMSG_ZC
/* this goes last, obviously */
IORING_OP_LAST
)
/*
* sqe->uring_cmd_flags
* IORING_URING_CMD_FIXED use registered buffer; pass this flag
* along with setting sqe->buf_index.
*/
const IORING_URING_CMD_FIXED = (1 << 0)
/*
* sqe->fsync_flags
*/
@ -283,11 +349,14 @@ const SPLICE_F_FD_IN_FIXED = (1 << 31) /* the last bit of __u32 */
*
* IORING_POLL_UPDATE Update existing poll request, matching
* sqe->addr as the old user_data field.
*
* IORING_POLL_LEVEL Level triggered poll.
*/
const (
IORING_POLL_ADD_MULTI = (1 << 0)
IORING_POLL_UPDATE_EVENTS = (1 << 1)
IORING_POLL_UPDATE_USER_DATA = (1 << 2)
IORING_POLL_ADD_LEVEL = (1 << 3)
)
/*
@ -297,11 +366,13 @@ const (
* IORING_ASYNC_CANCEL_FD Key off 'fd' for cancelation rather than the
* request 'user_data'
* IORING_ASYNC_CANCEL_ANY Match any request
* IORING_ASYNC_CANCEL_FD_FIXED 'fd' passed in is a fixed descriptor
*/
const (
IORING_ASYNC_CANCEL_ALL = (1 << 0)
IORING_ASYNC_CANCEL_FD = (1 << 1)
IORING_ASYNC_CANCEL_ANY = (1 << 2)
IORING_ASYNC_CANCEL_ALL = (1 << 0)
IORING_ASYNC_CANCEL_FD = (1 << 1)
IORING_ASYNC_CANCEL_ANY = (1 << 2)
IORING_ASYNC_CANCEL_FD_FIXED = (1 << 3)
)
/*
@ -311,14 +382,40 @@ const (
* or receive and arm poll if that yields an
* -EAGAIN result, arm poll upfront and skip
* the initial transfer attempt.
* IORING_RECV_MULTISHOT Multishot recv. Sets IORING_CQE_F_MORE if
* the handler will continue to report
* CQEs on behalf of the same SQE.
*
* IORING_RECVSEND_FIXED_BUF Use registered buffers, the index is stored in
* the buf_index field.
*/
const IORING_RECVSEND_POLL_FIRST = (1 << 0)
const (
IORING_RECVSEND_POLL_FIRST = (1 << 0)
IORING_RECV_MULTISHOT = (1 << 1)
IORING_RECVSEND_FIXED_BUF = (1 << 2)
)
/*
* accept flags stored in sqe->ioprio
*/
const IORING_ACCEPT_MULTISHOT = (1 << 0)
/*
* IORING_OP_MSG_RING command types, stored in sqe->addr
*/
const (
IORING_MSG_DATA = iota /* pass sqe->len as 'res' and off as user_data */
IORING_MSG_SEND_FD /* send a registered fd to another ring */
)
/*
* IORING_OP_MSG_RING flags (sqe->msg_ring_flags)
*
* IORING_MSG_RING_CQE_SKIP Don't post a CQE to the target ring. Not
* applicable for IORING_MSG_DATA, obviously.
*/
const IORING_MSG_RING_CQE_SKIP = (1 << 0)
/*
* IO completion data structure (Completion Queue Entry)
*/
@ -332,8 +429,6 @@ type IoUringCqe struct {
* contains 16-bytes of padding, doubling the size of the CQE.
*/
// __u64 big_cqe[];
// 8+4+4 == 16 , correct
}
/*
@ -342,12 +437,15 @@ type IoUringCqe struct {
* IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID
* IORING_CQE_F_MORE If set, parent SQE will generate more CQE entries
* IORING_CQE_F_SOCK_NONEMPTY If set, more data to read after socket recv
* IORING_CQE_F_NOTIF Set for notification CQEs. Can be used to distinguish
* them from sends.
*/
const (
IORING_CQE_F_BUFFER = (1 << 0)
IORING_CQE_F_MORE = (1 << 1)
IORING_CQE_F_SOCK_NONEMPTY = (1 << 2)
IORING_CQE_F_NOTIF = (1 << 3)
)
const (
@ -493,6 +591,12 @@ const (
IORING_REGISTER_PBUF_RING = 22
IORING_UNREGISTER_PBUF_RING = 23
/* sync cancelation API */
IORING_REGISTER_SYNC_CANCEL = 24
/* register a range of fixed file slots for automatic slot allocation */
IORING_REGISTER_FILE_ALLOC_RANGE = 25
/* this goes last */
IORING_REGISTER_LAST
)
@ -507,7 +611,7 @@ const (
type IoUringFilesUpdate struct {
Offset uint32
resv uint32
Fds uint64 // __aligned_u64/* __s32 * */
Fds uint64 // TODO: __aligned_u64/* __s32 * */
}
/*
@ -520,25 +624,38 @@ type IoUringRsrcRegister struct {
Nr uint32
Flags uint32
resv2 uint64
Data uint64 // __aligned_u64
Tags uint64 // __aligned_u64
Data uint64 // TODO: __aligned_u64
Tags uint64 // TODO: __aligned_u64
}
type IoUringRsrcUpdate struct {
Offset uint32
resv uint32
Data uint64 // __aligned_u64
Data uint64 // TODO: __aligned_u64
}
type IoUringRsrcUpdate2 struct {
Offset uint32
resv uint32
Data uint64 // __aligned_u64
Tags uint64 // __aligned_u64
Data uint64 // TODO: __aligned_u64
Tags uint64 // TODO: __aligned_u64
Nr uint32
resv2 uint32
}
type IoUringNotificationSlot struct {
tag uint64
resv [3]uint64
}
type IoUringNotificationRegister struct {
nr_slots uint32
resv uint32
resv2 uint64
data uint64
resv3 uint64
}
/* Skip updating fd indexes set to this value in the fd table */
const IORING_REGISTER_FILES_SKIP = (-2)
@ -553,10 +670,12 @@ type IoUringProbeOp struct {
type IoUringProbe struct {
last_op uint8 /* last opcode supported */
uint8 /* length of ops[] array below */
ops_len uint8 /* length of ops[] array below */
resv uint16
resv2 [3]uint32
ops [0]IoUringProbeOp
// IMPLEMENTED ON hdr_extra
// ops [0]IoUringProbeOp
}
type IoUringRestriction struct {
@ -590,6 +709,8 @@ type IoUringBufRing struct {
resv3 uint16
Tail uint16
}
// IMPLEMENTED ON hdr_extra
// bufs [0]IoUringBuf
// };
}
@ -630,6 +751,29 @@ type IoUringGeteventsArg struct {
}
/*
* accept flags stored in sqe->ioprio
* Argument for IORING_REGISTER_SYNC_CANCEL
*/
// const IORING_ACCEPT_MULTISHOT = (1 << 0)
type IouringSyncCancelReg struct {
Addr uint64
Fd int32
Flags uint32
timeout syscall.Timespec
pad [4]uint64
}
/*
* Argument for IORING_REGISTER_FILE_ALLOC_RANGE
* The range is specified as [off, off + len)
*/
type IoUringFileIndexRange struct {
Offset uint32
Len uint32
resv uint64
}
type IoUringRecvmsgOut struct {
Namelen uint32
Controllen uint32
Payloadlen uint32
Flags uint32
}

39
hdr_extra.go Normal file

@ -0,0 +1,39 @@
package gouring
import (
"unsafe"
)
/*
* GetBigCqe
*
* If the ring is initialized with IORING_SETUP_CQE32, then this field
* contains 16-bytes of padding, doubling the size of the CQE.
*/
func (cqe *IoUringCqe) GetBigCqe() unsafe.Pointer {
return unsafe.Add(unsafe.Pointer(cqe), SizeofIoUringCqe)
}
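// e.g. NVMe uring passthrough completions on an IORING_SETUP_CQE32 ring place
// 16 extra bytes here; see the cqeExtra usage in examples/nvme/nvme.go above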
/*
* GetOps
*
* Get io_uring probe ops
*/
func (probe *IoUringProbe) GetOps() unsafe.Pointer {
return unsafe.Add(unsafe.Pointer(probe), SizeofIoUringProbe)
}
func (probe *IoUringProbe) GetOpAt(index int) *IoUringProbeOp {
return (*IoUringProbeOp)(unsafe.Add(probe.GetOps(), SizeofIoUringProbeOp*uintptr(index)))
}
/*
* GetBufs
*
* Get io_uring buf_ring bufs
*/
func (bring *IoUringBufRing) GetBufs() unsafe.Pointer {
return unsafe.Add(unsafe.Pointer(bring), SizeofIoUringBufRing)
}
func (bring *IoUringBufRing) GetBufAt(index int) *IoUringBuf {
return (*IoUringBuf)(unsafe.Add(bring.GetBufs(), SizeofIoUringBuf*uintptr(index)))
}


@ -3,12 +3,29 @@ package gouring
import "unsafe"
const (
SizeofUnsigned = unsafe.Sizeof(uint32(0))
SizeofUint32 = unsafe.Sizeof(uint32(0))
SizeofIoUringSqe = unsafe.Sizeof(IoUringSqe{})
SizeofIoUringCqe = unsafe.Sizeof(IoUringCqe{})
SizeofUnsigned = unsafe.Sizeof(uint32(0))
SizeofUint32 = unsafe.Sizeof(uint32(0))
SizeofIoUringSqe = unsafe.Sizeof(IoUringSqe{})
Align128IoUringSqe = 64
SizeofIoUringCqe = unsafe.Sizeof(IoUringCqe{})
Align32IoUringCqe = SizeofIoUringCqe
SizeofIoUringProbe = unsafe.Sizeof(IoUringProbe{})
SizeofIoUringProbeOp = unsafe.Sizeof(IoUringProbeOp{})
SizeofIoUringBufRing = unsafe.Sizeof(IoUringBufRing{})
SizeofIoUringBuf = unsafe.Sizeof(IoUringBuf{})
)
func _SizeChecker() {
var x [1]struct{}
_ = x[SizeofIoUringSqe-64]
_ = x[SizeofIoUringCqe-16]
_ = x[SizeofIoUringProbe-16]
_ = x[SizeofIoUringProbeOp-8]
_ = x[SizeofIoUringBufRing-16]
_ = x[SizeofIoUringBuf-16]
}
type IoUring struct {
Sq IoUringSq
Cq IoUringCq
@ -24,12 +41,12 @@ type IoUring struct {
}
type IoUringSq struct {
head unsafe.Pointer // *uint32
tail unsafe.Pointer // *uint32
ringMask unsafe.Pointer // *uint32
ringEntries unsafe.Pointer // *uint32
flags unsafe.Pointer // *uint32
dropped unsafe.Pointer // *uint32
khead unsafe.Pointer // *uint32
ktail unsafe.Pointer // *uint32
kringMask unsafe.Pointer // *uint32
kringEntries unsafe.Pointer // *uint32
kflags unsafe.Pointer // *uint32
kdropped unsafe.Pointer // *uint32
Array uint32Array //ptr arith
Sqes ioUringSqeArray //ptr arith
@ -40,35 +57,39 @@ type IoUringSq struct {
RingSz uint32
RingPtr unsafe.Pointer
pad [4]uint32
RingMask, RingEntries uint32
pad [2]uint32
}
func (sq *IoUringSq) _Head() *uint32 { return (*uint32)(sq.head) }
func (sq *IoUringSq) _Tail() *uint32 { return (*uint32)(sq.tail) }
func (sq *IoUringSq) _RingMask() *uint32 { return (*uint32)(sq.ringMask) }
func (sq *IoUringSq) _RingEntries() *uint32 { return (*uint32)(sq.ringEntries) }
func (sq *IoUringSq) _Flags() *uint32 { return (*uint32)(sq.flags) }
func (sq *IoUringSq) _Dropped() *uint32 { return (*uint32)(sq.dropped) }
func (sq *IoUringSq) _KHead() *uint32 { return (*uint32)(sq.khead) }
func (sq *IoUringSq) _KTail() *uint32 { return (*uint32)(sq.ktail) }
func (sq *IoUringSq) _KRingMask() *uint32 { return (*uint32)(sq.kringMask) }
func (sq *IoUringSq) _KRingEntries() *uint32 { return (*uint32)(sq.kringEntries) }
func (sq *IoUringSq) _KFlags() *uint32 { return (*uint32)(sq.kflags) }
func (sq *IoUringSq) _KDropped() *uint32 { return (*uint32)(sq.kdropped) }
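// the k prefix marks pointers into the kernel-shared (mmap'd) ring memory, as
// opposed to state owned by the Go side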
type IoUringCq struct {
head unsafe.Pointer // *uint32
tail unsafe.Pointer // *uint32
ringMask unsafe.Pointer // *uint32
ringEntries unsafe.Pointer // *uint32
flags unsafe.Pointer // *uint32
overflow unsafe.Pointer // *uint32
khead unsafe.Pointer // *uint32
ktail unsafe.Pointer // *uint32
kringMask unsafe.Pointer // *uint32
kringEntries unsafe.Pointer // *uint32
kflags unsafe.Pointer // *uint32
koverflow unsafe.Pointer // *uint32
Cqes ioUringCqeArray //ptr arith
RingSz uint32
RingPtr unsafe.Pointer
pad [4]uint32
RingMask, RingEntries uint32
pad [2]uint32
}
func (cq *IoUringCq) _Head() *uint32 { return (*uint32)(cq.head) }
func (cq *IoUringCq) _Tail() *uint32 { return (*uint32)(cq.tail) }
func (cq *IoUringCq) _RingMask() *uint32 { return (*uint32)(cq.ringMask) }
func (cq *IoUringCq) _RingEntries() *uint32 { return (*uint32)(cq.ringEntries) }
func (cq *IoUringCq) _Flags() *uint32 { return (*uint32)(cq.flags) }
func (cq *IoUringCq) _Overflow() *uint32 { return (*uint32)(cq.overflow) }
func (cq *IoUringCq) _KHead() *uint32 { return (*uint32)(cq.khead) }
func (cq *IoUringCq) _KTail() *uint32 { return (*uint32)(cq.ktail) }
func (cq *IoUringCq) _KRingMask() *uint32 { return (*uint32)(cq.kringMask) }
func (cq *IoUringCq) _KRingEntries() *uint32 { return (*uint32)(cq.kringEntries) }
func (cq *IoUringCq) _KFlags() *uint32 { return (*uint32)(cq.kflags) }
func (cq *IoUringCq) _KOverflow() *uint32 { return (*uint32)(cq.koverflow) }

50
ioctl/hdr_ioctl.go Normal file

@ -0,0 +1,50 @@
package ioctl
// Based on `ioctl.h`
const (
_IOC_NRBITS = 8
_IOC_TYPEBITS = 8
_IOC_SIZEBITS = 14 // OVERRIDE
_IOC_DIRBITS = 2 // OVERRIDE
_IOC_NRMASK = (1 << _IOC_NRBITS) - 1
_IOC_TYPEMASK = (1 << _IOC_TYPEBITS) - 1
_IOC_SIZEMASK = (1 << _IOC_SIZEBITS) - 1
_IOC_DIRMASK = (1 << _IOC_DIRBITS) - 1
_IOC_NRSHIFT = 0
_IOC_TYPESHIFT = (_IOC_NRSHIFT + _IOC_NRBITS)
_IOC_SIZESHIFT = (_IOC_TYPESHIFT + _IOC_TYPEBITS)
_IOC_DIRSHIFT = (_IOC_SIZESHIFT + _IOC_SIZEBITS)
_IOC_NONE = 0b00 // OVERRIDE
_IOC_WRITE = 0b01 // OVERRIDE
_IOC_READ = 0b10 // OVERRIDE
)
//go:nosplit
func IOC(dir, typ, nr, siz int) int {
return 0 |
(dir << _IOC_DIRSHIFT) |
(typ << _IOC_TYPESHIFT) |
(nr << _IOC_NRSHIFT) |
(siz << _IOC_SIZESHIFT)
}
/*
IO ops
*/
//go:nosplit
func IO(typ, nr int) int { return IOC(_IOC_NONE, typ, nr, 0) }
//go:nosplit
func IOR(typ, nr, siz int) int { return IOC(_IOC_READ, typ, nr, siz) }
//go:nosplit
func IOW(typ, nr, siz int) int { return IOC(_IOC_WRITE, typ, nr, siz) }
//go:nosplit
func IOWR(typ, nr, siz int) int { return IOC(_IOC_WRITE|_IOC_READ, typ, nr, siz) }
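// Worked example (added for clarity, not part of this commit): NVME_IOCTL_ID
// in nvme/hdr_nvme.go is ioctl.IO('N', 0x40); with 'N' = 0x4E and the shifts
// above (nr<<0, typ<<8, siz<<16, dir<<30) it encodes to
// (0<<30)|(0x4E<<8)|(0x40<<0)|(0<<16) = 0x4E40, matching the kernel's _IO('N', 0x40):
//
// fmt.Printf("0x%X\n", ioctl.IO('N', 0x40)) // prints 0x4E40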

199
nvme/hdr_nvme.go Normal file

@ -0,0 +1,199 @@
package nvme
import (
"unsafe"
"github.com/ii64/gouring/ioctl"
)
// Based on `nvme.h` w.r.t. `linux/nvme_ioctl.h`
const (
SizeofNvmeAdminCmd = unsafe.Sizeof(NvmeAdminCmd{})
SizeofNvmeUserIo = unsafe.Sizeof(NvmeUserIo{})
SizeofNvmePassthruCmd = unsafe.Sizeof(NvmePassthruCmd{})
SizeofNvmePassthruCmd64 = unsafe.Sizeof(NvmePassthruCmd64{})
SizeofNvmeUringCmd = unsafe.Sizeof(NvmeUringCmd{})
SizeofNvmeIdNs = unsafe.Sizeof(NvmeIdNs{})
SizeofNvmeLbaf = unsafe.Sizeof(NvmeLbaf{})
NVME_DEFAULT_IOCTL_TIMEOUT = 0
NVME_IDENTIFY_DATA_SIZE = 0x1000
NVME_IDENTIFY_CSI_SHIFT = 24
NVME_IDENTIFY_CNS_NS = 0
NVME_CSI_NVM = 0
)
func _SizeChecker() {
var x [1]struct{}
_ = x[SizeofNvmeAdminCmd-72]
_ = x[SizeofNvmeUserIo-48]
_ = x[SizeofNvmePassthruCmd-72]
_ = x[SizeofNvmePassthruCmd64-80]
_ = x[SizeofNvmeUringCmd-72]
_ = x[SizeofNvmeLbaf-4]
_ = x[SizeofNvmeIdNs-0x1000]
}
func NVME_IOCTL_ID() int { return ioctl.IO('N', 0x40) }
func NVME_IOCTL_ADMIN_CMD() int { return ioctl.IOWR('N', 0x41, int(SizeofNvmeAdminCmd)) }
func NVME_IOCTL_SUBMIT_IO() int { return ioctl.IOW('N', 0x42, int(SizeofNvmeUserIo)) }
func NVME_IOCTL_IO_CMD() int { return ioctl.IOR('N', 0x43, int(SizeofNvmePassthruCmd)) }
func NVME_IOCTL_RESET() int { return ioctl.IO('N', 0x44) }
func NVME_IOCTL_SUBSYS_RESET() int { return ioctl.IO('N', 0x45) }
func NVME_IOCTL_RESCAN() int { return ioctl.IO('N', 0x46) }
func NVME_IOCTL_ADMIN64_CMD() int { return ioctl.IOWR('N', 0x47, int(SizeofNvmePassthruCmd64)) }
func NVME_IOCTL_IO64_CMD() int { return ioctl.IOWR('N', 0x48, int(SizeofNvmePassthruCmd64)) }
func NVME_IOCTL_IO64_CMD_VEC() int { return ioctl.IOWR('N', 0x49, int(SizeofNvmePassthruCmd64)) }
func NVME_URING_CMD_IO() int { return ioctl.IOWR('N', 0x80, int(SizeofNvmeUringCmd)) }
func NVME_URING_CMD_IO_VEC() int { return ioctl.IOWR('N', 0x81, int(SizeofNvmeUringCmd)) }
func NVME_URING_CMD_ADMIN() int { return ioctl.IOWR('N', 0x82, int(SizeofNvmeUringCmd)) }
func NVME_URING_CMD_ADMIN_VEC() int { return ioctl.IOWR('N', 0x83, int(SizeofNvmeUringCmd)) }
// nvme_admin_opcode
const (
NVME_ADMIN_IDENTIFY = 0x06
)
// nvme_io_opcode
const (
NVME_CMD_WRITE = 0x01
NVME_CMD_READ = 0x02
)
type NvmeAdminCmd = NvmePassthruCmd
type NvmeUserIo struct {
Opcode uint8
Flags uint8
Control uint16
Nblocks uint16
Rsvd uint16
Metadata uint64
Addr uint64
Slba uint64
Dsmgmt uint32
Reftag uint32
Apptag uint16
Appmask uint16
_pad [4]byte
}
type NvmePassthruCmd struct {
Opcode uint8
Flags uint8
Rsvd1 uint16
Nsid uint32
Cdw2,
Cdw3 uint32
Metadata uint64
Addr uint64
MetadataLen uint32
DataLen uint32
Cdw10,
Cdw11,
Cdw12,
Cdw13,
Cdw14,
Cdw15 uint32
TimeoutMs uint32
Result uint32
}
type NvmePassthruCmd64_Union1 uint32
func (u *NvmePassthruCmd64_Union1) SetDataLen(v uint32) { *u = NvmePassthruCmd64_Union1(v) }
func (u *NvmePassthruCmd64_Union1) SetVecCount(v uint32) { *u = NvmePassthruCmd64_Union1(v) }
type NvmePassthruCmd64 struct {
Opcode uint8
Flags uint8
Rsvd1 uint16
Nsid uint32
Cdw2,
Cdw3 uint32
Metadata uint64
Addr uint64
MetadataLen uint32
// union {
// __u32 data_len; /* for non-vectored io */
// __u32 vec_cnt; /* for vectored io */
// };
NvmePassthruCmd64_Union1
Cdw10,
Cdw11,
Cdw12,
Cdw13,
Cdw14,
Cdw15 uint32
TimeoutMs uint32
Rsvd2 uint32
Result uint64
}
type NvmeUringCmd struct {
Opcode uint8
Flags uint8
Rsvd1 uint16
Nsid uint32
Cdw2, Cdw3 uint32
Metadata uint64
Addr uint64
MetadataLen uint32
DataLen uint32
Cdw10, Cdw11, Cdw12, Cdw13, Cdw14, Cdw15 uint32
TimeoutMs uint32
Rsvd2 uint32
}
type NvmeLbaf struct {
Ms uint16 // bo: Little
Ds uint8
Rp uint8
}
type NvmeIdNs struct {
Nsze,
Ncap,
Nuse uint64 // bo: Little
Nsfeat,
Nlbaf,
Flbas,
Mc,
Dpc,
Dps,
Nmic,
Rescap,
Fpi,
Dlfeat uint8
Nawun,
Nawupf,
Nacwu,
Nabsn,
Nabo,
Nabspf,
Noiob uint16 // bo: Little
Nvmcap [16]byte
Npwg,
Npwa,
Npdg,
Npda,
Nows uint16 // bo: Little
Msrl uint16 // bo: Little
Mcl uint32 // bo: Little
Msrc uint8
Resvd81 [11]byte
Anagrpid uint32 // bo: Little
Rsvd96 [3]byte
Nsattr uint8
Nvmsetid uint16 // bo: Little
Endgid uint16 // bo: Little
Nguid [16]byte
Eui64 [8]byte
Lbaf [16]NvmeLbaf
Rsvd192 [192]byte
Vs [3712]byte
}

323
prep.go

@ -3,8 +3,11 @@ package gouring
import (
"syscall"
"unsafe"
"golang.org/x/sys/unix"
)
//go:nosplit
func PrepRW(op IoUringOp, sqe *IoUringSqe, fd int,
addr unsafe.Pointer, len int, offset uint64) {
sqe.Opcode = op
@ -18,8 +21,7 @@ func PrepRW(op IoUringOp, sqe *IoUringSqe, fd int,
sqe.IoUringSqe_Union4 = 0 // sqe.SetBufIndex(0) // union4
sqe.Personality = 0
sqe.IoUringSqe_Union5 = 0 // sqe.SetFileIndex(0) // union5
sqe.Addr3 = 0
sqe.__pad2[0] = 0
sqe.IoUringSqe_Union6 = IoUringSqe_Union6{}
}
func PrepNop(sqe *IoUringSqe) {
@ -43,7 +45,23 @@ func PrepTimeoutUpdate(sqe *IoUringSqe, ts *syscall.Timespec, userData uint64, f
sqe.SetTimeoutFlags(flags | IORING_TIMEOUT_UPDATE)
}
// ** "Syscall" OP
/*
"Syscall" OP
*/
func PrepSplice(sqe *IoUringSqe, fdIn int, offIn uint64, fdOut int, offOut uint64, nb int, spliceFlags uint32) {
PrepRW(IORING_OP_SPLICE, sqe, fdOut, nil, nb, offOut)
sqe.SetSpliceOffsetIn(offIn)
sqe.SetSpliceFdIn(int32(fdIn))
sqe.SetSpliceFlags(spliceFlags)
}
func PrepTee(sqe *IoUringSqe, fdIn int, fdOut int, nb int, spliceFlags uint32) {
PrepRW(IORING_OP_TEE, sqe, fdOut, nil, nb, 0)
sqe.SetSpliceOffsetIn(0)
sqe.SetSpliceFdIn(int32(fdIn))
sqe.SetSpliceFlags(spliceFlags)
}
func PrepRead(sqe *IoUringSqe, fd int, buf *byte, nb int, offset uint64) {
PrepRW(IORING_OP_READ, sqe, fd, unsafe.Pointer(buf), nb, offset)
@ -59,6 +77,12 @@ func PrepReadv2(sqe *IoUringSqe, fd int,
PrepReadv(sqe, fd, iov, nrVecs, offset)
sqe.SetRwFlags(flags)
}
func PrepReadFixed(sqe *IoUringSqe, fd int,
buf *byte, nb int,
offset uint64, bufIndex uint16) {
PrepRW(IORING_OP_READ_FIXED, sqe, fd, unsafe.Pointer(buf), nb, offset)
sqe.SetBufIndex(bufIndex)
}
func PrepWrite(sqe *IoUringSqe, fd int, buf *byte, nb int, offset uint64) {
PrepRW(IORING_OP_WRITE, sqe, fd, unsafe.Pointer(buf), nb, offset)
@ -74,30 +98,303 @@ func PrepWritev2(sqe *IoUringSqe, fd int,
PrepWritev(sqe, fd, iov, nrVecs, offset)
sqe.SetRwFlags(flags)
}
func PrepWriteFixed(sqe *IoUringSqe, fd int,
buf *byte, nb int,
offset uint64, bufIndex uint16) {
PrepRW(IORING_OP_WRITE_FIXED, sqe, fd, unsafe.Pointer(buf), nb, offset)
sqe.SetBufIndex(bufIndex)
}
func PrepAccept(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint) {
// *rsaSz = syscall.SizeofSockaddrAny // leave this out to caller?
func PrepAccept(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint32) {
PrepRW(IORING_OP_ACCEPT, sqe, fd, unsafe.Pointer(rsa), 0, uint64(uintptr(unsafe.Pointer(rsaSz))))
sqe.SetAcceptFlags(uint32(flags))
}
func PrepAcceptDirect(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint32, fileIndex int) {
PrepAccept(sqe, fd, rsa, rsaSz, flags)
__io_uring_set_target_fixed_file(sqe, uint32(fileIndex))
}
func PrepAcceptMultishot(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint32) {
PrepAccept(sqe, fd, rsa, rsaSz, flags)
sqe.IoPrio |= IORING_ACCEPT_MULTISHOT
}
func PrepAcceptMultishotDirect(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint32) {
PrepAcceptMultishot(sqe, fd, rsa, rsaSz, flags)
__io_uring_set_target_fixed_file(sqe, IORING_FILE_INDEX_ALLOC-1)
}
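// a multishot accept keeps producing CQEs (with IORING_CQE_F_MORE set) from a
// single SQE until it fails or is canceled; examples/simple-eventloop relies on this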
func PrepClose(sqe *IoUringSqe, fd int) {
PrepRW(IORING_OP_CLOSE, sqe, fd, nil, 0, 0)
func PrepConnect(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz uintptr) {
PrepRW(IORING_OP_CONNECT, sqe, fd, unsafe.Pointer(rsa), 0, uint64(rsaSz))
}
func PrepRecvmsg(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint) {
PrepRW(IORING_OP_RECVMSG, sqe, fd, unsafe.Pointer(msg), 1, 0)
sqe.SetMsgFlags(uint32(flags))
}
func PrepRecvmsgMultishot(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint) {
PrepRecvmsg(sqe, fd, msg, flags)
sqe.IoPrio |= IORING_RECV_MULTISHOT
}
func PrepSendmsg(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint) {
func PrepSendmsg(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint32) {
PrepRW(IORING_OP_SENDMSG, sqe, fd, unsafe.Pointer(msg), 1, 0)
sqe.SetMsgFlags(flags)
}
func PrepSendmsgZc(sqe *IoUringSqe, fd int, msg *syscall.Msghdr, flags uint32) {
PrepSendmsg(sqe, fd, msg, flags)
sqe.Opcode = IORING_OP_SENDMSG_ZC // assign (not OR): SENDMSG_ZC replaces the SENDMSG opcode
}
func PrepSendSetAddr(sqe *IoUringSqe, destAddr *syscall.RawSockaddrAny, addrLen uint16) {
sqe.SetAddr2_RawPtr(unsafe.Pointer(destAddr))
sqe.SetAddrLen(addrLen)
}
func PrepClose(sqe *IoUringSqe, fd int) {
PrepRW(IORING_OP_CLOSE, sqe, fd, nil, 0, 0)
}
func PrepCloseDirect(sqe *IoUringSqe, fileIndex uint32) {
PrepClose(sqe, 0)
__io_uring_set_target_fixed_file(sqe, fileIndex)
}
func PrepFilesUpdate(sqe *IoUringSqe, fds []int32, offset int) {
PrepRW(IORING_OP_FILES_UPDATE, sqe, -1, unsafe.Pointer(&fds[0]), len(fds), uint64(offset))
}
func PrepFallocate(sqe *IoUringSqe, fd int, mode int, offset uint64, length int) {
PrepRW(IORING_OP_FALLOCATE, sqe, fd, nil, mode, offset)
sqe.SetAddr_Value(uint64(length))
}
func PrepOpenat(sqe *IoUringSqe, dfd int, path *byte, flags uint32, mode int) {
PrepRW(IORING_OP_OPENAT, sqe, dfd, unsafe.Pointer(path), mode, 0)
sqe.SetOpenFlags(flags)
}
func PrepOpenatDirect(sqe *IoUringSqe, dfd int, path *byte, flags uint32, mode int, fileIndex uint32) {
PrepOpenat(sqe, dfd, path, flags, mode)
__io_uring_set_target_fixed_file(sqe, fileIndex)
}
func PrepOpenat2(sqe *IoUringSqe, dfd int, path *byte, how *unix.OpenHow) {
PrepRW(IORING_OP_OPENAT2, sqe, dfd, unsafe.Pointer(path), int(unsafe.Sizeof(*how)), 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(how))
}
func PrepOpenat2Direct(sqe *IoUringSqe, dfd int, path *byte, how *unix.OpenHow, fileIndex uint32) {
PrepOpenat2(sqe, dfd, path, how)
__io_uring_set_target_fixed_file(sqe, fileIndex)
}
func PrepStatx(sqe *IoUringSqe, dfd int, path *byte, flags uint32, mask uint32, statxbuf *unix.Statx_t) {
PrepRW(IORING_OP_STATX, sqe, dfd, unsafe.Pointer(path), int(mask), 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(statxbuf))
sqe.SetStatxFlags(flags)
}
func PrepFadvise(sqe *IoUringSqe, fd int, offset uint64, length int, advice uint32) {
PrepRW(IORING_OP_FADVISE, sqe, fd, nil, length, offset)
sqe.SetFadviseAdvice(advice)
}
func PrepMadvise(sqe *IoUringSqe, addr unsafe.Pointer, length int, advice uint32) {
PrepRW(IORING_OP_MADVISE, sqe, -1, addr, length, 0)
sqe.SetFadviseAdvice(advice)
}
func PrepSendto(sqe *IoUringSqe, sockfd int, buf *byte, length int, flags uint32,
addr *syscall.RawSockaddrAny, addrLen uint16) {
PrepSend(sqe, sockfd, buf, length, flags)
PrepSendSetAddr(sqe, addr, addrLen)
}
func PrepSend(sqe *IoUringSqe, sockfd int, buf *byte, length int, flags uint32) {
PrepRW(IORING_OP_SEND, sqe, sockfd, unsafe.Pointer(buf), length, 0)
sqe.SetMsgFlags(flags)
}
func PrepSendZc(sqe *IoUringSqe, sockfd int, buf *byte, length int, flags uint32, zcFlags uint16) {
PrepRW(IORING_OP_SEND_ZC, sqe, sockfd, unsafe.Pointer(buf), length, 0)
sqe.SetMsgFlags(flags)
sqe.IoPrio = uint16(zcFlags)
}
func PrepSendZcFixed(sqe *IoUringSqe, sockfd int, buf *byte, length int, flags uint32, zcFlags uint16, bufIndex uint16) {
PrepSendZc(sqe, sockfd, buf, length, flags, zcFlags)
sqe.IoPrio |= IORING_RECVSEND_FIXED_BUF
sqe.SetBufIndex(bufIndex)
}
func PrepRecv(sqe *IoUringSqe, sockfd int, buf *byte, length int, flags int) {
PrepRW(IORING_OP_RECV, sqe, sockfd, unsafe.Pointer(buf), length, 0)
sqe.SetMsgFlags(uint32(flags))
}
// ** Multishot
func PrepMultishotAccept(sqe *IoUringSqe, fd int, rsa *syscall.RawSockaddrAny, rsaSz *uintptr, flags uint) {
PrepAccept(sqe, fd, rsa, rsaSz, flags)
sqe.IoPrio |= IORING_ACCEPT_MULTISHOT
func PrepRecvMultishot(sqe *IoUringSqe, sockfd int, buf *byte, length int, flags int) {
PrepRecv(sqe, sockfd, buf, length, flags)
sqe.IoPrio |= IORING_RECV_MULTISHOT
}
func PrepSocket(sqe *IoUringSqe, domain int, _type int, protocol int, flags uint32) {
PrepRW(IORING_OP_SOCKET, sqe, domain, nil, protocol, uint64(_type))
sqe.SetRwFlags(flags)
}
func PrepSocketDirect(sqe *IoUringSqe, domain int, _type int, protocol int, fileIndex uint32, flags uint32) {
PrepRW(IORING_OP_SOCKET, sqe, domain, nil, protocol, uint64(_type))
sqe.SetRwFlags(flags)
__io_uring_set_target_fixed_file(sqe, fileIndex)
}
func PrepSocketDirectAlloc(sqe *IoUringSqe, domain int, _type int, protocol int, flags uint32) {
PrepRW(IORING_OP_SOCKET, sqe, domain, nil, protocol, uint64(_type))
sqe.SetRwFlags(flags)
__io_uring_set_target_fixed_file(sqe, IORING_FILE_INDEX_ALLOC-1)
}
/*
Poll
*/
// PrepEpollCtl syscall.EpollCtl look-alike
func PrepEpollCtl(sqe *IoUringSqe, epfd int, op int, fd int, ev *syscall.EpollEvent) {
PrepRW(IORING_OP_EPOLL_CTL, sqe, epfd, unsafe.Pointer(ev), op, uint64(fd))
}
func PrepPollAdd(sqe *IoUringSqe, fd int, pollMask uint32) {
PrepRW(IORING_OP_POLL_ADD, sqe, fd, nil, 0, 0)
sqe.SetPoll32Events(pollMask) // TODO: check endianness
}
func PrepPollMultishot(sqe *IoUringSqe, fd int, pollMask uint32) {
PrepPollAdd(sqe, fd, pollMask)
sqe.Len = IORING_POLL_ADD_MULTI
}
func PrepPollRemove(sqe *IoUringSqe, userdata UserData) {
PrepRW(IORING_OP_POLL_REMOVE, sqe, -1, nil, 0, 0)
sqe.SetAddr(userdata.GetUnsafe())
}
func PrepPollUpdate(sqe *IoUringSqe, oldUserdata UserData, newUserdata UserData, pollMask uint32, flags int) {
PrepRW(IORING_OP_POLL_REMOVE, sqe, -1, nil, flags, newUserdata.GetUint64())
sqe.SetAddr(oldUserdata.GetUnsafe())
sqe.SetPoll32Events(pollMask) // TODO: check endianness
}
func PrepFsync(sqe *IoUringSqe, fd int, fsyncFlags uint32) {
PrepRW(IORING_OP_FSYNC, sqe, fd, nil, 0, 0)
sqe.SetFsyncFlags(fsyncFlags)
}
/*
Extra
*/
func PrepCancel64(sqe *IoUringSqe, ud UserData, flags uint32) {
PrepRW(IORING_OP_ASYNC_CANCEL, sqe, -1, nil, 0, 0)
sqe.SetAddr(ud.GetUnsafe())
sqe.SetCancelFlags(flags)
}
func PrepCancel(sqe *IoUringSqe, ud UserData, flags uint32) {
PrepCancel64(sqe, UserData(ud.GetUintptr()), flags)
}
func PrepCancelFd(sqe *IoUringSqe, fd int, flags uint32) {
PrepRW(IORING_OP_ASYNC_CANCEL, sqe, fd, nil, 0, 0)
sqe.SetCancelFlags(flags | IORING_ASYNC_CANCEL_FD)
}
func PrepLinkTimeout(sqe *IoUringSqe, ts *syscall.Timespec, flags uint32) {
PrepRW(IORING_OP_LINK_TIMEOUT, sqe, -1, unsafe.Pointer(ts), 1, 0)
sqe.SetTimeoutFlags(flags)
}
func PrepProvideBuffers(sqe *IoUringSqe, addr unsafe.Pointer, length int, nr int, bGid uint16, bId int) {
PrepRW(IORING_OP_PROVIDE_BUFFERS, sqe, nr, addr, length, uint64(bId))
sqe.SetBufGroup(bGid)
}
func PrepRemoveBuffers(sqe *IoUringSqe, nr int, bGid uint16) {
PrepRW(IORING_OP_REMOVE_BUFFERS, sqe, nr, nil, 0, 0)
sqe.SetBufGroup(bGid)
}
func PrepShutdown(sqe *IoUringSqe, fd int, how int) {
PrepRW(IORING_OP_SHUTDOWN, sqe, fd, nil, how, 0)
}
func PrepUnlinkat(sqe *IoUringSqe, dfd int, path *byte, flags uint32) {
PrepRW(IORING_OP_UNLINKAT, sqe, dfd, unsafe.Pointer(path), 0, 0)
sqe.SetUnlinkFlags(flags)
}
func PrepUnlink(sqe *IoUringSqe, path *byte, flags uint32) {
PrepUnlinkat(sqe, unix.AT_FDCWD, path, flags)
}
func PrepRenameat(sqe *IoUringSqe, oldDfd int, oldPath *byte, newDfd int, newPath *byte, flags uint32) {
PrepRW(IORING_OP_RENAMEAT, sqe, oldDfd, unsafe.Pointer(oldPath), newDfd, 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(newPath))
sqe.SetRenameFlags(flags)
}
func PrepRename(sqe *IoUringSqe, oldPath *byte, newPath *byte) {
PrepRenameat(sqe, unix.AT_FDCWD, oldPath, unix.AT_FDCWD, newPath, 0)
}
func PrepSyncFileRange(sqe *IoUringSqe, fd int, length int, offset uint64, flags uint32) {
PrepRW(IORING_OP_SYNC_FILE_RANGE, sqe, fd, nil, length, offset)
sqe.SetSyncRangeFlags(flags)
}
func PrepMkdirat(sqe *IoUringSqe, dfd int, path *byte, mode int) {
PrepRW(IORING_OP_MKDIRAT, sqe, dfd, unsafe.Pointer(path), mode, 0)
}
func PrepMkdir(sqe *IoUringSqe, dfd int, path *byte, mode int) {
PrepMkdirat(sqe, unix.AT_FDCWD, path, mode)
}
func PrepSymlinkat(sqe *IoUringSqe, target *byte, newDirfd int, linkpath *byte) {
PrepRW(IORING_OP_SYMLINKAT, sqe, newDirfd, unsafe.Pointer(target), 0, 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(linkpath))
}
func PrepSymlink(sqe *IoUringSqe, target *byte, linkpath *byte) {
PrepSymlinkat(sqe, target, unix.AT_FDCWD, linkpath)
}
func PrepLinkat(sqe *IoUringSqe, oldDfd int, oldPath *byte, newDfd int, newPath *byte, flags uint32) {
PrepRW(IORING_OP_LINKAT, sqe, oldDfd, unsafe.Pointer(oldPath), newDfd, 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(newPath))
// TODO: flags is currently unused; liburing stores it in the SQE's
// hardlink_flags field before submission.
}
func PrepLink(sqe *IoUringSqe, oldPath *byte, newPath *byte, flags uint32) {
PrepLinkat(sqe, unix.AT_FDCWD, oldPath, unix.AT_FDCWD, newPath, flags)
}
func PrepMsgRing(sqe *IoUringSqe, fd int, length int, data uint64, flags uint32) {
PrepRW(IORING_OP_MSG_RING, sqe, fd, nil, length, data)
sqe.SetRwFlags(flags)
}
func PrepGetxattr(sqe *IoUringSqe, name *byte, value *byte, path *byte, length int) {
PrepRW(IORING_OP_GETXATTR, sqe, 0, unsafe.Pointer(name), length, 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(path))
// TODO: value is currently unused; liburing passes value via the offset
// field and path via addr3.
sqe.SetXattrFlags(0)
}
func PrepSetxattr(sqe *IoUringSqe, name *byte, value *byte, path *byte, flags uint32, length int) {
PrepRW(IORING_OP_SETXATTR, sqe, 0, unsafe.Pointer(name), length, 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(value))
// TODO: path is currently unused; liburing passes it via addr3.
sqe.SetXattrFlags(flags)
}
func PrepFgetxattr(sqe *IoUringSqe, fd int, name *byte, value *byte, length uint) {
PrepRW(IORING_OP_FGETXATTR, sqe, fd, unsafe.Pointer(name), int(length), 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(value))
sqe.SetXattrFlags(0)
}
func PrepFsetxattr(sqe *IoUringSqe, fd int, name *byte, value *byte, flags uint32, length uint) {
PrepRW(IORING_OP_FSETXATTR, sqe, fd, unsafe.Pointer(name), int(length), 0)
sqe.SetOffset_RawPtr(unsafe.Pointer(value))
sqe.SetXattrFlags(flags)
}
//go:nosplit
func __io_uring_set_target_fixed_file(sqe *IoUringSqe, fileIndex uint32) {
sqe.SetFileIndex(fileIndex)
}
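Taken together, the helpers above mirror liburing's io_uring_prep_*() family: PrepRW packs opcode, fd, address, length, and offset into the SQE, and each wrapper then fills the op-specific union field. A minimal sketch of how they are meant to be driven (package qualifier omitted; this assumes the exported GetSqe/Submit wrappers shown further down, and that the SQE carries its completion tag in a UserData field):

	// Arm a multishot recv on sockfd. buf must stay alive until the
	// final CQE arrives; error handling is elided.
	sqe := ring.GetSqe()
	for sqe == nil { // SQ is full: submit to free a slot, then retry
		ring.Submit()
		sqe = ring.GetSqe()
	}
	PrepRecvMultishot(sqe, sockfd, &buf[0], len(buf), 0)
	sqe.UserData.SetUint64(42) // tag for matching completions later
	ring.Submit()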

queue.go

@@ -14,14 +14,22 @@ const LIBURING_UDATA_TIMEOUT uint64 = ^uint64(0)
* or if IORING_SQ_NEED_WAKEUP is set, so submit thread must be explicitly
* awakened. For the latter case, we set the thread wakeup flag.
*/
func (ring *IoUring) sq_ring_needs_enter(flags *uint32) bool {
func (ring *IoUring) sq_ring_needs_enter(submitted uint32, flags *uint32) bool {
if submitted == 0 {
return false
}
if ring.Flags&IORING_SETUP_SQPOLL == 0 {
return true
}
// FIXME: io_uring_smp_mb
/*
* Ensure the kernel can see the store to the SQ tail before we read
* the flags.
*/
// FIXME: Extra call - no inline asm.
io_uring_smp_mb()
if atomic.LoadUint32(ring.Sq._Flags())&IORING_SQ_NEED_WAKEUP != 0 {
if atomic.LoadUint32(ring.Sq._KFlags())&IORING_SQ_NEED_WAKEUP != 0 {
*flags |= IORING_ENTER_SQ_WAKEUP
return true
}
@@ -29,7 +37,7 @@ func (ring *IoUring) sq_ring_needs_enter(flags *uint32) bool {
}
func (ring *IoUring) cq_ring_needs_flush() bool {
return atomic.LoadUint32(ring.Sq._Flags())&(IORING_SQ_CQ_OVERFLOW|IORING_SQ_TASKRUN) != 0
return atomic.LoadUint32(ring.Sq._KFlags())&(IORING_SQ_CQ_OVERFLOW|IORING_SQ_TASKRUN) != 0
}
func (ring *IoUring) cq_ring_needs_enter() bool {
@@ -42,6 +50,7 @@ type get_data struct {
getFlags uint32
sz int32
arg unsafe.Pointer
hasTs bool
}
func (ring *IoUring) _io_uring_get_cqe(cqePtr **IoUringCqe, data *get_data) (err error) {
@@ -57,6 +66,11 @@ func (ring *IoUring) _io_uring_get_cqe(cqePtr **IoUringCqe, data *get_data) (err
break
}
if cqe != nil && data.waitNr == 0 && data.submit == 0 {
/*
* If we already looped once, we already entered
* the kernel. Since there's nothing to submit or
* wait for, don't keep retrying.
*/
if looped || !ring.cq_ring_needs_enter() {
err = syscall.EAGAIN
break
@@ -67,12 +81,19 @@ func (ring *IoUring) _io_uring_get_cqe(cqePtr **IoUringCqe, data *get_data) (err
flags = IORING_ENTER_GETEVENTS | data.getFlags
needEnter = true
}
if data.submit > 0 && ring.sq_ring_needs_enter(&flags) {
if data.submit > 0 && ring.sq_ring_needs_enter(data.submit, &flags) {
needEnter = true
}
if !needEnter {
break
}
if looped && data.hasTs {
arg := (*IoUringGeteventsArg)(data.arg)
if cqe == nil && arg.Ts != 0 && err == nil {
err = syscall.ETIME
}
break
}
if ring.IntFlags&INT_FLAG_REG_RING != 0 {
flags |= IORING_ENTER_REGISTERED_RING
@@ -118,8 +139,8 @@ func (ring *IoUring) io_uring_peek_batch_cqe(cqes []*IoUringCqe, count uint32) u
again:
ready = ring.io_uring_cq_ready()
if ready > 0 {
var head = *ring.Cq._Head()
var mask = *ring.Cq._RingMask()
var head = *ring.Cq._KHead()
var mask = *ring.Cq._KRingMask()
var last uint32
if count > ready {
count = ready
@@ -158,30 +179,22 @@ done:
*/
func (ring *IoUring) __io_uring_flush_sq() uint32 {
sq := &ring.Sq
var mask = *sq._RingMask()
var ktail = *sq._Tail()
var toSubmit = sq.SqeTail - sq.SqeHead
tail := sq.SqeTail
if toSubmit < 1 {
goto out
if sq.SqeHead != tail {
sq.SqeHead = tail
/*
* Ensure kernel sees the SQE updates before the tail update.
*/
atomic.StoreUint32(sq._KTail(), tail)
// if !(ring.Flags&IORING_SETUP_SQPOLL != 0) {
// IO_URING_WRITE_ONCE(*sq.ktail, tail)
// } else {
// io_uring_smp_store_release(sq.ktail, tail)
// }
}
/*
* Fill in sqes that we have queued up, adding them to the kernel ring
*/
for ; toSubmit > 0; toSubmit-- {
*uint32Array_Index(sq.Array, uintptr(ktail&mask)) = sq.SqeHead & mask
ktail++
sq.SqeHead++
}
/*
* Ensure that the kernel sees the SQE updates before it sees the tail
* update.
*/
atomic.StoreUint32(sq._Tail(), ktail)
out:
/*
* This _may_ look problematic, as we're not supposed to be reading
* SQ->head without acquire semantics. When we're in SQPOLL mode, the
@@ -193,7 +206,7 @@ out:
* we can submit. The point is, we need to be able to deal with this
* situation regardless of any perceived atomicity.
*/
return ktail - *sq._Head()
return tail - *sq._KHead()
}
/*
@@ -232,6 +245,9 @@ func (ring *IoUring) io_uring_wait_cqes_new(cqePtr **IoUringCqe, waitNtr uint32,
* handling between two threads.
*/
func (ring *IoUring) __io_uring_submit_timeout(waitNr uint32, ts *syscall.Timespec) (ret int, err error) {
/*
* If the SQ ring is full, we may need to submit IO first
*/
sqe := ring.io_uring_get_sqe()
if sqe == nil {
ret, err = ring.io_uring_submit()
@@ -267,7 +283,7 @@ func (ring *IoUring) io_uring_wait_cqes(cqePtr **IoUringCqe, waitNtr uint32, ts
return
}
func (ring *IoUring) io_uring_submit_and_wait_timeout(cqePtr **IoUringCqe, waitNtr uint32, ts *syscall.Timespec, sigmask *Sigset_t) (err error) {
func (ring *IoUring) io_uring_submit_and_wait_timeout(cqePtr **IoUringCqe, waitNr uint32, ts *syscall.Timespec, sigmask *Sigset_t) (err error) {
var toSubmit int
if ts != nil {
if ring.Features&IORING_FEAT_EXT_ARG != 0 {
@@ -278,21 +294,22 @@ func (ring *IoUring) io_uring_submit_and_wait_timeout(cqePtr **IoUringCqe, waitN
}
data := &get_data{
submit: ring.__io_uring_flush_sq(),
waitNr: waitNtr,
waitNr: waitNr,
getFlags: IORING_ENTER_EXT_ARG,
sz: int32(unsafe.Sizeof(arg)),
arg: unsafe.Pointer(&arg),
hasTs: ts != nil,
}
return ring._io_uring_get_cqe(cqePtr, data)
}
toSubmit, err = ring.__io_uring_submit_timeout(waitNtr, ts)
toSubmit, err = ring.__io_uring_submit_timeout(waitNr, ts)
if err != nil {
return
}
} else {
toSubmit = int(ring.__io_uring_flush_sq())
}
err = ring.__io_uring_get_cqe(cqePtr, uint32(toSubmit), waitNtr, sigmask)
err = ring.__io_uring_get_cqe(cqePtr, uint32(toSubmit), waitNr, sigmask)
return
}
@@ -328,9 +345,10 @@ func (ring *IoUring) __io_uring_submit_and_wait(waitNr uint32) (int, error) {
func (ring *IoUring) __io_uring_submit(submitted uint32, waitNr uint32) (ret int, err error) {
var flags uint32 = 0
var cq_needs_enter = ring.cq_ring_needs_enter() || waitNr != 0
if ring.sq_ring_needs_enter(&flags) || waitNr != 0 {
if waitNr != 0 || ring.Flags&IORING_SETUP_IOPOLL != 0 {
if ring.sq_ring_needs_enter(submitted, &flags) || cq_needs_enter {
if cq_needs_enter {
flags |= IORING_ENTER_GETEVENTS
}
if ring.IntFlags&INT_FLAG_REG_RING != 0 {
@@ -353,10 +371,12 @@ func (ring *IoUring) io_uring_get_sqe() *IoUringSqe {
* function multiple times before calling io_uring_submit().
*
* Returns a vacant sqe, or NULL if we're full.
*
* SAFETY: NO CONCURRENT ACCESS
*/
func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
sq := &ring.Sq
var head = atomic.LoadUint32(sq._Head())
var head = atomic.LoadUint32(sq._KHead())
var next = sq.SqeTail + 1
var shift uint32 = 0
@@ -364,8 +384,8 @@ func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
shift = 1
}
if next-head <= *sq._RingEntries() {
sqe = ioUringSqeArray_Index(sq.Sqes, uintptr((sq.SqeTail&*sq._RingMask())<<shift))
if next-head <= *sq._KRingEntries() {
sqe = ioUringSqeArray_Index(sq.Sqes, uintptr((sq.SqeTail&*sq._KRingMask())<<shift))
sq.SqeTail = next
return
}
@@ -375,7 +395,7 @@ func (ring *IoUring) _io_uring_get_sqe() (sqe *IoUringSqe) {
}
func (ring *IoUring) io_uring_cq_ready() uint32 {
return atomic.LoadUint32(ring.Cq._Tail()) - *ring.Cq._Head()
return atomic.LoadUint32(ring.Cq._KTail()) - *ring.Cq._KHead()
}
func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) error {
@@ -383,7 +403,7 @@ func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) e
var err int32 = 0
var avail int
var mask = *ring.Cq._RingMask()
var mask = *ring.Cq._KRingMask()
var shift uint32 = 0
if ring.Flags&IORING_SETUP_CQE32 != 0 {
@@ -391,12 +411,12 @@ func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) e
}
for {
var tail = atomic.LoadUint32(ring.Cq._Tail())
var head = *ring.Cq._Head()
var tail = atomic.LoadUint32(ring.Cq._KTail())
var head = *ring.Cq._KHead()
cqe = nil
avail = int(tail - head)
if avail < 1 {
if avail <= 0 {
break
}
@@ -422,15 +442,15 @@ func (ring *IoUring) __io_uring_peek_cqe(cqePtr **IoUringCqe, nrAvail *uint32) e
if nrAvail != nil {
*nrAvail = uint32(avail)
}
if err == 0 {
return nil
if err < 0 {
return syscall.Errno(-err)
}
return syscall.Errno(-err)
return nil
}
func (ring *IoUring) io_uring_cq_advance(nr uint32) {
if nr > 0 {
atomic.StoreUint32(ring.Cq._Head(), *ring.Cq._Head()+nr)
atomic.StoreUint32(ring.Cq._KHead(), *ring.Cq._KHead()+nr)
}
}
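The _KHead/_KTail bookkeeping above is what keeps completion reaping cheap: io_uring_cq_ready is a single atomic load of the kernel tail, and io_uring_cq_advance publishes a new head once per batch instead of once per CQE. A rough sketch of the consumer loop this supports, using the exported WaitCqe/SeenCqe wrappers exercised by the test below (handleCqe is a hypothetical callback, and the CQE field names are assumed):

	var cqe *IoUringCqe
	for {
		if err := ring.WaitCqe(&cqe); err != nil {
			if err == syscall.EINTR {
				continue // interrupted by a signal; retry the wait
			}
			break
		}
		handleCqe(cqe.UserData, cqe.Res) // consume the completion
		ring.SeenCqe(cqe)                // advance CQ head so the kernel can reuse the slot
	}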


@@ -121,7 +121,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
}
}()
for ctx.Err() == nil {
err = h.io_uring_wait_cqe(&cqe)
err = h.WaitCqe(&cqe)
if err == syscall.EINTR {
// ignore INTR
continue
@@ -145,7 +145,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
_ = buf
// fmt.Printf("%+#v %s", buf, buf)
h.io_uring_cqe_seen(cqe) // necessary
h.SeenCqe(cqe) // necessary
wg.Done()
}
}
@@ -172,7 +172,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
for i := 0; i < tc.jobCount; i++ {
var sqe *IoUringSqe
for { // sqe could be nil if SQ is already full so we spin until we got one
sqe = h.io_uring_get_sqe()
sqe = h.GetSqe()
if sqe != nil {
break
}
@@ -205,7 +205,7 @@ func TestRingQueueSubmitSingleConsumer(t *testing.T) {
go consumer(h, ctx, &wg)
for i := 0; i < tc.jobCount; i++ {
sqe := h.io_uring_get_sqe()
sqe := h.GetSqe()
if sqe == nil {
// spin until we got one
continue


@@ -20,6 +20,14 @@ func io_uring_queue_init_params(entries uint32, ring *IoUring, p *IoUringParams)
if err != nil {
return err
}
// Directly map SQ slots to SQEs
sqArray := ring.Sq.Array
sqEntries := *ring.Sq._KRingEntries()
var index uint32
for index = 0; index < sqEntries; index++ {
*uint32Array_Index(sqArray, uintptr(index)) = index
}
ring.Features = p.Features
return nil
}
@@ -29,9 +37,9 @@ func (ring *IoUring) io_uring_queue_exit() {
cq := &ring.Cq
sqeSize := SizeofIoUringSqe
if ring.Flags&IORING_SETUP_SQE128 != 0 {
sqeSize += 64
sqeSize += Align128IoUringSqe
}
munmap(unsafe.Pointer(sq.Sqes), sqeSize*uintptr(*sq._RingEntries()))
munmap(unsafe.Pointer(sq.Sqes), sqeSize*uintptr(*sq._KRingEntries()))
io_uring_unmap_rings(sq, cq)
/*
* Not strictly required, but frees up the slot we used now rather
@@ -67,7 +75,7 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
if cq.RingSz > sq.RingSz {
sq.RingSz = cq.RingSz
}
// cq.RingSz = sq.RingSz
cq.RingSz = sq.RingSz
}
// alloc sq ring
sq.RingPtr, err = mmap(nil, uintptr(sq.RingSz),
@@ -94,17 +102,17 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
}
//sq
sq.head = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Head)))
sq.tail = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Tail)))
sq.ringMask = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.RingMask)))
sq.ringEntries = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.RingEntries)))
sq.flags = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Flags)))
sq.dropped = (unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Dropped)))
sq.Array = (uint32Array)(unsafe.Pointer(uintptr(sq.RingPtr) + uintptr(p.SqOff.Array)))
sq.khead = unsafe.Add(sq.RingPtr, p.SqOff.Head)
sq.ktail = unsafe.Add(sq.RingPtr, p.SqOff.Tail)
sq.kringMask = unsafe.Add(sq.RingPtr, p.SqOff.RingMask)
sq.kringEntries = unsafe.Add(sq.RingPtr, p.SqOff.RingEntries)
sq.kflags = unsafe.Add(sq.RingPtr, p.SqOff.Flags)
sq.kdropped = unsafe.Add(sq.RingPtr, p.SqOff.Dropped)
sq.Array = (uint32Array)(unsafe.Add(sq.RingPtr, p.SqOff.Array))
size = SizeofIoUringSqe
if p.Flags&IORING_SETUP_SQE128 != 0 {
size += 64
size += Align128IoUringSqe
}
var sqeAddr unsafe.Pointer
sqeAddr, err = mmap(nil, size*uintptr(p.SqEntries),
@@ -119,15 +127,21 @@ func io_uring_mmap(fd int, p *IoUringParams, sq *IoUringSq, cq *IoUringCq) (err
sq.Sqes = (ioUringSqeArray)(sqeAddr)
//cq
cq.head = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Head)))
cq.tail = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Tail)))
cq.ringMask = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.RingMask)))
cq.ringEntries = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.RingEntries)))
cq.overflow = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Overflow)))
cq.Cqes = (ioUringCqeArray)(unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Cqes)))
cq.khead = unsafe.Add(cq.RingPtr, p.CqOff.Head)
cq.ktail = unsafe.Add(cq.RingPtr, p.CqOff.Tail)
cq.kringMask = unsafe.Add(cq.RingPtr, p.CqOff.RingMask)
cq.kringEntries = unsafe.Add(cq.RingPtr, p.CqOff.RingEntries)
cq.koverflow = unsafe.Add(cq.RingPtr, p.CqOff.Overflow)
cq.Cqes = (ioUringCqeArray)(unsafe.Add(cq.RingPtr, p.CqOff.Cqes))
if p.CqOff.Flags != 0 {
cq.flags = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Flags)))
cq.kflags = (unsafe.Pointer(uintptr(cq.RingPtr) + uintptr(p.CqOff.Flags)))
}
sq.RingMask = *sq._KRingMask()
sq.RingEntries = *sq._KRingEntries()
cq.RingMask = *cq._KRingMask()
cq.RingEntries = *cq._KRingEntries()
return nil
}
@@ -138,3 +152,17 @@ func io_uring_unmap_rings(sq *IoUringSq, cq *IoUringCq) error {
}
return nil
}
func io_uring_get_probe_ring(ring *IoUring) (probe *IoUringProbe) {
// len := SizeofIoUringProbe + 256*SizeofIouringProbeOp
probe = new(IoUringProbe)
r := ring.io_uring_register_probe(probe, 256)
if r >= 0 {
return
}
return nil
}
func (ring *IoUring) io_uring_get_probe_ring() (probe *IoUringProbe) {
return io_uring_get_probe_ring(ring)
}
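With the SQ index array now identity-mapped at init time, __io_uring_flush_sq only has to advance the tail on submit. End to end, the setup/teardown pair above is meant to be driven through the exported constructor, roughly as follows (the New/Close wrapper names are assumed from the public API, not shown in this diff):

	ring, err := New(256, 0) // 256 SQ entries, no setup flags
	if err != nil {
		panic(err)
	}
	defer ring.Close() // munmaps the rings and the SQE array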


@@ -2,6 +2,8 @@ package gouring
import (
"unsafe"
"golang.org/x/sys/unix"
)
const (
@@ -12,8 +14,4 @@ const (
NSIG = (SIGTMAX + 1)
)
type Sigset_t struct {
Val [SIGSET_NWORDS]uint64
}
// https://baike.baidu.com/item/sigset_t/4481187
type Sigset_t = unix.Sigset_t


@@ -46,3 +46,7 @@ func (h *IoUring) Submit() (int, error) {
func (h *IoUring) SubmitAndWait(waitNr uint32) (int, error) {
return h.io_uring_submit_and_wait(waitNr)
}
func (h *IoUring) GetProbeRing() *IoUringProbe {
return h.io_uring_get_probe_ring()
}
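GetProbeRing gives callers runtime feature detection for opcodes. A hedged sketch (the probe's field layout is not shown in this diff, so treat the inspection step as pseudocode):

	probe := ring.GetProbeRing()
	if probe == nil {
		// Probe registration failed (e.g. an old kernel):
		// fall back to a minimal, universally supported op set.
	}
	// Otherwise inspect probe to decide, say, whether
	// IORING_OP_SEND_ZC can be used on this kernel.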


@@ -7,34 +7,42 @@ import (
type uint32Array = unsafe.Pointer // *uint32
func uint32Array_Index(u uint32Array, i uintptr) *uint32 {
return (*uint32)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + SizeofUint32*i))
return (*uint32)(unsafe.Add(u, SizeofUint32*i))
}
type ioUringSqeArray = unsafe.Pointer // *IoUringSqe
// ioUringSqeArray_Index OR SQE64
func ioUringSqeArray_Index(u ioUringSqeArray, i uintptr) *IoUringSqe {
return (*IoUringSqe)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + SizeofIoUringSqe*i))
return (*IoUringSqe)(unsafe.Add(u, SizeofIoUringSqe*i))
}
// ioUringSqe128Array_Index OR SQE128
func ioUringSqe128Array_Index(u ioUringSqeArray, i uintptr) *IoUringSqe {
return (*IoUringSqe)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + (SizeofIoUringSqe+64)*i))
return (*IoUringSqe)(unsafe.Add(u, (SizeofIoUringSqe+Align128IoUringSqe)*i))
}
//
type ioUringCqeArray = unsafe.Pointer // *IoUringCqe
// ioUringCqeArray_Index OR CQE16
func ioUringCqeArray_Index(u ioUringCqeArray, i uintptr) *IoUringCqe {
return (*IoUringCqe)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + SizeofIoUringCqe*i))
return (*IoUringCqe)(unsafe.Add(u, SizeofIoUringCqe*i))
}
// ioUringCqe32Array_Index OR CQE32
func ioUringCqe32Array_Index(u ioUringCqeArray, i uintptr) *IoUringCqe {
return (*IoUringCqe)(unsafe.Pointer(uintptr(unsafe.Pointer(u)) + (SizeofIoUringCqe+SizeofIoUringCqe)*i))
return (*IoUringCqe)(unsafe.Add(u, (SizeofIoUringCqe+Align32IoUringCqe)*i))
}
//
type UserData [8]byte // uint64
type UserData uint64
func (u *UserData) SetUint64(v uint64) {
putUintptr(unsafe.Pointer(u), uintptr(v))
@@ -50,6 +58,9 @@ func (u *UserData) SetUnsafe(ptr unsafe.Pointer) {
func (u UserData) GetUnsafe() unsafe.Pointer {
return *(*unsafe.Pointer)(unsafe.Pointer(&u))
}
// GetBytes returns the raw 8-byte representation of the userdata value.
func (u UserData) GetBytes() [8]byte {
return *(*[8]byte)(unsafe.Pointer(&u))
}
func (u UserData) GetUintptr() uintptr {
return uintptr(u.GetUnsafe())
}


@@ -39,6 +39,7 @@ func TestUserdata(t *testing.T) {
var exp [8]byte
bo.PutUint64(exp[:], tc.exp)
assert.Equal(t, exp[:], u[:])
// assert.Equal(t, exp[:], u[:])
assert.Equal(t, tc.exp, u.GetUint64())
}
}
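Since UserData is now a plain uint64, the tag set at submission time comes back untouched on the completion. A small round-trip sketch (it assumes the SQE and CQE both expose a UserData field, as the accessors above suggest):

	var ud UserData
	ud.SetUint64(0xdeadbeef)
	sqe.UserData = ud
	// ... Submit(), then after WaitCqe(&cqe):
	if cqe.UserData.GetUint64() == 0xdeadbeef {
		// This completion belongs to the request tagged above.
	}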