diff --git a/examples/simple-eventloop/lib/lib.go b/examples/simple-eventloop/lib/lib.go index 76f350d..016e67b 100644 --- a/examples/simple-eventloop/lib/lib.go +++ b/examples/simple-eventloop/lib/lib.go @@ -4,6 +4,7 @@ import ( "context" "net" "runtime" + "sync/atomic" "syscall" "unsafe" @@ -66,6 +67,7 @@ type myUserdata struct { rsa syscall.RawSockaddrAny rsaSz uintptr fd int + flag uint64 bid int // buffer id op uring.IoUringOp } @@ -87,6 +89,7 @@ type Eventloop struct { bufSize, bufCount int buffers []byte handler EventHandler + sema *uint64 userdata *haxmap.Map[uring.UserData, *myUserdata] bufGroup uint16 } @@ -108,6 +111,7 @@ func New(ent uint32, listenFd int, handler EventHandler) *Eventloop { buffers: make([]byte, bufCount*bufSize), userdata: haxmap.New[uring.UserData, *myUserdata](), handler: handler, + sema: new(uint64), } if err := evloop.init(); err != nil { panic(err) @@ -195,12 +199,30 @@ func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe { return sqe } +func (e *Eventloop) queueGraceShutdownNop() *uring.IoUringSqe { + sqe := e.ring.GetSqe() + uring.PrepNop(sqe) + sqe.Flags |= uring.IOSQE_IO_LINK + + key, ud := e.allocUserdata() + ud.init(sqe.Opcode) + ud.fd = e.fd + ud.flag = 0xDEAD_DEAD_DEAD_DEAD + + sqe.UserData = key + return sqe +} + func (e *Eventloop) Run() { var cqe *uring.IoUringCqe var err error for { + if atomic.CompareAndSwapUint64(e.sema, 1, 0) { + break + } + if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR { - runtime.Gosched() + runtime.Gosched() // relax, do other thing while waiting for IO continue } else if err != nil { panic(err) @@ -215,6 +237,12 @@ func (e *Eventloop) Run() { ctx.ud = ud switch ud.op { + case uring.IORING_OP_NOP: + switch { + case e.fd == ud.fd && ud.flag == 0xDEAD_DEAD_DEAD_DEAD: + break + } + case uring.IORING_OP_ACCEPT: var sa syscall.Sockaddr sa, err = anyToSockaddr(&ud.rsa) @@ -263,6 +291,18 @@ func (e *Eventloop) Run() { } } +func (e *Eventloop) Stop() error { + // mark spin to stop + atomic.StoreUint64(e.sema, 1) + // if the spin waiting for an event + // submit NOP with flag + e.queueGraceShutdownNop() + if _, err := e.ring.Submit(); err != nil { + return err + } + return nil +} + func (e *Eventloop) Close() { e.ring.Close() } diff --git a/examples/simple-eventloop/main.go b/examples/simple-eventloop/main.go index bdfa0b7..8bcbf8a 100644 --- a/examples/simple-eventloop/main.go +++ b/examples/simple-eventloop/main.go @@ -8,9 +8,11 @@ import ( "net/http" "os" "os/signal" + "sync" "syscall" "github.com/ii64/gouring/examples/simple-eventloop/lib" + "golang.org/x/sys/unix" ) type myEchoServer struct{} @@ -59,7 +61,9 @@ func (h myHTTP11Server) OnWrite(ctx lib.Context, nb int) { func (h myHTTP11Server) OnClose(ctx lib.Context) { } -func runServer(addr string, handler lib.EventHandler) { +func runServer(wg *sync.WaitGroup, ctx context.Context, addr string, handler lib.EventHandler) { + defer wg.Done() + lis, err := net.Listen("tcp", addr) if err != nil { panic(err) @@ -71,7 +75,19 @@ func runServer(addr string, handler lib.EventHandler) { } defer file.Close() fd := file.Fd() + + unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) evloop := lib.New(64, int(fd), handler) + defer evloop.Close() + + go func() { + <-ctx.Done() + if err := evloop.Stop(); err != nil { + panic(err) + } + }() + evloop.Run() } @@ -79,8 +95,14 @@ func main() { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt) - go runServer("0.0.0.0:11338", myEchoServer{}) - go runServer("0.0.0.0:11339", myHTTP11Server{}) + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(2) + + go runServer(&wg, ctx, "0.0.0.0:11338", myEchoServer{}) + go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{}) <-sig + cancel() + wg.Wait() }