1
0
Fork 0
mirror of https://github.com/ii64/gouring.git synced 2025-04-01 03:41:44 +02:00

chore: implement grace shutdown on simple eventloop

This commit is contained in:
Xeffy Chen 2023-04-18 12:04:03 +07:00
parent 64d8c0e3b6
commit b97c7aa767
Signed by: Xeffy
GPG key ID: E41C08AD390E7C49
2 changed files with 66 additions and 4 deletions
examples/simple-eventloop

View file

@ -4,6 +4,7 @@ import (
"context"
"net"
"runtime"
"sync/atomic"
"syscall"
"unsafe"
@ -66,6 +67,7 @@ type myUserdata struct {
rsa syscall.RawSockaddrAny
rsaSz uintptr
fd int
flag uint64
bid int // buffer id
op uring.IoUringOp
}
@ -87,6 +89,7 @@ type Eventloop struct {
bufSize, bufCount int
buffers []byte
handler EventHandler
sema *uint64
userdata *haxmap.Map[uring.UserData, *myUserdata]
bufGroup uint16
}
@ -108,6 +111,7 @@ func New(ent uint32, listenFd int, handler EventHandler) *Eventloop {
buffers: make([]byte, bufCount*bufSize),
userdata: haxmap.New[uring.UserData, *myUserdata](),
handler: handler,
sema: new(uint64),
}
if err := evloop.init(); err != nil {
panic(err)
@ -195,12 +199,30 @@ func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe {
return sqe
}
func (e *Eventloop) queueGraceShutdownNop() *uring.IoUringSqe {
sqe := e.ring.GetSqe()
uring.PrepNop(sqe)
sqe.Flags |= uring.IOSQE_IO_LINK
key, ud := e.allocUserdata()
ud.init(sqe.Opcode)
ud.fd = e.fd
ud.flag = 0xDEAD_DEAD_DEAD_DEAD
sqe.UserData = key
return sqe
}
func (e *Eventloop) Run() {
var cqe *uring.IoUringCqe
var err error
for {
if atomic.CompareAndSwapUint64(e.sema, 1, 0) {
break
}
if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR {
runtime.Gosched()
runtime.Gosched() // relax, do other thing while waiting for IO
continue
} else if err != nil {
panic(err)
@ -215,6 +237,12 @@ func (e *Eventloop) Run() {
ctx.ud = ud
switch ud.op {
case uring.IORING_OP_NOP:
switch {
case e.fd == ud.fd && ud.flag == 0xDEAD_DEAD_DEAD_DEAD:
break
}
case uring.IORING_OP_ACCEPT:
var sa syscall.Sockaddr
sa, err = anyToSockaddr(&ud.rsa)
@ -263,6 +291,18 @@ func (e *Eventloop) Run() {
}
}
func (e *Eventloop) Stop() error {
// mark spin to stop
atomic.StoreUint64(e.sema, 1)
// if the spin waiting for an event
// submit NOP with flag
e.queueGraceShutdownNop()
if _, err := e.ring.Submit(); err != nil {
return err
}
return nil
}
func (e *Eventloop) Close() {
e.ring.Close()
}

View file

@ -8,9 +8,11 @@ import (
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"github.com/ii64/gouring/examples/simple-eventloop/lib"
"golang.org/x/sys/unix"
)
type myEchoServer struct{}
@ -59,7 +61,9 @@ func (h myHTTP11Server) OnWrite(ctx lib.Context, nb int) {
func (h myHTTP11Server) OnClose(ctx lib.Context) {
}
func runServer(addr string, handler lib.EventHandler) {
func runServer(wg *sync.WaitGroup, ctx context.Context, addr string, handler lib.EventHandler) {
defer wg.Done()
lis, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
@ -71,7 +75,19 @@ func runServer(addr string, handler lib.EventHandler) {
}
defer file.Close()
fd := file.Fd()
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEADDR, 1)
evloop := lib.New(64, int(fd), handler)
defer evloop.Close()
go func() {
<-ctx.Done()
if err := evloop.Stop(); err != nil {
panic(err)
}
}()
evloop.Run()
}
@ -79,8 +95,14 @@ func main() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)
go runServer("0.0.0.0:11338", myEchoServer{})
go runServer("0.0.0.0:11339", myHTTP11Server{})
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
wg.Add(2)
go runServer(&wg, ctx, "0.0.0.0:11338", myEchoServer{})
go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{})
<-sig
cancel()
wg.Wait()
}