1
0
Fork 0
mirror of https://github.com/ii64/gouring.git synced 2025-04-26 05:42:48 +02:00

chore: simple eventloop example

This commit is contained in:
Xeffy Chen 2023-04-18 02:41:38 +07:00
parent f4992f93e1
commit 64d8c0e3b6
Signed by: Xeffy
GPG key ID: E41C08AD390E7C49
4 changed files with 369 additions and 0 deletions
examples
go.modgo.sum
simple-eventloop

View file

@ -8,3 +8,8 @@ require (
github.com/ii64/gouring v0.0.0-00010101000000-000000000000
golang.org/x/sys v0.7.0
)
require (
github.com/alphadose/haxmap v1.2.0 // indirect
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 // indirect
)

View file

@ -1,6 +1,10 @@
github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM=
github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=

View file

@ -0,0 +1,274 @@
package lib
import (
"context"
"net"
"runtime"
"syscall"
"unsafe"
"github.com/alphadose/haxmap"
uring "github.com/ii64/gouring"
)
type Context interface {
Read()
Write(buf []byte)
Close()
SetContext(ctx context.Context)
Context() context.Context
}
type EventHandler interface {
OnAccept(ctx Context, sa syscall.Sockaddr)
OnRead(ctx Context, buf []byte)
OnWrite(ctx Context, nb int)
OnClose(ctx Context)
}
type eventContext struct {
evloop *Eventloop
ud *myUserdata
}
func (e *eventContext) SetContext(ctx context.Context) {
e.ud.ctx = ctx
}
func (e *eventContext) Context() context.Context {
return e.ud.ctx
}
func (e eventContext) Read() {
key, lud := e.evloop.allocUserdata()
sqe := e.evloop.queueRead(e.ud.fd, key)
lud.init(sqe.Opcode)
e.ud.copyTo(lud)
sqe.UserData = key
}
func (e eventContext) Write(b []byte) {
key, lud := e.evloop.allocUserdata()
sqe := e.evloop.queueWrite(e.ud.fd, key, b)
lud.init(sqe.Opcode)
e.ud.copyTo(lud)
sqe.UserData = key
}
func (e eventContext) Close() {
key, lud := e.evloop.allocUserdata()
sqe := e.evloop.queueClose(e.ud.fd, key)
lud.init(sqe.Opcode)
e.ud.copyTo(lud)
sqe.UserData = key
}
type myUserdata struct {
ctx context.Context
rsa syscall.RawSockaddrAny
rsaSz uintptr
fd int
bid int // buffer id
op uring.IoUringOp
}
func (ud *myUserdata) init(op uring.IoUringOp) {
ud.op = op
ud.rsaSz = unsafe.Sizeof(ud.rsa)
}
func (ud *myUserdata) copyTo(dst *myUserdata) {
oldOp := dst.op
*dst = *ud
dst.op = oldOp
}
type Eventloop struct {
ring *uring.IoUring
fd int
bufSize, bufCount int
buffers []byte
handler EventHandler
userdata *haxmap.Map[uring.UserData, *myUserdata]
bufGroup uint16
}
func New(ent uint32, listenFd int, handler EventHandler) *Eventloop {
ring, err := uring.New(ent, 0)
if err != nil {
panic(err)
}
bufSize := 0x1000
bufCount := 2048
var bufGroup uint16 = 0xffff
evloop := &Eventloop{
fd: listenFd,
ring: ring,
bufSize: bufSize,
bufCount: bufCount,
bufGroup: bufGroup,
buffers: make([]byte, bufCount*bufSize),
userdata: haxmap.New[uring.UserData, *myUserdata](),
handler: handler,
}
if err := evloop.init(); err != nil {
panic(err)
}
return evloop
}
func (e *Eventloop) allocUserdata() (key uring.UserData, val *myUserdata) {
val = new(myUserdata)
key.SetUnsafe(unsafe.Pointer(val))
e.userdata.Set(key, val)
return
}
func (e *Eventloop) freeUserdata(key uring.UserData) {
e.userdata.Del(key)
}
func (e *Eventloop) getBuf(bid int) []byte {
start := e.bufSize * bid
end := start + e.bufSize
return e.buffers[start:end]
}
func (e *Eventloop) init() error {
// queue accept mshot
sqe := e.ring.GetSqe()
key, ud := e.allocUserdata()
uring.PrepAcceptMultishot(sqe, e.fd, &ud.rsa, &ud.rsaSz, 0)
ud.init(sqe.Opcode)
sqe.UserData = key
// queue init provide buffers
sqe = e.ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&e.buffers[0]), e.bufSize, e.bufCount, e.bufGroup, 0)
// wait for init provide buffers
submitted, err := e.ring.SubmitAndWait(1)
if err != nil {
return err
}
if submitted != 2 {
panic("MUST submit 2 sqes")
}
var cqe *uring.IoUringCqe
if err = e.ring.WaitCqe(&cqe); err != nil {
return err
}
if cqe.Res < 0 {
err = syscall.Errno(-cqe.Res)
return err
}
e.ring.SeenCqe(cqe)
return nil
}
func (e *Eventloop) queueProvideBuffer(bid int, ud uring.UserData) *uring.IoUringSqe {
sqe := e.ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&e.getBuf(bid)[0]), e.bufSize, 1, e.bufGroup, bid)
sqe.UserData = ud
sqe.Flags |= uring.IOSQE_IO_LINK
return sqe
}
func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe {
sqe := e.ring.GetSqe()
uring.PrepRead(sqe, fd, nil, e.bufSize, 0)
sqe.Flags |= uring.IOSQE_BUFFER_SELECT
sqe.Flags |= uring.IOSQE_IO_LINK
sqe.SetBufGroup(e.bufGroup)
sqe.UserData = ud
return sqe
}
func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoUringSqe {
sqe := e.ring.GetSqe()
uring.PrepWrite(sqe, fd, &buf[0], len(buf), 0)
sqe.Flags |= uring.IOSQE_IO_LINK
sqe.UserData = ud
return sqe
}
func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe {
sqe := e.ring.GetSqe()
uring.PrepClose(sqe, fd)
sqe.Flags |= uring.IOSQE_IO_LINK
sqe.UserData = ud
return sqe
}
func (e *Eventloop) Run() {
var cqe *uring.IoUringCqe
var err error
for {
if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR {
runtime.Gosched()
continue
} else if err != nil {
panic(err)
}
ctx := &eventContext{
evloop: e,
}
ud, ok := e.userdata.Get(cqe.UserData)
if !ok {
goto skip_no_submit
}
ctx.ud = ud
switch ud.op {
case uring.IORING_OP_ACCEPT:
var sa syscall.Sockaddr
sa, err = anyToSockaddr(&ud.rsa)
if err != nil {
panic(err)
}
fd := cqe.Res
if fd < 0 {
goto skip_no_submit
}
ud.fd = int(fd)
e.handler.OnAccept(ctx, sa)
case uring.IORING_OP_READ:
if !(cqe.Flags&uring.IORING_CQE_F_BUFFER != 0) {
panic("MUST PROVIDE BUFFER")
}
nb := cqe.Res
bid := uint16(cqe.Flags >> 16)
if cqe.Res <= 0 {
e.queueClose(ud.fd, cqe.UserData)
} else {
e.handler.OnRead(ctx, e.getBuf(int(bid))[:nb])
}
e.queueProvideBuffer(int(bid), 0)
case uring.IORING_OP_WRITE:
e.handler.OnWrite(ctx, int(cqe.Res))
case uring.IORING_OP_CLOSE:
e.handler.OnClose(ctx)
}
if ud.op != uring.IORING_OP_ACCEPT { // don't remove mshot UD
e.freeUserdata(cqe.UserData)
}
if submitted, err := e.ring.Submit(); err != nil {
panic(err)
} else {
_ = submitted
// println("submitted:", submitted)
}
skip_no_submit:
e.ring.SeenCqe(cqe)
}
}
func (e *Eventloop) Close() {
e.ring.Close()
}
//go:linkname anyToSockaddr syscall.anyToSockaddr
func anyToSockaddr(rsa *syscall.RawSockaddrAny) (syscall.Sockaddr, error)
//go:linkname sockaddrToTCP net.sockaddrToTCP
func sockaddrToTCP(sa syscall.Sockaddr) net.Addr

View file

@ -0,0 +1,86 @@
package main
import (
"bytes"
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/ii64/gouring/examples/simple-eventloop/lib"
)
type myEchoServer struct{}
func (h myEchoServer) OnAccept(ctx lib.Context, sa syscall.Sockaddr) {
fmt.Printf("accept: %+#v\n", sa)
ctx.SetContext(context.Background())
ctx.Read()
}
func (h myEchoServer) OnRead(ctx lib.Context, b []byte) {
sctx := ctx.Context()
fmt.Printf("read ctx %+#v %+#v\n", sctx, b)
ctx.Write(b)
}
func (h myEchoServer) OnWrite(ctx lib.Context, nb int) {
ctx.Read()
}
func (h myEchoServer) OnClose(ctx lib.Context) {
}
type myHTTP11Server struct{}
func (h myHTTP11Server) OnAccept(ctx lib.Context, sa syscall.Sockaddr) {
ctx.Read()
}
func (h myHTTP11Server) OnRead(ctx lib.Context, b []byte) {
statusCode := http.StatusOK
if !bytes.HasPrefix(b, []byte("GET /")) {
statusCode = 400
}
statusText := http.StatusText(statusCode)
header := []byte(fmt.Sprintf("HTTP/1.1 %d %s\r\nServer: gouring-simple-evloop\r\nConnection: closed\r\nContent-Length: %d\r\n\r\n",
statusCode, statusText,
len(b)))
buf := make([]byte, len(header)+len(b))
copy(buf[0:], header)
copy(buf[len(header):], b)
ctx.Write(buf)
}
func (h myHTTP11Server) OnWrite(ctx lib.Context, nb int) {
ctx.Close()
}
func (h myHTTP11Server) OnClose(ctx lib.Context) {
}
func runServer(addr string, handler lib.EventHandler) {
lis, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
defer lis.Close()
file, err := lis.(*net.TCPListener).File()
if err != nil {
panic(err)
}
defer file.Close()
fd := file.Fd()
evloop := lib.New(64, int(fd), handler)
evloop.Run()
}
func main() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
go runServer("0.0.0.0:11338", myEchoServer{})
go runServer("0.0.0.0:11339", myHTTP11Server{})
<-sig
}