WIP: Update struct decode and encode #2

Draft
Sirherobrine23 wants to merge 13 commits from struct-serealize into main
8 changed files with 160 additions and 41 deletions
Showing only changes of commit a0ee563541 - Show all commits

View File

@@ -76,13 +76,13 @@ func (client *Client) Setup() error {
} }
type toWr struct { type toWr struct {
Proto uint8 Proto proto.Protoc
To netip.AddrPort To netip.AddrPort
tun *Client tun *Client
} }
func (t toWr) Write(w []byte) (int, error) { func (t toWr) Write(w []byte) (int, error) {
err := structcode.NewEncode(t.tun.Conn, proto.Request{ data := proto.Request{
DataTX: &proto.ClientData{ DataTX: &proto.ClientData{
Data: w, Data: w,
Client: proto.Client{ Client: proto.Client{
@@ -90,21 +90,23 @@ func (t toWr) Write(w []byte) (int, error) {
Proto: t.Proto, Proto: t.Proto,
}, },
}, },
})
if err == nil {
return len(w), nil
} }
d, _ := json.Marshal(data)
fmt.Println(string(d))
if err := structcode.NewEncode(t.tun.Conn, data); err != nil {
return 0, err return 0, err
} }
return len(w), nil
}
func (tun *Client) GetTargetWrite(Proto uint8, To netip.AddrPort) io.Writer { func (tun *Client) GetTargetWrite(Proto proto.Protoc, To netip.AddrPort) io.Writer {
return &toWr{Proto: Proto, To: To, tun: tun} return &toWr{Proto: Proto, To: To, tun: tun}
} }
func (client *Client) handlers() { func (client *Client) handlers() {
var lastPing int64 = 0 var lastPing time.Time = time.Date(0, 0, 0, 0, 0, 0, 0, time.Local)
for { for {
if time.Now().UnixMilli()-lastPing > 3_000 { if time.Now().UnixMilli()-lastPing.UnixMilli() > 3_000_000 {
var req proto.Request var req proto.Request
req.Ping = new(time.Time) req.Ping = new(time.Time)
*req.Ping = time.Now() *req.Ping = time.Now()
@@ -125,7 +127,7 @@ func (client *Client) handlers() {
fmt.Println(string(d)) fmt.Println(string(d))
if res.Pong != nil { if res.Pong != nil {
lastPing = res.Pong.UnixMilli() lastPing = *res.Pong
continue continue
} }
if res.Unauthorized || res.NotListened { if res.Unauthorized || res.NotListened {

View File

@@ -3,9 +3,11 @@ package server
import ( import (
"fmt" "fmt"
"net/netip" "net/netip"
"sync"
"time" "time"
_ "modernc.org/sqlite" _ "modernc.org/sqlite"
"sirherobrine23.org/Minecraft-Server/go-pproxit/proto"
"sirherobrine23.org/Minecraft-Server/go-pproxit/server" "sirherobrine23.org/Minecraft-Server/go-pproxit/server"
"xorm.io/xorm" "xorm.io/xorm"
"xorm.io/xorm/names" "xorm.io/xorm/names"
@@ -13,6 +15,7 @@ import (
type serverCalls struct { type serverCalls struct {
XormEngine *xorm.Engine XormEngine *xorm.Engine
Locker sync.Locker
} }
type User struct { type User struct {
@@ -28,7 +31,7 @@ type Tun struct {
ID int64 `xorm:"pk"` // Tunnel ID ID int64 `xorm:"pk"` // Tunnel ID
User int64 `xorm:"notnull"` // Agent ID User int64 `xorm:"notnull"` // Agent ID
Token []byte `xorm:"blob notnull unique"` // Tunnel Token Token []byte `xorm:"blob notnull unique"` // Tunnel Token
Proto uint8 `xorm:"default 3"` // Proto accept Proto proto.Protoc `xorm:"default 3"` // Proto accept
TPCListen uint16 // Port listen TCP agent TPCListen uint16 // Port listen TCP agent
UDPListen uint16 // Port listen UDP agent UDPListen uint16 // Port listen UDP agent
} }
@@ -53,7 +56,7 @@ type RTX struct {
Client netip.AddrPort Client netip.AddrPort
TXSize int TXSize int
RXSize int RXSize int
Proto uint8 Proto proto.Protoc
} }
func NewCall(DBConn string) (call *serverCalls, err error) { func NewCall(DBConn string) (call *serverCalls, err error) {
@@ -61,6 +64,7 @@ func NewCall(DBConn string) (call *serverCalls, err error) {
if call.XormEngine, err = xorm.NewEngine("sqlite", DBConn); err != nil { if call.XormEngine, err = xorm.NewEngine("sqlite", DBConn); err != nil {
return return
} }
call.Locker = &sync.Mutex{}
call.XormEngine.SetMapper(names.SameMapper{}) call.XormEngine.SetMapper(names.SameMapper{})
session := call.XormEngine.NewSession() session := call.XormEngine.NewSession()
defer session.Close() defer session.Close()
@@ -75,11 +79,13 @@ func NewCall(DBConn string) (call *serverCalls, err error) {
type TunCallbcks struct { type TunCallbcks struct {
tunID int64 tunID int64
XormEngine *xorm.Engine XormEngine *xorm.Engine
Locker sync.Locker
} }
func (tun *TunCallbcks) AgentShutdown(onTime time.Time) {} func (*TunCallbcks) AgentShutdown(onTime time.Time) {}
func (tun *TunCallbcks) BlockedAddr(AddrPort string) bool { func (tun *TunCallbcks) BlockedAddr(AddrPort string) bool {
tun.Locker.Lock()
defer tun.Locker.Unlock()
var addr = AddrBlocked{Address: AddrPort, TunID: tun.tunID} var addr = AddrBlocked{Address: AddrPort, TunID: tun.tunID}
ok, err := tun.XormEngine.Get(&addr) ok, err := tun.XormEngine.Get(&addr)
if err != nil { if err != nil {
@@ -102,6 +108,8 @@ func (tun *TunCallbcks) BlockedAddr(AddrPort string) bool {
} }
func (tun *TunCallbcks) AgentPing(agent, server time.Time) { func (tun *TunCallbcks) AgentPing(agent, server time.Time) {
tun.Locker.Lock()
defer tun.Locker.Unlock()
c, _ := tun.XormEngine.Count(Ping{}) c, _ := tun.XormEngine.Count(Ping{})
tun.XormEngine.InsertOne(&Ping{ tun.XormEngine.InsertOne(&Ping{
ID: c, ID: c,
@@ -111,7 +119,9 @@ func (tun *TunCallbcks) AgentPing(agent, server time.Time) {
}) })
} }
func (tun *TunCallbcks) RegisterRX(client netip.AddrPort, Size int, Proto uint8) { func (tun *TunCallbcks) RegisterRX(client netip.AddrPort, Size int, Proto proto.Protoc) {
tun.Locker.Lock()
defer tun.Locker.Unlock()
tun.XormEngine.InsertOne(&RTX{ tun.XormEngine.InsertOne(&RTX{
TunID: tun.tunID, TunID: tun.tunID,
Client: client, Client: client,
@@ -120,7 +130,9 @@ func (tun *TunCallbcks) RegisterRX(client netip.AddrPort, Size int, Proto uint8)
TXSize: 0, TXSize: 0,
}) })
} }
func (tun *TunCallbcks) RegisterTX(client netip.AddrPort, Size int, Proto uint8) { func (tun *TunCallbcks) RegisterTX(client netip.AddrPort, Size int, Proto proto.Protoc) {
tun.Locker.Lock()
defer tun.Locker.Unlock()
tun.XormEngine.InsertOne(&RTX{ tun.XormEngine.InsertOne(&RTX{
TunID: tun.tunID, TunID: tun.tunID,
Client: client, Client: client,
@@ -142,6 +154,6 @@ func (caller *serverCalls) AgentAuthentication(Token []byte) (server.TunnelInfo,
Proto: tun.Proto, Proto: tun.Proto,
TCPPort: tun.TPCListen, TCPPort: tun.TPCListen,
UDPPort: tun.UDPListen, UDPPort: tun.UDPListen,
Callbacks: &TunCallbcks{tunID: tun.ID, XormEngine: caller.XormEngine}, Callbacks: &TunCallbcks{tunID: tun.ID, XormEngine: caller.XormEngine, Locker: caller.Locker},
}, nil }, nil
} }

View File

@@ -64,6 +64,22 @@ func decodeRecursive(r io.Reader, reflectValue reflect.Value) error {
} }
reflectValue.Set(reflect.ValueOf(data).Elem()) reflectValue.Set(reflect.ValueOf(data).Elem())
case reflect.Interface: case reflect.Interface:
case reflect.Map:
mapTypeof := reflectValue.Type()
reflectValue.Set(reflect.MakeMap(mapTypeof))
var size uint64
if err := binary.Read(r, binary.BigEndian, &size); err != nil {
return err
}
for range size {
key, value := reflect.New(mapTypeof.Key()).Elem(), reflect.New(mapTypeof.Elem()).Elem()
if err := decodeRecursive(r, key); err != nil {
return err
} else if err := decodeRecursive(r, value); err != nil {
return err
}
reflectValue.SetMapIndex(key, value)
}
case reflect.Struct: case reflect.Struct:
if ok, err := decodeTypeof(r, reflectValue); ok { if ok, err := decodeTypeof(r, reflectValue); ok {
return err return err

View File

@@ -39,6 +39,18 @@ func encodeRecursive(w io.Writer, reflectValue reflect.Value) error {
case reflect.Bool, reflect.Float32, reflect.Float64, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: case reflect.Bool, reflect.Float32, reflect.Float64, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
return binary.Write(w, binary.BigEndian, reflectValue.Interface()) return binary.Write(w, binary.BigEndian, reflectValue.Interface())
case reflect.Interface: case reflect.Interface:
case reflect.Map:
if err := binary.Write(w, binary.BigEndian, uint64(reflectValue.Len())); err != nil {
return err
}
inter := reflectValue.MapRange()
for inter.Next() {
if err := encodeRecursive(w, inter.Key()); err != nil {
return err
} else if err := encodeRecursive(w, inter.Value()); err != nil {
return err
}
}
case reflect.Struct: case reflect.Struct:
if ok, err := encodeTypeof(w, reflectValue); ok { if ok, err := encodeTypeof(w, reflectValue); ok {
return err return err

View File

@@ -2,8 +2,11 @@ package structcode
import ( import (
"bytes" "bytes"
"crypto/rand"
"encoding/hex" "encoding/hex"
"fmt"
"io" "io"
mathrand "math/rand/v2"
"net/netip" "net/netip"
"sync" "sync"
"testing" "testing"
@@ -12,7 +15,67 @@ import (
"sirherobrine23.org/Minecraft-Server/go-pproxit/proto" "sirherobrine23.org/Minecraft-Server/go-pproxit/proto"
) )
type mapTest map[string]mapStr
type mapStr struct {
Text string
Blob []byte
}
func randomBuff(size int) []byte {
buff := make([]byte, size)
_, err := rand.Read(buff)
if err != nil {
panic(err)
}
return buff
}
func TestSerelelize(t *testing.T) { func TestSerelelize(t *testing.T) {
t.Run("Map", func(t *testing.T) {
var err error
var waiter sync.WaitGroup
var enc, dec mapTest
enc = mapTest{}
enc["Test"] = mapStr{"Golang is best", []byte{5, 14, 22, 13}}
for i := range mathrand.IntN(20) {
enc["Rand"+fmt.Sprint(i)] = mapStr{string(randomBuff(14)), randomBuff(64)}
}
waiter.Add(2)
r, w := io.Pipe()
go func() {
defer waiter.Done()
if err = NewDecode(r, &dec); err != nil {
t.Error(err)
return
}
}()
go func() {
defer waiter.Done()
if err = NewEncode(w, enc); err != nil {
t.Error(err)
return
}
}()
waiter.Wait()
if err != nil {
return
}
for k, v := range enc {
if d, ok := dec[k]; ok {
if v.Text != d.Text {
t.Errorf("text from decode not exists or mismatch (%q), Encode %q, Decode %q", k, v.Text, d.Text)
return
} else if !bytes.Equal(v.Blob, d.Blob) {
t.Errorf("blob from decode not exists or mismatch (%q), Encode %s, Decode %s", k, hex.EncodeToString(v.Blob), hex.EncodeToString(d.Blob))
return
}
continue
}
t.Errorf("key not exists in decode (%q)", k)
return
}
})
t.Run("Response", func(t *testing.T) { t.Run("Response", func(t *testing.T) {
var err error var err error
var encodeRes, decodeRes proto.Response var encodeRes, decodeRes proto.Response

View File

@@ -6,7 +6,7 @@ import (
) )
const ( const (
ProtoBoth uint8 = iota // TCP+UDP Protocol ProtoBoth Protoc = iota // TCP+UDP Protocol
ProtoTCP // TCP Protocol ProtoTCP // TCP Protocol
ProtoUDP // UDP Protocol ProtoUDP // UDP Protocol
@@ -15,13 +15,27 @@ const (
PacketDataSize uint64 = DataSize + PacketSize // Header and Data request/response PacketDataSize uint64 = DataSize + PacketSize // Header and Data request/response
) )
var ( var ErrInvalidBody error = errors.New("invalid body, check request/response")
ErrInvalidBody error = errors.New("invalid body, check request/response")
) type Protoc uint8 // Net protocol support
func (pr Protoc) MarshalText() ([]byte, error) { return []byte(pr.String()), nil }
func (pr Protoc) String() string {
switch pr {
case ProtoBoth:
return "TCP + UDP"
case ProtoTCP:
return "TCP"
case ProtoUDP:
return "UDP"
default:
return "Invalid proto"
}
}
type Client struct { type Client struct {
Client netip.AddrPort // Client address and port Client netip.AddrPort // Client address and port
Proto uint8 // Protocol to close (proto.ProtoTCP, proto.ProtoUDP or proto.ProtoBoth) Proto Protoc // Protocol to close (proto.ProtoTCP, proto.ProtoUDP or proto.ProtoBoth)
} }
type ClientData struct { type ClientData struct {

View File

@@ -6,7 +6,7 @@ import (
) )
type AgentInfo struct { type AgentInfo struct {
Protocol uint8 // Proto supported (proto.ProtoTCP, proto.ProtoUDP or proto.ProtoBoth) Protocol Protoc // Proto supported (proto.ProtoTCP, proto.ProtoUDP or proto.ProtoBoth)
UDPPort, TCPPort uint16 // Controller port listened UDPPort, TCPPort uint16 // Controller port listened
AddrPort netip.AddrPort // request address and port AddrPort netip.AddrPort // request address and port
} }

View File

@@ -18,12 +18,12 @@ type TunnelCall interface {
BlockedAddr(AddrPort string) bool // Ignore request from this address BlockedAddr(AddrPort string) bool // Ignore request from this address
AgentPing(agent, server time.Time) // Register ping to Agent AgentPing(agent, server time.Time) // Register ping to Agent
AgentShutdown(onTime time.Time) // Agend end connection AgentShutdown(onTime time.Time) // Agend end connection
RegisterRX(client netip.AddrPort, Size int, Proto uint8) // Register Recived data from client RegisterRX(client netip.AddrPort, Size int, Proto proto.Protoc) // Register Recived data from client
RegisterTX(client netip.AddrPort, Size int, Proto uint8) // Register Transmitted data from client RegisterTX(client netip.AddrPort, Size int, Proto proto.Protoc) // Register Transmitted data from client
} }
type TunnelInfo struct { type TunnelInfo struct {
Proto uint8 // Protocol listen tunnel, use proto.ProtoTCP, proto.ProtoUDP or proto.ProtoBoth Proto proto.Protoc // Protocol listen tunnel, use proto.ProtoTCP, proto.ProtoUDP or proto.ProtoBoth
UDPPort, TCPPort uint16 // Port to Listen UDP and TCP listeners UDPPort, TCPPort uint16 // Port to Listen UDP and TCP listeners
Callbacks TunnelCall // Tunnel Callbacks Callbacks TunnelCall // Tunnel Callbacks
} }
@@ -65,7 +65,7 @@ func (tun *Tunnel) send(res proto.Response) error {
} }
type toWr struct { type toWr struct {
Proto uint8 Proto proto.Protoc
To netip.AddrPort To netip.AddrPort
tun *Tunnel tun *Tunnel
} }
@@ -87,7 +87,7 @@ func (t toWr) Write(w []byte) (int, error) {
return 0, err return 0, err
} }
func (tun *Tunnel) GetTargetWrite(Proto uint8, To netip.AddrPort) io.Writer { func (tun *Tunnel) GetTargetWrite(Proto proto.Protoc, To netip.AddrPort) io.Writer {
return &toWr{Proto: Proto, To: To, tun: tun} return &toWr{Proto: Proto, To: To, tun: tun}
} }
@@ -124,7 +124,7 @@ func (tun *Tunnel) Setup() {
fmt.Fprintln(os.Stderr, err.Error()) fmt.Fprintln(os.Stderr, err.Error())
return return
} }
d, _ := json.MarshalIndent(req, "", " ") d, _ := json.Marshal(req)
fmt.Println(string(d)) fmt.Println(string(d))
if req.AgentAuth != nil { if req.AgentAuth != nil {