diff --git a/examples/simple-eventloop/lib/lib.go b/examples/simple-eventloop/lib/lib.go index dfb8940..30f01f9 100644 --- a/examples/simple-eventloop/lib/lib.go +++ b/examples/simple-eventloop/lib/lib.go @@ -2,12 +2,16 @@ package lib import ( "context" + "encoding/json" "fmt" "net" + "net/http" "runtime" + "strings" "sync" "sync/atomic" "syscall" + "time" "unsafe" "github.com/alphadose/haxmap" @@ -97,7 +101,10 @@ type Eventloop struct { } func New(ent uint32, listenFd int, handler EventHandler) *Eventloop { - ring, err := uring.New(ent, 0) + ring, err := uring.NewWithParams(ent, &uring.IoUringParams{ + Flags: uring.IORING_SETUP_SQPOLL, + SqThreadIdle: 2000, + }) if err != nil { panic(err) } @@ -118,9 +125,63 @@ func New(ent uint32, listenFd int, handler EventHandler) *Eventloop { if err := evloop.init(); err != nil { panic(err) } + go evloop.runDebugInterface() return evloop } +func (e *Eventloop) runDebugInterface() { + mux := http.NewServeMux() + mux.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) { + dec := json.NewDecoder(r.Body) + defer r.Body.Close() + + type SubmitRequest struct { + EventName string `json:"en"` + Fd int `json:"fd"` + Data any `json:"d"` + } + + var req SubmitRequest + if err := dec.Decode(&req); err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Printf("debug iface error: %s\n", err) + return + } + + cx := &eventContext{ + evloop: e, + ud: &myUserdata{ + ctx: r.Context(), + fd: req.Fd, + }, + } + + switch strings.ToUpper(strings.Trim(req.EventName, " ")) { + case "WRITE": + cx.Write([]byte("DEBUG!\n")) + case "READ": + cx.Read() + case "CLOSE": + cx.Close() + } + + if _, err := e.ring.Submit(); err != nil { + goto errInvalid + } + w.WriteHeader(http.StatusOK) + return + errInvalid: + w.WriteHeader(http.StatusInternalServerError) + }) + srv := &http.Server{ + Addr: ":19110", + Handler: mux, + } + if err := srv.ListenAndServe(); err != nil { + panic(err) + } +} + var udPool = sync.Pool{ New: func() any { return new(myUserdata) @@ -190,7 +251,7 @@ func (e *Eventloop) queueProvideBuffer(bid int, ud uring.UserData) *uring.IoUrin return sqe } func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe { - fmt.Printf("[DEBUG] QU:READ\n") + fmt.Printf("[DEBUG] QU:READ fd=%d\n", fd) sqe := e.ring.GetSqe() uring.PrepRead(sqe, fd, nil, e.bufSize, 0) sqe.Flags |= uring.IOSQE_BUFFER_SELECT @@ -200,7 +261,7 @@ func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe { return sqe } func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoUringSqe { - fmt.Printf("[DEBUG] QU:WRITE\n") + fmt.Printf("[DEBUG] QU:WRITE fd=%d\n", fd) sqe := e.ring.GetSqe() uring.PrepWrite(sqe, fd, &buf[0], len(buf), 0) sqe.Flags |= uring.IOSQE_IO_LINK @@ -208,7 +269,7 @@ func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoU return sqe } func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe { - fmt.Printf("[DEBUG] QU:CLOSE\n") + fmt.Printf("[DEBUG] QU:CLOSE fd=%d\n", fd) sqe := e.ring.GetSqe() uring.PrepClose(sqe, fd) sqe.Flags |= uring.IOSQE_IO_LINK @@ -235,17 +296,44 @@ func (e *Eventloop) Run() { var cqe *uring.IoUringCqe var err error var i = 0 + + var pokeCounter uint64 = 0 + qEventPoke := func() { + sqe := e.ring.GetSqe() + uring.PrepNop(sqe) + sqe.UserData = 0xBEEF_BEEF_BEEF_BEEF + atomic.AddUint64(&pokeCounter, 1) + if _, err := e.ring.Submit(); err != nil { + panic(err) + } + } + if false { + go func() { + for { + fmt.Printf("[MON] POKE COUNT: %d\n", pokeCounter) + time.Sleep(time.Second * 2) + } + }() + qEventPoke() + } + for { if atomic.CompareAndSwapUint64(e.sema, 1, 0) { break } - if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR || err == syscall.EAGAIN || err == syscall.ETIME { + fmt.Printf("[DEBUG][%d] CQE WAIT err=%+#v\n", i, err) runtime.Gosched() // relax, do other thing while waiting for IO continue } else if err != nil { panic(err) } + if cqe.UserData == 0xBEEF_BEEF_BEEF_BEEF { + e.ring.SeenCqe(cqe) + qEventPoke() + continue + } + ctx := &eventContext{ evloop: e, } @@ -303,11 +391,13 @@ func (e *Eventloop) Run() { if ud.op != uring.IORING_OP_ACCEPT { // don't remove mshot UD e.freeUserdata(cqe.UserData) } + + e.reportSubmit(-1) if submitted, err := e.ring.Submit(); err != nil { panic(err) } else { _ = submitted - println("submitted:", submitted) + e.reportSubmit(submitted) } skip_no_submit: e.ring.SeenCqe(cqe) @@ -315,6 +405,16 @@ func (e *Eventloop) Run() { } } +func (e *Eventloop) reportSubmit(submitted int) { + sq := &e.ring.Sq + p := (*[2]unsafe.Pointer)(unsafe.Pointer(sq)) + khead := p[0] + ktail := p[1] + fmt.Printf("submitted: %d ht=(%d,%d) kht=(%p,%p)\n", submitted, + sq.SqeHead, sq.SqeTail, + khead, ktail) +} + func (e *Eventloop) Stop() error { // mark spin to stop atomic.StoreUint64(e.sema, 1) diff --git a/examples/simple-eventloop/main.go b/examples/simple-eventloop/main.go index a07c4e3..b38a8b0 100644 --- a/examples/simple-eventloop/main.go +++ b/examples/simple-eventloop/main.go @@ -112,10 +112,14 @@ func runClientEcho(ctx context.Context, id, serverAddr string) { var wnb int payload := []byte(fmt.Sprintf("ECHO[%s]:%d", id, time.Now().UnixMilli())) if wnb, err = c.Write(payload); err != nil { - panic(err) + fmt.Printf("CLIENT[%s] seq=%d WRITE err=%q\n", id, i, err) + // panic(err) + continue } if nb, err = c.Read(buf[:]); err != nil { - panic(err) + fmt.Printf("CLIENT[%s] seq=%d READ err=%q\n", id, i, err) + // panic(err) + continue } else if wnb != nb { panic("message size not equal") } @@ -134,10 +138,10 @@ func main() { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) - wg.Add(2) + wg.Add(1) go runServer(&wg, ctx, "0.0.0.0:11338", myEchoServer{}) - go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{}) + // go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{}) for i := 0; i < 1; i++ { go runClientEcho(ctx, strconv.Itoa(i), "0.0.0.0:11338")