1
0
Fork 0
mirror of https://github.com/ii64/gouring.git synced 2025-04-26 05:42:48 +02:00

chore: example simple-eventloop run client on bg

This commit is contained in:
Xeffy Chen 2023-04-24 21:41:32 +07:00
parent 0ebe45c00d
commit 69433c4222
Signed by: Xeffy
GPG key ID: E41C08AD390E7C49
2 changed files with 68 additions and 3 deletions
examples/simple-eventloop

View file

@ -2,8 +2,10 @@ package lib
import (
"context"
"fmt"
"net"
"runtime"
"sync"
"sync/atomic"
"syscall"
"unsafe"
@ -119,14 +121,25 @@ func New(ent uint32, listenFd int, handler EventHandler) *Eventloop {
return evloop
}
var udPool = sync.Pool{
New: func() any {
return new(myUserdata)
},
}
func (e *Eventloop) allocUserdata() (key uring.UserData, val *myUserdata) {
// val = udPool.Get().(*myUserdata)
val = new(myUserdata)
key.SetUnsafe(unsafe.Pointer(val))
e.userdata.Set(key, val)
return
}
func (e *Eventloop) freeUserdata(key uring.UserData) {
// v, exist := e.userdata.Get(key)
// if exist {
// udPool.Put(v)
e.userdata.Del(key)
// }
}
func (e *Eventloop) getBuf(bid int) []byte {
@ -169,6 +182,7 @@ func (e *Eventloop) init() error {
}
func (e *Eventloop) queueProvideBuffer(bid int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:PRV_BUF\n")
sqe := e.ring.GetSqe()
uring.PrepProvideBuffers(sqe, unsafe.Pointer(&e.getBuf(bid)[0]), e.bufSize, 1, e.bufGroup, bid)
sqe.UserData = ud
@ -176,6 +190,7 @@ func (e *Eventloop) queueProvideBuffer(bid int, ud uring.UserData) *uring.IoUrin
return sqe
}
func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:READ\n")
sqe := e.ring.GetSqe()
uring.PrepRead(sqe, fd, nil, e.bufSize, 0)
sqe.Flags |= uring.IOSQE_BUFFER_SELECT
@ -185,6 +200,7 @@ func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe {
return sqe
}
func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:WRITE\n")
sqe := e.ring.GetSqe()
uring.PrepWrite(sqe, fd, &buf[0], len(buf), 0)
sqe.Flags |= uring.IOSQE_IO_LINK
@ -192,6 +208,7 @@ func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoU
return sqe
}
func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:CLOSE\n")
sqe := e.ring.GetSqe()
uring.PrepClose(sqe, fd)
sqe.Flags |= uring.IOSQE_IO_LINK
@ -200,6 +217,7 @@ func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe {
}
func (e *Eventloop) queueGraceShutdownNop() *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:GRACE_SHUTDOWN\n")
sqe := e.ring.GetSqe()
uring.PrepNop(sqe)
sqe.Flags |= uring.IOSQE_IO_LINK
@ -216,12 +234,13 @@ func (e *Eventloop) queueGraceShutdownNop() *uring.IoUringSqe {
func (e *Eventloop) Run() {
var cqe *uring.IoUringCqe
var err error
var i = 0
for {
if atomic.CompareAndSwapUint64(e.sema, 1, 0) {
break
}
if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR {
if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR || err == syscall.EAGAIN || err == syscall.ETIME {
runtime.Gosched() // relax, do other thing while waiting for IO
continue
} else if err != nil {
@ -244,6 +263,7 @@ func (e *Eventloop) Run() {
}
case uring.IORING_OP_ACCEPT:
fmt.Printf("[DEBUG][%d] EV:ACCEPT\n", i)
var sa syscall.Sockaddr
sa, err = anyToSockaddr(&ud.rsa)
if err != nil {
@ -257,6 +277,7 @@ func (e *Eventloop) Run() {
e.handler.OnAccept(ctx, sa)
case uring.IORING_OP_READ:
fmt.Printf("[DEBUG][%d] EV:READ\n", i)
if !(cqe.Flags&uring.IORING_CQE_F_BUFFER != 0) {
panic("MUST PROVIDE BUFFER")
}
@ -270,9 +291,11 @@ func (e *Eventloop) Run() {
e.queueProvideBuffer(int(bid), 0)
case uring.IORING_OP_WRITE:
fmt.Printf("[DEBUG][%d] EV:WRITE\n", i)
e.handler.OnWrite(ctx, int(cqe.Res))
case uring.IORING_OP_CLOSE:
fmt.Printf("[DEBUG][%d] EV:CLOSE\n", i)
e.handler.OnClose(ctx)
}
@ -284,10 +307,11 @@ func (e *Eventloop) Run() {
panic(err)
} else {
_ = submitted
// println("submitted:", submitted)
println("submitted:", submitted)
}
skip_no_submit:
e.ring.SeenCqe(cqe)
i++
}
}

View file

@ -8,8 +8,10 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/ii64/gouring/examples/simple-eventloop/lib"
"golang.org/x/sys/unix"
@ -78,7 +80,7 @@ func runServer(wg *sync.WaitGroup, ctx context.Context, addr string, handler lib
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
evloop := lib.New(64, int(fd), handler)
evloop := lib.New(32, int(fd), handler)
defer evloop.Close()
go func() {
@ -91,6 +93,41 @@ func runServer(wg *sync.WaitGroup, ctx context.Context, addr string, handler lib
evloop.Run()
}
func runClientEcho(ctx context.Context, id, serverAddr string) {
var c net.Conn
var err error
for {
if c, err = net.Dial("tcp", serverAddr); err == nil {
break
}
time.Sleep(time.Second)
}
defer c.Close()
var buf [512]byte
var nb int
i := 0
for ctx.Err() == nil {
c.SetReadDeadline(time.Now().Add(time.Second * 4))
var wnb int
payload := []byte(fmt.Sprintf("ECHO[%s]:%d", id, time.Now().UnixMilli()))
if wnb, err = c.Write(payload); err != nil {
panic(err)
}
if nb, err = c.Read(buf[:]); err != nil {
panic(err)
} else if wnb != nb {
panic("message size not equal")
}
b := buf[:nb]
if !bytes.Equal(payload, b) {
panic("message not equal")
}
fmt.Printf("CLIENT[%s] seq=%d: OK\n", id, i)
i++
}
}
func main() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
@ -102,6 +139,10 @@ func main() {
go runServer(&wg, ctx, "0.0.0.0:11338", myEchoServer{})
go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{})
for i := 0; i < 1; i++ {
go runClientEcho(ctx, strconv.Itoa(i), "0.0.0.0:11338")
}
<-sig
cancel()
wg.Wait()