chore: example eventloop add debug

This commit is contained in:
Xeffy Chen 2023-04-26 00:28:02 +07:00
parent cf008fa0a8
commit c05ef4f5a1
Signed by: Xeffy
GPG key ID: E41C08AD390E7C49
2 changed files with 114 additions and 10 deletions

View file

@ -2,12 +2,16 @@ package lib
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/alphadose/haxmap"
@ -97,7 +101,10 @@ type Eventloop struct {
}
func New(ent uint32, listenFd int, handler EventHandler) *Eventloop {
ring, err := uring.New(ent, 0)
ring, err := uring.NewWithParams(ent, &uring.IoUringParams{
Flags: uring.IORING_SETUP_SQPOLL,
SqThreadIdle: 2000,
})
if err != nil {
panic(err)
}
@ -118,9 +125,63 @@ func New(ent uint32, listenFd int, handler EventHandler) *Eventloop {
if err := evloop.init(); err != nil {
panic(err)
}
go evloop.runDebugInterface()
return evloop
}
func (e *Eventloop) runDebugInterface() {
mux := http.NewServeMux()
mux.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) {
dec := json.NewDecoder(r.Body)
defer r.Body.Close()
type SubmitRequest struct {
EventName string `json:"en"`
Fd int `json:"fd"`
Data any `json:"d"`
}
var req SubmitRequest
if err := dec.Decode(&req); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Printf("debug iface error: %s\n", err)
return
}
cx := &eventContext{
evloop: e,
ud: &myUserdata{
ctx: r.Context(),
fd: req.Fd,
},
}
switch strings.ToUpper(strings.Trim(req.EventName, " ")) {
case "WRITE":
cx.Write([]byte("DEBUG!\n"))
case "READ":
cx.Read()
case "CLOSE":
cx.Close()
}
if _, err := e.ring.Submit(); err != nil {
goto errInvalid
}
w.WriteHeader(http.StatusOK)
return
errInvalid:
w.WriteHeader(http.StatusInternalServerError)
})
srv := &http.Server{
Addr: ":19110",
Handler: mux,
}
if err := srv.ListenAndServe(); err != nil {
panic(err)
}
}
var udPool = sync.Pool{
New: func() any {
return new(myUserdata)
@ -190,7 +251,7 @@ func (e *Eventloop) queueProvideBuffer(bid int, ud uring.UserData) *uring.IoUrin
return sqe
}
func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:READ\n")
fmt.Printf("[DEBUG] QU:READ fd=%d\n", fd)
sqe := e.ring.GetSqe()
uring.PrepRead(sqe, fd, nil, e.bufSize, 0)
sqe.Flags |= uring.IOSQE_BUFFER_SELECT
@ -200,7 +261,7 @@ func (e *Eventloop) queueRead(fd int, ud uring.UserData) *uring.IoUringSqe {
return sqe
}
func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:WRITE\n")
fmt.Printf("[DEBUG] QU:WRITE fd=%d\n", fd)
sqe := e.ring.GetSqe()
uring.PrepWrite(sqe, fd, &buf[0], len(buf), 0)
sqe.Flags |= uring.IOSQE_IO_LINK
@ -208,7 +269,7 @@ func (e *Eventloop) queueWrite(fd int, ud uring.UserData, buf []byte) *uring.IoU
return sqe
}
func (e *Eventloop) queueClose(fd int, ud uring.UserData) *uring.IoUringSqe {
fmt.Printf("[DEBUG] QU:CLOSE\n")
fmt.Printf("[DEBUG] QU:CLOSE fd=%d\n", fd)
sqe := e.ring.GetSqe()
uring.PrepClose(sqe, fd)
sqe.Flags |= uring.IOSQE_IO_LINK
@ -235,17 +296,44 @@ func (e *Eventloop) Run() {
var cqe *uring.IoUringCqe
var err error
var i = 0
var pokeCounter uint64 = 0
qEventPoke := func() {
sqe := e.ring.GetSqe()
uring.PrepNop(sqe)
sqe.UserData = 0xBEEF_BEEF_BEEF_BEEF
atomic.AddUint64(&pokeCounter, 1)
if _, err := e.ring.Submit(); err != nil {
panic(err)
}
}
if false {
go func() {
for {
fmt.Printf("[MON] POKE COUNT: %d\n", pokeCounter)
time.Sleep(time.Second * 2)
}
}()
qEventPoke()
}
for {
if atomic.CompareAndSwapUint64(e.sema, 1, 0) {
break
}
if err = e.ring.WaitCqe(&cqe); err == syscall.EINTR || err == syscall.EAGAIN || err == syscall.ETIME {
fmt.Printf("[DEBUG][%d] CQE WAIT err=%+#v\n", i, err)
runtime.Gosched() // relax, do other thing while waiting for IO
continue
} else if err != nil {
panic(err)
}
if cqe.UserData == 0xBEEF_BEEF_BEEF_BEEF {
e.ring.SeenCqe(cqe)
qEventPoke()
continue
}
ctx := &eventContext{
evloop: e,
}
@ -303,11 +391,13 @@ func (e *Eventloop) Run() {
if ud.op != uring.IORING_OP_ACCEPT { // don't remove mshot UD
e.freeUserdata(cqe.UserData)
}
e.reportSubmit(-1)
if submitted, err := e.ring.Submit(); err != nil {
panic(err)
} else {
_ = submitted
println("submitted:", submitted)
e.reportSubmit(submitted)
}
skip_no_submit:
e.ring.SeenCqe(cqe)
@ -315,6 +405,16 @@ func (e *Eventloop) Run() {
}
}
func (e *Eventloop) reportSubmit(submitted int) {
sq := &e.ring.Sq
p := (*[2]unsafe.Pointer)(unsafe.Pointer(sq))
khead := p[0]
ktail := p[1]
fmt.Printf("submitted: %d ht=(%d,%d) kht=(%p,%p)\n", submitted,
sq.SqeHead, sq.SqeTail,
khead, ktail)
}
func (e *Eventloop) Stop() error {
// mark spin to stop
atomic.StoreUint64(e.sema, 1)

View file

@ -112,10 +112,14 @@ func runClientEcho(ctx context.Context, id, serverAddr string) {
var wnb int
payload := []byte(fmt.Sprintf("ECHO[%s]:%d", id, time.Now().UnixMilli()))
if wnb, err = c.Write(payload); err != nil {
panic(err)
fmt.Printf("CLIENT[%s] seq=%d WRITE err=%q\n", id, i, err)
// panic(err)
continue
}
if nb, err = c.Read(buf[:]); err != nil {
panic(err)
fmt.Printf("CLIENT[%s] seq=%d READ err=%q\n", id, i, err)
// panic(err)
continue
} else if wnb != nb {
panic("message size not equal")
}
@ -134,10 +138,10 @@ func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
wg.Add(2)
wg.Add(1)
go runServer(&wg, ctx, "0.0.0.0:11338", myEchoServer{})
go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{})
// go runServer(&wg, ctx, "0.0.0.0:11339", myHTTP11Server{})
for i := 0; i < 1; i++ {
go runClientEcho(ctx, strconv.Itoa(i), "0.0.0.0:11338")