Clients dynamic TX/RX sizes #1

Merged
Sirherobrine23 merged 3 commits from dynamic-size into main 2024-06-11 21:24:33 +00:00
7 changed files with 141 additions and 97 deletions
Showing only changes of commit 15473975f8 - Show all commits

View File

@ -17,15 +17,16 @@ var (
)
type Client struct {
ControlAddr netip.AddrPort // Controller address
Conn net.Conn // Agent controller connection
// AgentInfo *proto.AgentInfo // Agent info
Token proto.AgentAuth // Agent Token
LastPong *time.Time // Last pong response
UDPClients map[string]net.Conn // UDP Clients
TCPClients map[string]net.Conn // TCP Clients
NewUDPClient chan net.Conn // Accepts new UDP Clients
NewTCPClient chan net.Conn // Accepts new TCP Clients
ControlAddr netip.AddrPort // Controller address
Conn net.Conn // Agent controller connection
Token proto.AgentAuth // Agent Token
ResponseBuffer uint64 // Agent Reponse Buffer size, Initial size from proto.DataSize
RequestBuffer uint64 // Controller send bytes, initial size from proto.DataSize
LastPong *time.Time // Last pong response
UDPClients map[string]net.Conn // UDP Clients
TCPClients map[string]net.Conn // TCP Clients
NewUDPClient chan net.Conn // Accepts new UDP Clients
NewTCPClient chan net.Conn // Accepts new TCP Clients
}
func NewClient(ControlAddr netip.AddrPort, Token [36]byte) Client {
@ -34,6 +35,9 @@ func NewClient(ControlAddr netip.AddrPort, Token [36]byte) Client {
Conn: nil,
Token: Token,
ResponseBuffer: proto.DataSize,
RequestBuffer: proto.DataSize,
UDPClients: make(map[string]net.Conn),
TCPClients: make(map[string]net.Conn),
NewUDPClient: make(chan net.Conn),
@ -57,6 +61,20 @@ func (client *Client) Close() error {
return nil
}
func (client Client) Recive() (res *proto.Response, err error) {
recBuff := make([]byte, client.ResponseBuffer+proto.PacketSize)
var n int
if n, err = client.Conn.Read(recBuff); err != nil {
return
}
res = new(proto.Response)
if err = res.Reader(bytes.NewBuffer(recBuff[:n])); err != nil {
return
}
return
}
func (client Client) Send(req proto.Request) error {
buff, err := req.Wbytes()
if err != nil {
@ -68,27 +86,17 @@ func (client Client) Send(req proto.Request) error {
}
func (client *Client) auth() (info *proto.AgentInfo, err error) {
var res proto.Response
var res *proto.Response
for {
var buff []byte
if err = client.Send(proto.Request{
AgentAuth: &client.Token,
}); err != nil {
if err = client.Send(proto.Request{AgentAuth: &client.Token}); err != nil {
client.Conn.Close()
return
}
buff = make([]byte, proto.PacketSize)
var n int
n, err = client.Conn.Read(buff)
if err != nil {
} else if res, err = client.Recive(); err != nil {
client.Conn.Close()
return
}
if err = res.Reader(bytes.NewBuffer(buff[:n])); err != nil {
client.Conn.Close()
return
} else if res.BadRequest || res.SendAuth {
if res.BadRequest || res.SendAuth {
// Wait seconds to resend token
<-time.After(time.Second * 3)
continue // Reload auth
@ -103,7 +111,7 @@ func (client *Client) auth() (info *proto.AgentInfo, err error) {
return res.AgentInfo, nil
}
// Dial and Auth agent
// Dial and Auth agent before require call in new gorotine client.Backgroud()
func (client *Client) Dial() (info *proto.AgentInfo, err error) {
if client.Conn, err = net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(client.ControlAddr)); err != nil {
return
@ -114,23 +122,20 @@ func (client *Client) Dial() (info *proto.AgentInfo, err error) {
// Watcher response from controller
func (client *Client) Backgroud() (err error) {
for {
buff := make([]byte, proto.PacketSize)
n, err := client.Conn.Read(buff)
if err == io.EOF {
break
} else if err != nil {
var res *proto.Response
if res, err = client.Recive(); err != nil {
if err == io.EOF {
break
}
continue
}
var res proto.Response
if err = res.Reader(bytes.NewBuffer(buff[:n])); err != nil {
continue
if res.ResizeBuffer != nil {
client.ResponseBuffer = *res.ResizeBuffer
} else if res.Pong != nil {
client.LastPong = res.Pong
continue // Wait to next response
}
if res.BadRequest {
} else if res.BadRequest {
continue
} else if res.Unauthorized {
return ErrAgentUnathorized
@ -145,7 +150,7 @@ func (client *Client) Backgroud() (err error) {
client.NewTCPClient <- toAgent // send to Accept
go func() {
for {
buff := make([]byte, proto.DataSize)
buff := make([]byte, client.RequestBuffer)
n, err := toClient.Read(buff)
if err != nil {
if err == io.EOF {
@ -156,11 +161,19 @@ func (client *Client) Backgroud() (err error) {
}
continue
} else {
if client.RequestBuffer-uint64(n) == 0 {
client.RequestBuffer += 500
var req proto.Request
req.ResizeBuffer = new(uint64)
*req.ResizeBuffer = client.RequestBuffer
client.Send(req)
<-time.After(time.Microsecond)
}
go client.Send(proto.Request{
DataTX: &proto.ClientData{
Client: data.Client,
Size: uint64(n),
Data: buff[:n],
Size: uint64(n),
Data: buff[:n],
},
})
}
@ -172,7 +185,7 @@ func (client *Client) Backgroud() (err error) {
client.NewUDPClient <- toAgent // send to Accept
go func() {
for {
buff := make([]byte, proto.DataSize)
buff := make([]byte, client.RequestBuffer)
n, err := toClient.Read(buff)
if err != nil {
if err == io.EOF {
@ -183,11 +196,17 @@ func (client *Client) Backgroud() (err error) {
}
continue
} else {
if client.RequestBuffer-uint64(n) == 0 {
var req proto.Request
req.ResizeBuffer = new(uint64)
*req.ResizeBuffer = uint64(n)
go client.Send(req)
}
go client.Send(proto.Request{
DataTX: &proto.ClientData{
Client: data.Client,
Size: uint64(n),
Data: buff[:n],
Size: uint64(n),
Data: buff[:n],
},
})
}
@ -211,4 +230,4 @@ func (client *Client) Backgroud() (err error) {
}
}
return nil
}
}

View File

@ -8,6 +8,7 @@ import (
"net/netip"
"os"
"os/signal"
"time"
"sirherobrine23.org/Minecraft-Server/go-pproxit/client"
"sirherobrine23.org/Minecraft-Server/go-pproxit/server"
@ -20,6 +21,7 @@ func main() {
var port uint16 = 5522
server := server.NewServer(nil)
go server.Listen(port)
time.Sleep(time.Second)
fmt.Printf("Server listen on :%d\n", port)
go func() {

View File

@ -8,7 +8,7 @@ import (
)
type UdpListerner struct {
MTU uint64
MTU func() uint64
udpConn *net.UDPConn
clients map[string]net.Conn
newClient chan any
@ -39,7 +39,7 @@ func (udpConn UdpListerner) Accept() (net.Conn, error) {
func (udp *UdpListerner) backgroud() {
for {
buffer := make([]byte, udp.MTU)
buffer := make([]byte, udp.MTU())
n, from, err := udp.udpConn.ReadFromUDP(buffer)
if err != nil {
udp.newClient <- err // Send to accept error
@ -63,7 +63,7 @@ func (udp *UdpListerner) backgroud() {
go func() {
toListener.Write(buffer[:n]) // Write buffer to new pipe
for {
buffer := make([]byte, udp.MTU)
buffer := make([]byte, udp.MTU())
n, err := toListener.Read(buffer)
if err != nil {
toListener.Close()
@ -76,7 +76,7 @@ func (udp *UdpListerner) backgroud() {
}
}
func Listen(UdpProto string, Address netip.AddrPort, MTU uint64) (net.Listener, error) {
func Listen(UdpProto string, Address netip.AddrPort, MTU func() uint64) (net.Listener, error) {
conn, err := net.ListenUDP(UdpProto, net.UDPAddrFromAddrPort(Address))
if err != nil {
return nil, err

View File

@ -14,8 +14,9 @@ const (
ProtoUDP uint8 = 2 // UDP Protocol
ProtoBoth uint8 = 3 // TCP+UDP Protocol
DataSize uint64 = 10_000 // Default listener data recive and send
PacketSize uint64 = DataSize + 800 // Packet to send and recive on controller
DataSize uint64 = 10_000 // Default listener data recive and send
PacketSize uint64 = 800 // Packet to without data only requests and response headers
PacketDataSize uint64 = DataSize + PacketSize // Header and Data request/response
)
var (

View File

@ -14,6 +14,7 @@ const (
ReqPing uint64 = 2 // Time ping
ReqCloseClient uint64 = 3 // Close client
ReqClientData uint64 = 4 // Send data
ReqResize uint64 = 5 // Resize request buffer
)
var (
@ -37,10 +38,11 @@ func (agent *AgentAuth) Reader(r io.Reader) error {
// Send request to agent and wait response
type Request struct {
AgentAuth *AgentAuth // Send agent authentication to controller
Ping *time.Time // Send ping time to controller in unix milliseconds
ClientClose *Client // Close client in controller
DataTX *ClientData // Recive data from agent
AgentAuth *AgentAuth // Send agent authentication to controller
Ping *time.Time // Send ping time to controller in unix milliseconds
ClientClose *Client // Close client in controller
DataTX *ClientData // Recive data from agent
ResizeBuffer *uint64 // Resize request buffer
}
// Get Bytes from Request
@ -73,6 +75,11 @@ func (req Request) Writer(w io.Writer) error {
return err
}
return data.Writer(w)
} else if req.ResizeBuffer != nil {
if err := bigendian.WriteUint64(w, ReqResize); err != nil {
return err
}
return bigendian.WriteUint64(w, *req.ResizeBuffer)
}
return ErrInvalidBody
}
@ -98,6 +105,10 @@ func (req *Request) Reader(r io.Reader) (err error) {
} else if reqID == ReqClientData {
req.DataTX = new(ClientData)
return req.DataTX.Reader(r)
} else if reqID == ReqResize {
req.ResizeBuffer = new(uint64)
*req.ResizeBuffer, err = bigendian.ReadUint64(r)
return
}
return ErrInvalidBody
}

View File

@ -12,12 +12,12 @@ import (
const (
ResUnauthorized uint64 = 1 // Request not processed and ignored
ResBadRequest uint64 = 2 // Request cannot process and ignored
ResNewClient uint64 = 3 // New client
ResCloseClient uint64 = 4 // Controller closed connection
ResClientData uint64 = 5 // Controller accepted data
ResSendAuth uint64 = 6 // Send token to controller
ResAgentInfo uint64 = 7 // Agent info
ResPong uint64 = 8 // Ping response
ResCloseClient uint64 = 3 // Controller closed connection
ResClientData uint64 = 4 // Controller accepted data
ResSendAuth uint64 = 5 // Send token to controller
ResAgentInfo uint64 = 6 // Agent info
ResPong uint64 = 7 // Ping response
ResResize uint64 = 8 // Resize buffer size
)
type AgentInfo struct {
@ -87,10 +87,10 @@ type Response struct {
BadRequest bool // Controller accepted packet so cannot process Request
SendAuth bool // Send Agent token
AgentInfo *AgentInfo // Agent Info
Pong *time.Time // ping response
AgentInfo *AgentInfo // Agent Info
Pong *time.Time // ping response
ResizeBuffer *uint64 // Resize Agent response
// NewClient *Client // Controller Accepted client
CloseClient *Client // Controller end client
DataRX *ClientData // Controller recive data from client
}
@ -116,11 +116,6 @@ func (res Response) Writer(w io.Writer) error {
return err
}
return bigendian.WriteInt64(w, pong.UnixMilli())
// } else if newClient := res.NewClient; newClient != nil {
// if err := bigendian.WriteUint64(w, ResNewClient); err != nil {
// return err
// }
// return newClient.Writer(w)
} else if closeClient := res.CloseClient; closeClient != nil {
if err := bigendian.WriteUint64(w, ResCloseClient); err != nil {
return err
@ -136,6 +131,11 @@ func (res Response) Writer(w io.Writer) error {
return err
}
return info.Writer(w)
} else if res.ResizeBuffer != nil {
if err := bigendian.WriteUint64(w, ResResize); err != nil {
return err
}
return bigendian.WriteUint64(w, *res.ResizeBuffer)
}
return ErrInvalidBody
}
@ -154,9 +154,6 @@ func (res *Response) Reader(r io.Reader) error {
} else if resID == ResSendAuth {
res.SendAuth = true
return nil
// } else if resID == ResNewClient {
// res.NewClient = new(Client)
// return res.NewClient.Reader(r)
} else if resID == ResCloseClient {
res.CloseClient = new(Client)
return res.CloseClient.Reader(r)
@ -174,6 +171,12 @@ func (res *Response) Reader(r io.Reader) error {
res.Pong = new(time.Time)
*res.Pong = time.UnixMilli(unixMil)
return nil
} else if resID == ResResize {
var err error
res.ResizeBuffer = new(uint64)
if *res.ResizeBuffer, err = bigendian.ReadUint64(r); err != nil {
return err
}
}
return ErrInvalidBody

View File

@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"io"
"log"
"net"
"net/netip"
"time"
@ -22,13 +23,14 @@ type TunnelInfo struct {
}
type Tunnel struct {
Token [36]byte // Agent Token
Authenticated bool // Agent Authenticated and avaible to recive/transmiter data
UDPListener net.Listener // Accept connections from UDP Clients
TCPListener net.Listener // Accept connections from TCP Clients
UDPClients map[string]net.Conn // Current clients connected in UDP Socket
TCPClients map[string]net.Conn // Current clients connected in TCP Socket
SendToAgent chan proto.Response // Send data to agent
Token [36]byte // Agent Token
Authenticated bool // Agent Authenticated and avaible to recive/transmiter data
ResponseBuffer uint64 // Send Reponse size
UDPListener net.Listener // Accept connections from UDP Clients
TCPListener net.Listener // Accept connections from TCP Clients
UDPClients map[string]net.Conn // Current clients connected in UDP Socket
TCPClients map[string]net.Conn // Current clients connected in TCP Socket
SendToAgent chan proto.Response // Send data to agent
}
// Interface to server accept and reject agents sessions
@ -67,8 +69,9 @@ func (d DefaultCall) AgentInfo(Token [36]byte) (TunnelInfo, error) {
}
type Server struct {
Tunnels map[string]Tunnel // Tunnels listened
ServerCalls ServerCalls // Server call to auth and more
RequestBuffer uint64 // Request Buffer
Tunnels map[string]Tunnel // Tunnels listened
ServerCalls ServerCalls // Server call to auth and more
}
// Create new server struct
@ -79,8 +82,9 @@ func NewServer(Calls ServerCalls) Server {
Calls = DefaultCall{}
}
return Server{
Tunnels: make(map[string]Tunnel),
ServerCalls: Calls,
RequestBuffer: proto.DataSize,
ServerCalls: Calls,
Tunnels: make(map[string]Tunnel),
}
}
@ -114,16 +118,10 @@ func (tun *Tunnel) UDPAccepts() {
}
clientAddr := netip.MustParseAddrPort(conn.RemoteAddr().String())
tun.UDPClients[conn.RemoteAddr().String()] = conn
// tun.SendToAgent <- proto.Response{
// NewClient: &proto.Client{
// Client: clientAddr,
// Proto: proto.ProtoUDP,
// },
// }
go func() {
for {
buff := make([]byte, proto.DataSize)
buff := make([]byte, tun.ResponseBuffer)
n, err := conn.Read(buff)
if err != nil {
go conn.Close()
@ -135,6 +133,14 @@ func (tun *Tunnel) UDPAccepts() {
}
break
}
if tun.ResponseBuffer-uint64(n) == 0 {
tun.ResponseBuffer += 500
res := proto.Response{}
res.ResizeBuffer = new(uint64)
*res.ResizeBuffer = tun.ResponseBuffer
tun.SendToAgent <- res
<-time.After(time.Microsecond)
}
tun.SendToAgent <- proto.Response{
DataRX: &proto.ClientData{
Size: uint64(n),
@ -160,16 +166,9 @@ func (tun *Tunnel) TCPAccepts() {
}
clientAddr := netip.MustParseAddrPort(conn.RemoteAddr().String())
tun.TCPClients[conn.RemoteAddr().String()] = conn
// tun.SendToAgent <- proto.Response{
// NewClient: &proto.Client{
// Client: clientAddr,
// Proto: proto.ProtoTCP,
// },
// }
go func() {
for {
buff := make([]byte, proto.DataSize)
buff := make([]byte, tun.ResponseBuffer)
n, err := conn.Read(buff)
if err != nil {
go conn.Close()
@ -181,6 +180,14 @@ func (tun *Tunnel) TCPAccepts() {
}
break
}
if tun.ResponseBuffer-uint64(n) == 0 {
tun.ResponseBuffer += 500
res := proto.Response{}
res.ResizeBuffer = new(uint64)
*res.ResizeBuffer = tun.ResponseBuffer
tun.SendToAgent <- res
<-time.After(time.Microsecond)
}
tun.SendToAgent <- proto.Response{
DataRX: &proto.ClientData{
Size: uint64(n),
@ -246,7 +253,8 @@ func (server *Server) Listen(ControllerPort uint16) (err error) {
var res proto.Response
var readSize int
var addr netip.AddrPort
buffer := make([]byte, proto.PacketSize)
log.Println("waiting to request")
buffer := make([]byte, proto.PacketSize+server.RequestBuffer)
if readSize, addr, err = conn.ReadFromUDPAddrPort(buffer); err != nil {
if err == io.EOF {
break // End controller
@ -336,7 +344,7 @@ func (server *Server) Listen(ControllerPort uint16) (err error) {
go tun.TCPAccepts() // Make accepts new requests
}
if info.Proto == 3 || info.Proto == 2 {
tun.UDPListener, err = udplisterner.Listen("udp", netip.AddrPortFrom(netip.IPv4Unspecified(), info.PortListen), proto.DataSize)
tun.UDPListener, err = udplisterner.Listen("udp", netip.AddrPortFrom(netip.IPv4Unspecified(), info.PortListen), func() uint64 {return server.RequestBuffer})
if err != nil {
if tun.TCPListener != nil {
tun.TCPListener.Close()