diff --git a/.gitignore b/.gitignore index ffd93e3..5c2e291 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.exe +*.db *.log \ No newline at end of file diff --git a/client/client.go b/client/client.go index 845ca8c..75dea1e 100644 --- a/client/client.go +++ b/client/client.go @@ -2,10 +2,13 @@ package client import ( "bytes" + "encoding/json" "errors" "io" + "log" "net" "net/netip" + "reflect" "time" "sirherobrine23.org/Minecraft-Server/go-pproxit/internal/pipe" @@ -17,15 +20,16 @@ var ( ) type Client struct { - ControlAddr netip.AddrPort // Controller address - Conn net.Conn // Agent controller connection - // AgentInfo *proto.AgentInfo // Agent info - Token proto.AgentAuth // Agent Token - LastPong *time.Time // Last pong response - UDPClients map[string]net.Conn // UDP Clients - TCPClients map[string]net.Conn // TCP Clients - NewUDPClient chan net.Conn // Accepts new UDP Clients - NewTCPClient chan net.Conn // Accepts new TCP Clients + ControlAddr netip.AddrPort // Controller address + Conn net.Conn // Agent controller connection + Token proto.AgentAuth // Agent Token + ResponseBuffer uint64 // Agent Reponse Buffer size, Initial size from proto.DataSize + RequestBuffer uint64 // Controller send bytes, initial size from proto.DataSize + LastPong *time.Time // Last pong response + UDPClients map[string]net.Conn // UDP Clients + TCPClients map[string]net.Conn // TCP Clients + NewUDPClient chan net.Conn // Accepts new UDP Clients + NewTCPClient chan net.Conn // Accepts new TCP Clients } func NewClient(ControlAddr netip.AddrPort, Token [36]byte) Client { @@ -34,6 +38,9 @@ func NewClient(ControlAddr netip.AddrPort, Token [36]byte) Client { Conn: nil, Token: Token, + ResponseBuffer: proto.DataSize, + RequestBuffer: proto.DataSize, + UDPClients: make(map[string]net.Conn), TCPClients: make(map[string]net.Conn), NewUDPClient: make(chan net.Conn), @@ -57,6 +64,29 @@ func (client *Client) Close() error { return nil } +func (client Client) Recive() (res *proto.Response, err error) { + recBuff := make([]byte, client.ResponseBuffer+proto.PacketSize) + var n int + if n, err = client.Conn.Read(recBuff); err != nil { + if opErr, isOp := err.(*net.OpError); isOp { + log.Println() + err = opErr.Err + if reflect.TypeOf(opErr.Err).String() == "poll.errNetClosing" { + return nil, io.EOF + } + } + return + } + + res = new(proto.Response) + if err = res.Reader(bytes.NewBuffer(recBuff[:n])); err != nil { + return + } + d,_:=json.Marshal(res) + log.Println(string(d)) + return +} + func (client Client) Send(req proto.Request) error { buff, err := req.Wbytes() if err != nil { @@ -67,30 +97,26 @@ func (client Client) Send(req proto.Request) error { return nil } +// Send token to controller to connect to tunnel func (client *Client) auth() (info *proto.AgentInfo, err error) { - var res proto.Response + attemps := 0 + var res *proto.Response for { - var buff []byte - if err = client.Send(proto.Request{ - AgentAuth: &client.Token, - }); err != nil { + if err = client.Send(proto.Request{AgentAuth: &client.Token}); err != nil { client.Conn.Close() return - } - buff = make([]byte, proto.PacketSize) - var n int - n, err = client.Conn.Read(buff) - if err != nil { + } else if res, err = client.Recive(); err != nil { client.Conn.Close() return } - if err = res.Reader(bytes.NewBuffer(buff[:n])); err != nil { - client.Conn.Close() - return - } else if res.BadRequest || res.SendAuth { + if res.BadRequest || res.SendAuth { // Wait seconds to resend token <-time.After(time.Second * 3) + if attemps++; attemps >= 25 { + err = ErrAgentUnathorized // Cannot auth + return + } continue // Reload auth } else if res.Unauthorized { // Close tunnel and break loop-de-loop 🦔 @@ -103,34 +129,41 @@ func (client *Client) auth() (info *proto.AgentInfo, err error) { return res.AgentInfo, nil } -// Dial and Auth agent +// Dial to controller and auto accept new responses from controller func (client *Client) Dial() (info *proto.AgentInfo, err error) { if client.Conn, err = net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(client.ControlAddr)); err != nil { return } + go client.backgroud() return client.auth() } // Watcher response from controller -func (client *Client) Backgroud() (err error) { +func (client *Client) backgroud() (err error) { + go func(){ + for { + var current = time.Now() + client.Send(proto.Request{Ping: ¤t}) + <-time.After(time.Second * 5) + } + }() for { - buff := make([]byte, proto.PacketSize) - n, err := client.Conn.Read(buff) - if err == io.EOF { - break - } else if err != nil { + log.Println("waiting response from controller") + var res *proto.Response + if res, err = client.Recive(); err != nil { + log.Println(err.Error()) + if err == io.EOF { + break + } continue } - var res proto.Response - if err = res.Reader(bytes.NewBuffer(buff[:n])); err != nil { - continue + if res.ResizeBuffer != nil { + client.ResponseBuffer = *res.ResizeBuffer } else if res.Pong != nil { client.LastPong = res.Pong continue // Wait to next response - } - - if res.BadRequest { + } else if res.BadRequest { continue } else if res.Unauthorized { return ErrAgentUnathorized @@ -145,7 +178,7 @@ func (client *Client) Backgroud() (err error) { client.NewTCPClient <- toAgent // send to Accept go func() { for { - buff := make([]byte, proto.DataSize) + buff := make([]byte, client.RequestBuffer) n, err := toClient.Read(buff) if err != nil { if err == io.EOF { @@ -156,11 +189,19 @@ func (client *Client) Backgroud() (err error) { } continue } else { + if client.RequestBuffer-uint64(n) == 0 { + client.RequestBuffer += 500 + var req proto.Request + req.ResizeBuffer = new(uint64) + *req.ResizeBuffer = client.RequestBuffer + client.Send(req) + <-time.After(time.Microsecond) + } go client.Send(proto.Request{ DataTX: &proto.ClientData{ Client: data.Client, - Size: uint64(n), - Data: buff[:n], + Size: uint64(n), + Data: buff[:n], }, }) } @@ -172,7 +213,7 @@ func (client *Client) Backgroud() (err error) { client.NewUDPClient <- toAgent // send to Accept go func() { for { - buff := make([]byte, proto.DataSize) + buff := make([]byte, client.RequestBuffer) n, err := toClient.Read(buff) if err != nil { if err == io.EOF { @@ -183,11 +224,17 @@ func (client *Client) Backgroud() (err error) { } continue } else { + if client.RequestBuffer-uint64(n) == 0 { + var req proto.Request + req.ResizeBuffer = new(uint64) + *req.ResizeBuffer = uint64(n) + go client.Send(req) + } go client.Send(proto.Request{ DataTX: &proto.ClientData{ Client: data.Client, - Size: uint64(n), - Data: buff[:n], + Size: uint64(n), + Data: buff[:n], }, }) } @@ -211,4 +258,4 @@ func (client *Client) Backgroud() (err error) { } } return nil -} \ No newline at end of file +} diff --git a/cmd/client/client.go b/cmd/client/client.go new file mode 100644 index 0000000..dec4828 --- /dev/null +++ b/cmd/client/client.go @@ -0,0 +1,76 @@ +package client + +import ( + "fmt" + "io" + "net" + "net/netip" + + "github.com/google/uuid" + "github.com/urfave/cli/v2" + "sirherobrine23.org/Minecraft-Server/go-pproxit/client" + "sirherobrine23.org/Minecraft-Server/go-pproxit/proto" +) + +var CmdClient = cli.Command{ + Name: "client", + Aliases: []string{"c"}, + Usage: "connect to controller server and bind new requests to local port", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "url", + Required: true, + Aliases: []string{"host", "u"}, + Usage: `host string to connect to controller, example: "example.com:5522"`, + }, + &cli.StringFlag{ + Name: "token", + Required: true, + Usage: "agent token", + Aliases: []string{"t"}, + Action: func(ctx *cli.Context, s string) error { + if _, err := uuid.Parse(s); err == nil { + return nil + } else if len(s) == len(proto.AgentAuth{}) { + return nil + } + return fmt.Errorf("set valid token") + }, + }, + &cli.StringFlag{ + Name: "dial", + Required: true, + Usage: `dial connection, default is "localhost:80"`, + Aliases: []string{"d"}, + }, + }, + Action: func(ctx *cli.Context) (err error) { + var addr netip.AddrPort + if addr, err = netip.ParseAddrPort(ctx.String("url")); err != nil { + return + } + client := client.NewClient(addr, [36]byte([]byte(ctx.String("token")))) + var info *proto.AgentInfo + if info, err = client.Dial(); err != nil { + return err + } + fmt.Printf("Connected, Remote port: %d\n", info.LitenerPort) + fmt.Printf(" Remote address: %s\n", info.AddrPort.String()) + localConnect := ctx.String("dial") + for { + var conn, dial net.Conn + select { + case conn = <-client.NewTCPClient: + if dial, err = net.Dial("tcp", localConnect); err != nil { + continue + } + case conn = <-client.NewUDPClient: + if dial, err = net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(netip.MustParseAddrPort(localConnect))); err != nil { + continue + } + } + go io.Copy(conn, dial) + go io.Copy(dial, conn) + } + }, +} \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index cdc90a7..4dcd9fe 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,63 +2,26 @@ package main import ( "fmt" - "io" - "log" - "net" - "net/netip" "os" - "os/signal" - "sirherobrine23.org/Minecraft-Server/go-pproxit/client" - "sirherobrine23.org/Minecraft-Server/go-pproxit/server" + "github.com/urfave/cli/v2" + "sirherobrine23.org/Minecraft-Server/go-pproxit/cmd/client" + "sirherobrine23.org/Minecraft-Server/go-pproxit/cmd/server" ) +var description string = `pproxit is a proxy that allows anyone to host a server without port forwarding. We use tunneling. Only the server needs to run the program, not every player!` func main() { - cctrl := make(chan os.Signal, 1) - signal.Notify(cctrl, os.Interrupt) - - var port uint16 = 5522 - server := server.NewServer(nil) - go server.Listen(port) - fmt.Printf("Server listen on :%d\n", port) - - go func() { - client := client.NewClient(netip.AddrPortFrom(netip.IPv6Loopback(), port), [36]byte{}) - info, err := client.Dial() - if err != nil { - panic(err) - } - go client.Backgroud() // Recive data - fmt.Printf("Client remote: %s\n", client.Conn.RemoteAddr().String()) - fmt.Printf("Client Listened on %d\n", info.LitenerPort) - - localConnect := "127.0.0.1:5201" - for { - select { - case tcp := <-client.NewTCPClient: - go func() { - conn, err := net.Dial("tcp", localConnect) - if err != nil { - log.Println(err) - return - } - go io.Copy(conn, tcp) - go io.Copy(tcp, conn) - }() - case udp := <-client.NewUDPClient: - go func () { - conn, err := net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(netip.MustParseAddrPort(localConnect))) - if err != nil { - log.Println(err) - return - } - go io.Copy(conn, udp) - go io.Copy(udp, conn) - }() - } - } - }() - - <-cctrl - fmt.Println("Closing controller") + app := cli.NewApp() + app.Name = "pproxit" + app.Description = description + app.EnableBashCompletion = true + app.HideHelpCommand = true + app.Commands = []*cli.Command{ + &server.CmdServer, + &client.CmdClient, + } + if err := app.Run(os.Args); err != nil { + fmt.Fprintf(app.ErrWriter, "Error: %v\n", err) + os.Exit(1) + } } diff --git a/cmd/server/server.go b/cmd/server/server.go new file mode 100644 index 0000000..c12f90b --- /dev/null +++ b/cmd/server/server.go @@ -0,0 +1,42 @@ +package server + +import ( + "github.com/urfave/cli/v2" + "sirherobrine23.org/Minecraft-Server/go-pproxit/proto" + "sirherobrine23.org/Minecraft-Server/go-pproxit/server" +) + +var CmdServer = cli.Command{ + Name: "server", + Usage: "Create local server and open controller ports", + Aliases: []string{"s"}, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "port", + Value: 5522, + Aliases: []string{"p"}, + Usage: "Set controller port to watcher UDP requests", + }, + &cli.StringFlag{ + Name: "log", + Value: "silence", + Aliases: []string{"l"}, + Usage: "set server log: silence, 0 or verbose, 2", + }, + &cli.StringFlag{ + Name: "db", + Value: "./pproxit.db", + Aliases: []string{"d"}, + Usage: "sqlite file path", + }, + }, + Action: func(ctx *cli.Context) error { + calls, err := NewCall(ctx.String("db")) + if err != nil { + return err + } + pproxitServer := server.NewServer(calls) + pproxitServer.RequestBuffer = proto.PacketDataSize * 2 // More initial buffer request + return pproxitServer.Listen(uint16(ctx.Int("port"))) + }, +} \ No newline at end of file diff --git a/cmd/server/servercall.go b/cmd/server/servercall.go new file mode 100644 index 0000000..62449b1 --- /dev/null +++ b/cmd/server/servercall.go @@ -0,0 +1,85 @@ +package server + +import ( + "time" + + _ "modernc.org/sqlite" + "sirherobrine23.org/Minecraft-Server/go-pproxit/server" + "xorm.io/xorm" + "xorm.io/xorm/names" +) + +type serverCalls struct { + XormEngine *xorm.Engine +} + +type User struct { + ID int64 `xorm:"pk"` // Client ID + Username string `xorm:"varchar(32) notnull unique 'user'"` // Username + FullName string `xorm:"text notnull notnull 'name'"` // Real name for user + AccountStatus int8 `xorm:"BIT notnull 'status'"` // Account Status + CreateAt time.Time `xorm:"created"` // Create date + UpdateAt time.Time `xorm:"updated"` // Update date +} + +type Tun struct { + ID int64 `xorm:"pk"` // Tunnel ID + User int64 `xorm:"notnull"` // Agent ID + Token [36]byte `xorm:"blob notnull unique"` // Tunnel Token + Proto uint8 `xorm:"default 3"` // Proto accept + PortListen uint16 // Port listen agent +} + +type Ping struct { + ID int64 `json:"-" xorm:"pk"` // Tunnel ID + TunID int64 `json:"-"` + ServerTime time.Time `json:"server" xorm:"datetime notnull"` + AgentTime time.Time `json:"agent" xorm:"datetime notnull"` +} + +func NewCall(DBConn string) (call *serverCalls, err error) { + call = new(serverCalls) + if call.XormEngine, err = xorm.NewEngine("sqlite", DBConn); err != nil { + return + } + call.XormEngine.SetMapper(names.SameMapper{}) + session := call.XormEngine.NewSession() + defer session.Close() + session.CreateTable(User{}) + session.CreateTable(Tun{}) + session.CreateTable(Ping{}) + return +} + +func (call serverCalls) AgentInfo(Token [36]byte) (server.TunnelInfo, error) { + var tun = Tun{Token: Token} + if ok, err := call.XormEngine.Get(&tun); err != nil || !ok { + if !ok { + return server.TunnelInfo{}, server.ErrNoAgent + } + return server.TunnelInfo{}, err + } + return server.TunnelInfo{ + PortListen: tun.PortListen, + Proto: tun.Proto, + }, nil +} + +func (call serverCalls) RegisterPing(serverTime, clientTime time.Time, Token [36]byte) error { + var tun = Tun{Token: Token} + if ok, err := call.XormEngine.Get(&tun); err != nil { + return err + } else if !ok { + return server.ErrNoAgent + } + + ping := new(Ping) + ping.TunID = tun.ID + ping.ServerTime = serverTime + ping.AgentTime = clientTime + _, err := call.XormEngine.InsertOne(ping) + if err != nil { + return err + } + return nil +} diff --git a/go.mod b/go.mod index 65645b3..a3f4001 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,36 @@ module sirherobrine23.org/Minecraft-Server/go-pproxit go 1.22.3 + +require ( + github.com/google/uuid v1.6.0 + github.com/urfave/cli/v2 v2.27.2 + modernc.org/sqlite v1.30.0 + xorm.io/xorm v1.3.9 +) + +require ( + github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/goccy/go-json v0.8.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect + github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect + golang.org/x/net v0.14.0 // indirect + golang.org/x/sys v0.19.0 // indirect + modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect + modernc.org/libc v1.50.9 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.8.0 // indirect + modernc.org/strutil v1.2.0 // indirect + modernc.org/token v1.1.0 // indirect + xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978 // indirect +) diff --git a/go.sum b/go.sum index e69de29..3f2d6f8 100644 --- a/go.sum +++ b/go.sum @@ -0,0 +1,114 @@ +gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:lSA0F4e9A2NcQSqGqTOXqu2aRi/XEQxDCBwM8yJtE6s= +gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a/go.mod h1:EXuID2Zs0pAQhH8yz+DNjUbjppKQzKFAn28TMYPB6IU= +github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= +github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/goccy/go-json v0.8.1 h1:4/Wjm0JIJaTDm8K1KcGrLHJoa8EsJ13YWeX+6Kfq6uI= +github.com/goccy/go-json v0.8.1/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI= +github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 h1:+qGGcbkzsfDQNPPe9UDgpxAWQrhbbBXOYJFQDq/dtJw= +github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913/go.mod h1:4aEEwZQutDLsQv2Deui4iYQ6DWTxR14g6m8Wv88+Xqk= +golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= +golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= +golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.21.2 h1:dycHFB/jDc3IyacKipCNSDrjIC0Lm1hyoWOZTRR20Lk= +modernc.org/cc/v4 v4.21.2/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= +modernc.org/ccgo/v4 v4.17.8 h1:yyWBf2ipA0Y9GGz/MmCmi3EFpKgeS7ICrAFes+suEbs= +modernc.org/ccgo/v4 v4.17.8/go.mod h1:buJnJ6Fn0tyAdP/dqePbrrvLyr6qslFfTbFrCuaYvtA= +modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= +modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= +modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw= +modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.50.9 h1:hIWf1uz55lorXQhfoEoezdUHjxzuO6ceshET/yWjSjk= +modernc.org/libc v1.50.9/go.mod h1:15P6ublJ9FJR8YQCGy8DeQ2Uwur7iW9Hserr/T3OFZE= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= +modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.30.0 h1:8YhPUs/HTnlEgErn/jSYQTwHN/ex8CjHHjg+K9iG7LM= +modernc.org/sqlite v1.30.0/go.mod h1:cgkTARJ9ugeXSNaLBPK3CqbOe7Ec7ZhWPoMFGldEYEw= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= +xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978 h1:bvLlAPW1ZMTWA32LuZMBEGHAUOcATZjzHcotf3SWweM= +xorm.io/builder v0.3.11-0.20220531020008-1bd24a7dc978/go.mod h1:aUW0S9eb9VCaPohFCH3j7czOx1PMW3i1HrSzbLYGBSE= +xorm.io/xorm v1.3.9 h1:TUovzS0ko+IQ1XnNLfs5dqK1cJl1H5uHpWbWqAQ04nU= +xorm.io/xorm v1.3.9/go.mod h1:LsCCffeeYp63ssk0pKumP6l96WZcHix7ChpurcLNuMw= diff --git a/internal/udplisterner/udplisterner.go b/internal/udplisterner/udplisterner.go index 3b960f2..f1f57e9 100644 --- a/internal/udplisterner/udplisterner.go +++ b/internal/udplisterner/udplisterner.go @@ -1,91 +1,140 @@ package udplisterner import ( + "io" "net" "net/netip" + "sync" "sirherobrine23.org/Minecraft-Server/go-pproxit/internal/pipe" ) +type clientInfo struct { + WriteSize int // Size to Write + Conn net.Conn // Client Pipe +} + type UdpListerner struct { - MTU uint64 - udpConn *net.UDPConn - clients map[string]net.Conn - newClient chan any + readSize int // UDPConn size to reader + udpConn *net.UDPConn // UDPConn root + clientInfo *sync.Map // Storage *clientInfo + newClient chan any // Accept connection channel or error } -func (udpConn UdpListerner) Close() error { - for addr, cli := range udpConn.clients { - cli.Close() - delete(udpConn.clients, addr) +func ListenUDPAddr(network string, address *net.UDPAddr) (UdpListen *UdpListerner, err error) { + UdpListen = new(UdpListerner) + if UdpListen.udpConn, err = net.ListenUDP(network, address); err != nil { + return nil, err } - close(udpConn.newClient) - return udpConn.udpConn.Close() + UdpListen.readSize = 1024 // Initial buffer reader + UdpListen.newClient = make(chan any) + UdpListen.clientInfo = new(sync.Map) + go UdpListen.backgroud() // Recive new requests + return UdpListen, nil } -func (udpConn UdpListerner) Addr() net.Addr { - return udpConn.udpConn.LocalAddr() +func ListenAddrPort(network string, address netip.AddrPort) (*UdpListerner, error) { + return ListenUDPAddr(network, net.UDPAddrFromAddrPort(address)) } -func (udpConn UdpListerner) Accept() (net.Conn, error) { - if data, ok := <-udpConn.newClient; ok { - if err, isErr := data.(error); isErr { +func Listen(network, address string) (*UdpListerner, error) { + local, err := net.ResolveUDPAddr(network, address) + if err != nil { + return nil, err + } + return ListenUDPAddr(network, local) +} + +// Close clients and close client channel +func (udp *UdpListerner) Close() error { + close(udp.newClient) // Close channel to new accepts + var toDelete map[string]*clientInfo + udp.clientInfo.Range(func(key, value any) bool { + toDelete[key.(string)] = value.(*clientInfo) + return true + }) + for key, info := range toDelete { + info.Conn.Close() + udp.clientInfo.Delete(key) + } + return nil +} + +func (udp *UdpListerner) CloseClient(clientAddrPort string) { + client, ok := udp.clientInfo.LoadAndDelete(clientAddrPort) + if !ok { + return + } + agent := client.(clientInfo) + agent.Conn.Close() +} + +func (udp UdpListerner) Addr() net.Addr { + if udp.udpConn == nil { + return &net.UDPAddr{} + } + return udp.udpConn.LocalAddr() +} + +func (udp UdpListerner) Accept() (net.Conn, error) { + if conn, ok := <-udp.newClient; ok { + if err, isErr := conn.(error); isErr { return nil, err } - return data.(net.Conn), nil + return conn.(*clientInfo).Conn, nil } return nil, net.ErrClosed } func (udp *UdpListerner) backgroud() { for { - buffer := make([]byte, udp.MTU) - n, from, err := udp.udpConn.ReadFromUDP(buffer) + readBuffer := make([]byte, udp.readSize) // Make reader size + n, from, err := udp.udpConn.ReadFromUDP(readBuffer) if err != nil { - udp.newClient <- err // Send to accept error - return - } else if toListener, exist := udp.clients[from.String()]; exist { - // Send in backgroud - go func() { - if _, err := toListener.Write(buffer[:n]); err != nil { - toListener.Close() - delete(udp.clients, from.String()) // Remove from clients - } - }() - continue // Call next request + if err == io.EOF || err == io.ErrUnexpectedEOF { + udp.Close() // Close clients + break + } + continue + } else if n-udp.readSize == 0 { + udp.readSize += 500 // Add 500 Bytes to reader } - // Create new connection and send to accept - toClinet, toListener := pipe.CreatePipe(udp.udpConn.LocalAddr(), from) - udp.clients[from.String()] = toListener // Set listerner clients - udp.newClient <- toClinet // return to accept - + // Check if exists current connection + if client, ok := udp.clientInfo.Load(from.String()); ok { + toListener := client.(*clientInfo) + toListener.Conn.Write(readBuffer[:n]) // n size from Buffer to client + continue // Contine loop + } go func() { - toListener.Write(buffer[:n]) // Write buffer to new pipe - for { - buffer := make([]byte, udp.MTU) - n, err := toListener.Read(buffer) - if err != nil { - toListener.Close() - delete(udp.clients, from.String()) // Remove from clients - return + // Create new client + newClient := new(clientInfo) + newClient.WriteSize = n // Same Size from reader buffer + var agentPipe net.Conn + newClient.Conn, agentPipe = pipe.CreatePipe(udp.udpConn.LocalAddr(), from) + + udp.newClient <- newClient // Send to accept + udp.clientInfo.Store(from.String(), newClient) + go agentPipe.Write(readBuffer[:n]) // n size from Buffer to client + go func() { + for { + client, ok := udp.clientInfo.Load(from.String()) + if !ok { + udp.clientInfo.Delete(from.String()) + agentPipe.Close() + break // bye-bye + } + newClient := client.(*clientInfo) + writeBuffer := make([]byte, newClient.WriteSize) + n, err := agentPipe.Read(writeBuffer) + if err != nil { + udp.clientInfo.Delete(from.String()) + agentPipe.Close() + break + } + go udp.udpConn.WriteToUDP(writeBuffer[:n], from) } - udp.udpConn.WriteToUDP(buffer[:n], from) - } + }() }() } } - -func Listen(UdpProto string, Address netip.AddrPort, MTU uint64) (net.Listener, error) { - conn, err := net.ListenUDP(UdpProto, net.UDPAddrFromAddrPort(Address)) - if err != nil { - return nil, err - } - udp := new(UdpListerner) - udp.udpConn = conn - udp.newClient = make(chan any) - udp.clients = make(map[string]net.Conn) - udp.MTU = MTU - go udp.backgroud() - return udp, nil -} diff --git a/internal/udplisterner/v2/pipe.go b/internal/udplisterner/v2/pipe.go new file mode 100644 index 0000000..ecf5046 --- /dev/null +++ b/internal/udplisterner/v2/pipe.go @@ -0,0 +1,201 @@ +package udplisterner +import ( + "io" + "net" + "os" + "sync" + "time" +) + +// pipeDeadline is an abstraction for handling timeouts. +type pipeDeadline struct { + mu sync.Mutex // Guards timer and cancel + timer *time.Timer + cancel chan struct{} // Must be non-nil +} + +func makePipeDeadline() pipeDeadline { + return pipeDeadline{cancel: make(chan struct{})} +} + +// set sets the point in time when the deadline will time out. +// A timeout event is signaled by closing the channel returned by waiter. +// Once a timeout has occurred, the deadline can be refreshed by specifying a +// t value in the future. +// +// A zero value for t prevents timeout. +func (d *pipeDeadline) set(t time.Time) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil && !d.timer.Stop() { + <-d.cancel // Wait for the timer callback to finish and close cancel + } + d.timer = nil + + // Time is zero, then there is no deadline. + closed := isClosedChan(d.cancel) + if t.IsZero() { + if closed { + d.cancel = make(chan struct{}) + } + return + } + + // Time in the future, setup a timer to cancel in the future. + if dur := time.Until(t); dur > 0 { + if closed { + d.cancel = make(chan struct{}) + } + d.timer = time.AfterFunc(dur, func() { + close(d.cancel) + }) + return + } + + // Time in the past, so close immediately. + if !closed { + close(d.cancel) + } +} + +// wait returns a channel that is closed when the deadline is exceeded. +func (d *pipeDeadline) wait() chan struct{} { + d.mu.Lock() + defer d.mu.Unlock() + return d.cancel +} + +func isClosedChan(c <-chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +} + +type pipe struct { + localAddr, remoteAddr net.Addr + + wrMu sync.Mutex // Serialize Write operations + + // Used by local Read to interact with remote Write. + // Successful receive on rdRx is always followed by send on rdTx. + rdRx <-chan []byte + rdTx chan<- int + + // Used by local Write to interact with remote Read. + // Successful send on wrTx is always followed by receive on wrRx. + wrTx chan<- []byte + wrRx <-chan int + + once sync.Once // Protects closing localDone + localDone chan struct{} + remoteDone <-chan struct{} + + readDeadline pipeDeadline + writeDeadline pipeDeadline +} + +func (p *pipe) LocalAddr() net.Addr { return p.localAddr } +func (p *pipe) RemoteAddr() net.Addr { return p.remoteAddr } + +func (p *pipe) Read(b []byte) (int, error) { + n, err := p.read(b) + if err != nil && err != io.EOF && err != io.ErrClosedPipe { + err = &net.OpError{Op: "read", Net: "pipe", Err: err} + } + return n, err +} + +func (p *pipe) read(b []byte) (n int, err error) { + switch { + case isClosedChan(p.localDone): + return 0, io.ErrClosedPipe + case isClosedChan(p.remoteDone): + return 0, io.EOF + case isClosedChan(p.readDeadline.wait()): + return 0, os.ErrDeadlineExceeded + } + + select { + case bw := <-p.rdRx: + nr := copy(b, bw) + p.rdTx <- nr + return nr, nil + case <-p.localDone: + return 0, io.ErrClosedPipe + case <-p.remoteDone: + return 0, io.EOF + case <-p.readDeadline.wait(): + return 0, os.ErrDeadlineExceeded + } +} + +func (p *pipe) Write(b []byte) (int, error) { + n, err := p.write(b) + if err != nil && err != io.ErrClosedPipe { + err = &net.OpError{Op: "write", Net: "pipe", Err: err} + } + return n, err +} + +func (p *pipe) write(b []byte) (n int, err error) { + switch { + case isClosedChan(p.localDone): + return 0, io.ErrClosedPipe + case isClosedChan(p.remoteDone): + return 0, io.ErrClosedPipe + case isClosedChan(p.writeDeadline.wait()): + return 0, os.ErrDeadlineExceeded + } + + p.wrMu.Lock() // Ensure entirety of b is written together + defer p.wrMu.Unlock() + for once := true; once || len(b) > 0; once = false { + select { + case p.wrTx <- b: + nw := <-p.wrRx + b = b[nw:] + n += nw + case <-p.localDone: + return n, io.ErrClosedPipe + case <-p.remoteDone: + return n, io.ErrClosedPipe + case <-p.writeDeadline.wait(): + return n, os.ErrDeadlineExceeded + } + } + return n, nil +} + +func (p *pipe) SetDeadline(t time.Time) error { + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return io.ErrClosedPipe + } + p.readDeadline.set(t) + p.writeDeadline.set(t) + return nil +} + +func (p *pipe) SetReadDeadline(t time.Time) error { + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return io.ErrClosedPipe + } + p.readDeadline.set(t) + return nil +} + +func (p *pipe) SetWriteDeadline(t time.Time) error { + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return io.ErrClosedPipe + } + p.writeDeadline.set(t) + return nil +} + +func (p *pipe) Close() error { + p.once.Do(func() { close(p.localDone) }) + return nil +} diff --git a/internal/udplisterner/v2/v2.go b/internal/udplisterner/v2/v2.go new file mode 100644 index 0000000..cfae345 --- /dev/null +++ b/internal/udplisterner/v2/v2.go @@ -0,0 +1,111 @@ +package udplisterner + +import ( + "io" + "log" + "net" +) + +type client struct { + ClientConn *pipe + rdRx, wrTx chan []byte + rdTx, wrRx chan int + localDone, remoteDone chan struct{} +} + +type UDPListener struct { + conn *net.UDPConn // Root listener + toAccept chan any // Return accept connections + clients map[string]client // Clients +} + +// Get address from UDP Listener +func (lis UDPListener) Addr() net.Addr { + return lis.conn.LocalAddr() +} + +func (lis *UDPListener) Close() error { + for _, client := range lis.clients { + client.localDone <- struct{}{} // Close and wait response, ignoraing errors + } + close(lis.toAccept) // end channel + return lis.conn.Close() +} + +func (lis UDPListener) Accept() (net.Conn, error) { + if rec, ok := <-lis.toAccept; ok { + if err, isErr := rec.(error); isErr { + return nil, err + } + return rec.(net.Conn), nil + } + return nil, io.ErrClosedPipe +} + +func Listen(network string, address *net.UDPAddr) (net.Listener, error) { + var conn *net.UDPConn + var err error + if conn, err = net.ListenUDP(network, address); err != nil { + return nil, err + } + accepts := make(chan any) + listen := &UDPListener{conn, accepts, make(map[string]client)} + go func() { + var maxSize int = 1024 + for { + log.Println("waiting request") + buff := make([]byte, maxSize) + n, from, err := conn.ReadFromUDPAddrPort(buff) + if err != nil { + break // end loop-de-loop + } + log.Printf("Request from: %s", from.String()) + if tun, ok := listen.clients[from.String()]; ok { + tun.wrTx <- buff[:n] + <-tun.rdTx // but ignore + continue + } + go func() { + rdRx := make(chan []byte) + wrTx := make(chan []byte) + rdTx := make(chan int) + wrRx := make(chan int) + localDone := make(chan struct{}) + remoteDone := make(chan struct{}) + newClient := client{ + rdRx: rdRx, rdTx: rdTx, + wrTx: wrTx, wrRx: wrRx, + localDone: localDone, remoteDone: remoteDone, + ClientConn: &pipe{ + localAddr: conn.LocalAddr(), + remoteAddr: net.UDPAddrFromAddrPort(from), + + rdRx: wrTx, rdTx: wrRx, + wrTx: rdRx, wrRx: rdTx, + localDone: remoteDone, remoteDone: localDone, + readDeadline: makePipeDeadline(), + writeDeadline: makePipeDeadline(), + }, + } + listen.clients[from.String()] = newClient // Set to clients map + listen.toAccept <- newClient.ClientConn // Send to accept + newClient.wrTx <- buff[:n] + <-newClient.rdTx // but ignore + for { + if data, ok := <-rdRx; ok { + n, err := conn.WriteToUDPAddrPort(data, from) + if err != nil { + localDone <- struct{}{} + <-remoteDone // wait remote + break // end + } + wrRx <- n // send write data + continue + } + break + } + }() + } + }() + return listen, nil +} diff --git a/internal/udplisterner/v2/v2_test.go b/internal/udplisterner/v2/v2_test.go new file mode 100644 index 0000000..c74ea72 --- /dev/null +++ b/internal/udplisterner/v2/v2_test.go @@ -0,0 +1,44 @@ +package udplisterner + +import ( + "bytes" + "net" + "testing" + "time" +) + +func TestListen(t *testing.T) { + addr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:0") + listen, err := Listen("udp", addr) + if err != nil { + t.Fatal(err) + } + defer listen.Close() // end test + go func(){ + t.Logf("Waiting to accept client ...\n") + conn, err := listen.Accept() + if err != nil { + t.Fatal(err) + } + defer conn.Close() + buff := make([]byte, 4) + if _, err := conn.Read(buff); err != nil { + t.Fatal(err) + } + if bytes.Compare(buff, []byte{1, 9, 9, 1}) != 0 { + t.Fatalf("cannot get same buffer bytes") + } + conn.Write(buff) + }() + + time.Sleep(time.Microsecond) + t.Logf("Connecting to %s\n", listen.Addr().String()) + addr, _ = net.ResolveUDPAddr("udp", listen.Addr().String()) + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + conn.Write([]byte{1, 9, 9, 1}) + conn.Read(make([]byte, 4)) +} \ No newline at end of file diff --git a/proto/proto.go b/proto/proto.go index dce6b95..ad690ad 100644 --- a/proto/proto.go +++ b/proto/proto.go @@ -14,8 +14,9 @@ const ( ProtoUDP uint8 = 2 // UDP Protocol ProtoBoth uint8 = 3 // TCP+UDP Protocol - DataSize uint64 = 10_000 // Default listener data recive and send - PacketSize uint64 = DataSize + 800 // Packet to send and recive on controller + DataSize uint64 = 10_000 // Default listener data recive and send + PacketSize uint64 = 800 // Packet to without data only requests and response headers + PacketDataSize uint64 = DataSize + PacketSize // Header and Data request/response ) var ( @@ -84,7 +85,7 @@ func (close *Client) Reader(r io.Reader) (err error) { type ClientData struct { Client Client // Client Destination Size uint64 // Data size - Data []byte // Bytes to send + Data []byte `json:"-"` // Bytes to send } func (data ClientData) Writer(w io.Writer) error { diff --git a/proto/request.go b/proto/request.go index 98b63f7..8a2f984 100644 --- a/proto/request.go +++ b/proto/request.go @@ -14,6 +14,7 @@ const ( ReqPing uint64 = 2 // Time ping ReqCloseClient uint64 = 3 // Close client ReqClientData uint64 = 4 // Send data + ReqResize uint64 = 5 // Resize request buffer ) var ( @@ -37,10 +38,11 @@ func (agent *AgentAuth) Reader(r io.Reader) error { // Send request to agent and wait response type Request struct { - AgentAuth *AgentAuth // Send agent authentication to controller - Ping *time.Time // Send ping time to controller in unix milliseconds - ClientClose *Client // Close client in controller - DataTX *ClientData // Recive data from agent + AgentAuth *AgentAuth // Send agent authentication to controller + Ping *time.Time // Send ping time to controller in unix milliseconds + ClientClose *Client // Close client in controller + DataTX *ClientData // Recive data from agent + ResizeBuffer *uint64 // Resize request buffer } // Get Bytes from Request @@ -73,6 +75,11 @@ func (req Request) Writer(w io.Writer) error { return err } return data.Writer(w) + } else if req.ResizeBuffer != nil { + if err := bigendian.WriteUint64(w, ReqResize); err != nil { + return err + } + return bigendian.WriteUint64(w, *req.ResizeBuffer) } return ErrInvalidBody } @@ -98,6 +105,10 @@ func (req *Request) Reader(r io.Reader) (err error) { } else if reqID == ReqClientData { req.DataTX = new(ClientData) return req.DataTX.Reader(r) + } else if reqID == ReqResize { + req.ResizeBuffer = new(uint64) + *req.ResizeBuffer, err = bigendian.ReadUint64(r) + return } return ErrInvalidBody } diff --git a/proto/response.go b/proto/response.go index e2fb41b..e58b761 100644 --- a/proto/response.go +++ b/proto/response.go @@ -12,12 +12,12 @@ import ( const ( ResUnauthorized uint64 = 1 // Request not processed and ignored ResBadRequest uint64 = 2 // Request cannot process and ignored - ResNewClient uint64 = 3 // New client - ResCloseClient uint64 = 4 // Controller closed connection - ResClientData uint64 = 5 // Controller accepted data - ResSendAuth uint64 = 6 // Send token to controller - ResAgentInfo uint64 = 7 // Agent info - ResPong uint64 = 8 // Ping response + ResCloseClient uint64 = 3 // Controller closed connection + ResClientData uint64 = 4 // Controller accepted data + ResSendAuth uint64 = 5 // Send token to controller + ResAgentInfo uint64 = 6 // Agent info + ResPong uint64 = 7 // Ping response + ResResize uint64 = 8 // Resize buffer size ) type AgentInfo struct { @@ -87,10 +87,10 @@ type Response struct { BadRequest bool // Controller accepted packet so cannot process Request SendAuth bool // Send Agent token - AgentInfo *AgentInfo // Agent Info - Pong *time.Time // ping response + AgentInfo *AgentInfo // Agent Info + Pong *time.Time // ping response + ResizeBuffer *uint64 // Resize Agent response - // NewClient *Client // Controller Accepted client CloseClient *Client // Controller end client DataRX *ClientData // Controller recive data from client } @@ -116,11 +116,6 @@ func (res Response) Writer(w io.Writer) error { return err } return bigendian.WriteInt64(w, pong.UnixMilli()) - // } else if newClient := res.NewClient; newClient != nil { - // if err := bigendian.WriteUint64(w, ResNewClient); err != nil { - // return err - // } - // return newClient.Writer(w) } else if closeClient := res.CloseClient; closeClient != nil { if err := bigendian.WriteUint64(w, ResCloseClient); err != nil { return err @@ -136,6 +131,11 @@ func (res Response) Writer(w io.Writer) error { return err } return info.Writer(w) + } else if res.ResizeBuffer != nil { + if err := bigendian.WriteUint64(w, ResResize); err != nil { + return err + } + return bigendian.WriteUint64(w, *res.ResizeBuffer) } return ErrInvalidBody } @@ -154,9 +154,6 @@ func (res *Response) Reader(r io.Reader) error { } else if resID == ResSendAuth { res.SendAuth = true return nil - // } else if resID == ResNewClient { - // res.NewClient = new(Client) - // return res.NewClient.Reader(r) } else if resID == ResCloseClient { res.CloseClient = new(Client) return res.CloseClient.Reader(r) @@ -174,6 +171,13 @@ func (res *Response) Reader(r io.Reader) error { res.Pong = new(time.Time) *res.Pong = time.UnixMilli(unixMil) return nil + } else if resID == ResResize { + var err error + res.ResizeBuffer = new(uint64) + if *res.ResizeBuffer, err = bigendian.ReadUint64(r); err != nil { + return err + } + return nil } return ErrInvalidBody diff --git a/server/call.go b/server/call.go new file mode 100644 index 0000000..2b9c899 --- /dev/null +++ b/server/call.go @@ -0,0 +1,37 @@ +package server + +import ( + "net" + "time" + + "sirherobrine23.org/Minecraft-Server/go-pproxit/proto" +) + +func getFreePort() (int, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil +} + +// Accept any agent in ramdom port +type DefaultCall struct{} + +func (DefaultCall) RegisterPing(serverTime, clientTime time.Time, Token [36]byte) error { return nil } +func (d DefaultCall) AgentInfo(Token [36]byte) (TunnelInfo, error) { + port, err := getFreePort() + if err == nil { + return TunnelInfo{ + PortListen: uint16(port), + Proto: proto.ProtoBoth, + }, nil + } + return TunnelInfo{}, err +} diff --git a/server/server.go b/server/server.go index ae15d0e..9c9627b 100644 --- a/server/server.go +++ b/server/server.go @@ -2,13 +2,15 @@ package server import ( "bytes" + "encoding/json" "errors" "io" + "log" "net" "net/netip" "time" - "sirherobrine23.org/Minecraft-Server/go-pproxit/internal/udplisterner" + "sirherobrine23.org/Minecraft-Server/go-pproxit/internal/udplisterner/v2" "sirherobrine23.org/Minecraft-Server/go-pproxit/proto" ) @@ -22,53 +24,36 @@ type TunnelInfo struct { } type Tunnel struct { - Token [36]byte // Agent Token - Authenticated bool // Agent Authenticated and avaible to recive/transmiter data - UDPListener net.Listener // Accept connections from UDP Clients - TCPListener net.Listener // Accept connections from TCP Clients - UDPClients map[string]net.Conn // Current clients connected in UDP Socket - TCPClients map[string]net.Conn // Current clients connected in TCP Socket - SendToAgent chan proto.Response // Send data to agent + Token [36]byte // Agent Token + Authenticated bool // Agent Authenticated and avaible to recive/transmiter data + ResponseBuffer uint64 // Send Reponse size + UDPListener net.Listener // Accept connections from UDP Clients + TCPListener net.Listener // Accept connections from TCP Clients + UDPClients map[string]net.Conn // Current clients connected in UDP Socket + TCPClients map[string]net.Conn // Current clients connected in TCP Socket + SendToAgent chan proto.Response // Send data to agent } // Interface to server accept and reject agents sessions type ServerCalls interface { AgentInfo(Token [36]byte) (TunnelInfo, error) - AgentShutdown(Token [36]byte) error -} - -// Accept any agent in ramdom port -type DefaultCall struct{} - -func (DefaultCall) getFreePort() (int, error) { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - return 0, err - } - - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return 0, err - } - defer l.Close() - return l.Addr().(*net.TCPAddr).Port, nil -} - -func (DefaultCall) AgentShutdown(Token [36]byte) error { return nil } -func (d DefaultCall) AgentInfo(Token [36]byte) (TunnelInfo, error) { - port, err := d.getFreePort() - if err == nil { - return TunnelInfo{ - PortListen: uint16(port), - Proto: proto.ProtoBoth, - }, nil - } - return TunnelInfo{}, err + RegisterPing(serverTime, clientTime time.Time, Token [36]byte) error } type Server struct { - Tunnels map[string]Tunnel // Tunnels listened - ServerCalls ServerCalls // Server call to auth and more + Conn *net.UDPConn // Local listen + RequestBuffer uint64 // Request Buffer + Tunnels map[string]Tunnel // Tunnels listened + ServerCalls ServerCalls // Server call to auth and more +} + +func (server Server) Send(to netip.AddrPort, res proto.Response) error { + buff, err := res.Wbytes() + if err != nil { + return err + } + _, err = server.Conn.WriteToUDPAddrPort(buff, to) + return err } // Create new server struct @@ -79,19 +64,16 @@ func NewServer(Calls ServerCalls) Server { Calls = DefaultCall{} } return Server{ - Tunnels: make(map[string]Tunnel), - ServerCalls: Calls, + RequestBuffer: proto.DataSize, + ServerCalls: Calls, + Tunnels: make(map[string]Tunnel), } } // Close client and send dead to agent func (tun *Tunnel) Close() { - if tun.TCPListener != nil { - tun.TCPListener.Close() - } - if tun.UDPListener != nil { - tun.UDPListener.Close() - } + tun.TCPListener.Close() + tun.UDPListener.Close() for key, conn := range tun.UDPClients { conn.Close() // End connection @@ -114,16 +96,10 @@ func (tun *Tunnel) UDPAccepts() { } clientAddr := netip.MustParseAddrPort(conn.RemoteAddr().String()) tun.UDPClients[conn.RemoteAddr().String()] = conn - // tun.SendToAgent <- proto.Response{ - // NewClient: &proto.Client{ - // Client: clientAddr, - // Proto: proto.ProtoUDP, - // }, - // } go func() { for { - buff := make([]byte, proto.DataSize) + buff := make([]byte, tun.ResponseBuffer) n, err := conn.Read(buff) if err != nil { go conn.Close() @@ -135,6 +111,14 @@ func (tun *Tunnel) UDPAccepts() { } break } + if tun.ResponseBuffer-uint64(n) == 0 { + tun.ResponseBuffer += 500 + res := proto.Response{} + res.ResizeBuffer = new(uint64) + *res.ResizeBuffer = tun.ResponseBuffer + tun.SendToAgent <- res + <-time.After(time.Microsecond) + } tun.SendToAgent <- proto.Response{ DataRX: &proto.ClientData{ Size: uint64(n), @@ -160,16 +144,9 @@ func (tun *Tunnel) TCPAccepts() { } clientAddr := netip.MustParseAddrPort(conn.RemoteAddr().String()) tun.TCPClients[conn.RemoteAddr().String()] = conn - // tun.SendToAgent <- proto.Response{ - // NewClient: &proto.Client{ - // Client: clientAddr, - // Proto: proto.ProtoTCP, - // }, - // } - go func() { for { - buff := make([]byte, proto.DataSize) + buff := make([]byte, tun.ResponseBuffer) n, err := conn.Read(buff) if err != nil { go conn.Close() @@ -181,6 +158,14 @@ func (tun *Tunnel) TCPAccepts() { } break } + if tun.ResponseBuffer-uint64(n) == 0 { + tun.ResponseBuffer += 500 + res := proto.Response{} + res.ResizeBuffer = new(uint64) + *res.ResizeBuffer = tun.ResponseBuffer + tun.SendToAgent <- res + <-time.After(time.Microsecond) + } tun.SendToAgent <- proto.Response{ DataRX: &proto.ClientData{ Size: uint64(n), @@ -239,14 +224,15 @@ func (server *Server) Listen(ControllerPort uint16) (err error) { if conn, err = net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.AddrPortFrom(netip.IPv4Unspecified(), ControllerPort))); err != nil { return } + server.Conn = conn for { var err error var req proto.Request - var res proto.Response var readSize int var addr netip.AddrPort - buffer := make([]byte, proto.PacketSize) + log.Println("waiting to request") + buffer := make([]byte, proto.PacketSize+server.RequestBuffer) if readSize, addr, err = conn.ReadFromUDPAddrPort(buffer); err != nil { if err == io.EOF { break // End controller @@ -255,24 +241,22 @@ func (server *Server) Listen(ControllerPort uint16) (err error) { } if err := req.Reader(bytes.NewBuffer(buffer[:readSize])); err != nil { - res.BadRequest = true - if buffer, err = res.Wbytes(); err != nil { - continue // not send bad request to agent - } - conn.WriteToUDPAddrPort(buffer, addr) // Send bad request to agent + log.Printf("From %s, cannot reader buffer: %s", addr.String(), err.Error()) + go server.Send(addr, proto.Response{BadRequest: true}) // Send bad request to agent continue // Continue parsing new requests } - if ping := req.Ping; ping != nil { - res.Pong = new(time.Time) - *res.Pong = time.Now() - data, _ := res.Wbytes() - conn.WriteToUDPAddrPort(data, addr) - continue - } + d,_ := json.Marshal(req) + log.Printf("From %s: %s", addr.String(), string(d)) // Process request if tunnel is authenticated if tun, exist := server.Tunnels[addr.String()]; exist && tun.Authenticated { + if ping := req.Ping; ping != nil { + current := time.Now() + go server.ServerCalls.RegisterPing(current, *req.Ping, tun.Token) + go server.Send(addr, proto.Response{Pong: ¤t}) + continue + } go tun.Request(req) // process request to tunnel continue // Call next message } @@ -305,45 +289,53 @@ func (server *Server) Listen(ControllerPort uint16) (err error) { } if !server.Tunnels[addr.String()].Authenticated && req.AgentAuth == nil { - res.SendAuth = true - data, _ := res.Wbytes() - conn.WriteToUDPAddrPort(data, addr) + go server.Send(addr, proto.Response{SendAuth: true}) continue } + info, err := server.ServerCalls.AgentInfo([36]byte(req.AgentAuth[:])) if err != nil { if err == ErrNoAgent { // Client not found - res.BadRequest = true + go server.Send(addr, proto.Response{Unauthorized: true}) } else { // Cannot process request resend - res.SendAuth = true + go server.Send(addr, proto.Response{SendAuth: true}) } - data, _ := res.Wbytes() - conn.WriteToUDPAddrPort(data, addr) continue } + // Close tunnels tokens listened + for ared, tun := range server.Tunnels { + if ared == addr.String() { + continue + } else if bytes.Equal(tun.Token[:], req.AgentAuth[:]) { + log.Printf("Closing agent %s", ared) + tun.Close() + delete(server.Tunnels, ared) + } + } + tun := server.Tunnels[addr.String()] + tun.Token = *req.AgentAuth // Set token to tunnel + if info.Proto == 3 || info.Proto == 1 { tun.TCPListener, err = net.ListenTCP("tcp", net.TCPAddrFromAddrPort(netip.AddrPortFrom(netip.IPv4Unspecified(), info.PortListen))) if err != nil { - res.BadRequest = true - data, _ := res.Wbytes() - conn.WriteToUDPAddrPort(data, addr) + log.Printf("TCP Listener from %s: %s", addr.String(), err.Error()) + go server.Send(addr, proto.Response{BadRequest: true}) continue } go tun.TCPAccepts() // Make accepts new requests } if info.Proto == 3 || info.Proto == 2 { - tun.UDPListener, err = udplisterner.Listen("udp", netip.AddrPortFrom(netip.IPv4Unspecified(), info.PortListen), proto.DataSize) + tun.UDPListener, err = udplisterner.Listen("udp", net.UDPAddrFromAddrPort(netip.AddrPortFrom(netip.IPv4Unspecified(), info.PortListen))) if err != nil { + log.Printf("UDP Listener from %s: %s", addr.String(), err.Error()) if tun.TCPListener != nil { tun.TCPListener.Close() } - res.BadRequest = true - data, _ := res.Wbytes() - conn.WriteToUDPAddrPort(data, addr) + go server.Send(addr, proto.Response{BadRequest: true}) continue } go tun.UDPAccepts() // Make accepts new requests @@ -351,13 +343,11 @@ func (server *Server) Listen(ControllerPort uint16) (err error) { tun.Authenticated = true server.Tunnels[addr.String()] = tun - res.AgentInfo = new(proto.AgentInfo) - res.AgentInfo.Protocol = info.Proto - res.AgentInfo.LitenerPort = info.PortListen - res.AgentInfo.AddrPort = addr - - data, _ := res.Wbytes() - conn.WriteToUDPAddrPort(data, addr) + AgentInfo := new(proto.AgentInfo) + AgentInfo.Protocol = info.Proto + AgentInfo.LitenerPort = info.PortListen + AgentInfo.AddrPort = addr + go server.Send(addr, proto.Response{AgentInfo: AgentInfo}) continue } return