diff --git a/.gitignore b/.gitignore index adf8f72..893e98f 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ # Go workspace file go.work +main.go +*.log diff --git a/api/api.go b/api/api.go index 0358c35..3c17d30 100644 --- a/api/api.go +++ b/api/api.go @@ -1,7 +1,17 @@ package api +import ( + "log" + "os" + "runtime" +) + +var NullFile, _ = os.Open(os.DevNull) +var debug = log.New(NullFile, "api.playit.gg: ", log.Ldate) + const ( GoPlayitVersion string = "0.17.1" + PlayitAPI string = "https://api.playit.gg" // Playit API TunnelTypeMCBedrock string = "minecraft-bedrock" // Minecraft Bedrock server TunnelTypeMCJava string = "minecraft-java" // Minecraft java server @@ -26,12 +36,6 @@ const ( ) var ( - PlayitAPI string = "https://api.playit.gg" // Playit API - PortType []string = []string{ - PortTypeBoth, - PortTypeTcp, - PortTypeUdp, - } // Tunnel protocol supports TunnelType []string = []string{ TunnelTypeMCBedrock, TunnelTypeMCJava, @@ -57,3 +61,69 @@ type Api struct { Code string // Claim code Secret string // Agent Secret } + +type PortProto string + +func (proto PortProto) IsValid() bool { + switch proto { + case PortProto(PortTypeBoth): + case PortProto(PortTypeTcp): + case PortProto(PortTypeUdp): + return true + } + return false +} +func (proto PortProto) SetBoth() { + proto = "both" +} +func (proto PortProto) SetTcp() { + proto = "tcp" +} +func (proto PortProto) SetUdp() { + proto = "udp" +} + +type Platform string + +func (Platform Platform) Host() { + switch runtime.GOOS { + case "linux": + Platform = "linux" + case "freebsd": + Platform = "freebsd" + case "windows": + Platform = "windows" + case "darwin": + Platform = "macos" + case "android": + Platform = "android" + case "ios": + Platform = "ios" + default: + Platform = "unknown" + } +} +func (Platform Platform) Linux() { + Platform = "linux" +} +func (Platform Platform) Freebsd() { + Platform = "freebsd" +} +func (Platform Platform) Windows() { + Platform = "windows" +} +func (Platform Platform) Macos() { + Platform = "macos" +} +func (Platform Platform) Android() { + Platform = "android" +} +func (Platform Platform) Ios() { + Platform = "ios" +} +func (Platform Platform) MinecraftPlugin() { + Platform = "minecraft-plugin" +} +func (Platform Platform) Unknown() { + Platform = "unknown" +} diff --git a/api/claim.go b/api/claim.go index 4988200..d808763 100644 --- a/api/claim.go +++ b/api/claim.go @@ -12,7 +12,7 @@ import ( ) var ( - ClaimAgents []string = []string{ + ClaimAgents []string = []string{ "default", "assignable", "self-managed", } ) diff --git a/api/others.go b/api/others.go index c5f4c3b..c0ba424 100644 --- a/api/others.go +++ b/api/others.go @@ -3,7 +3,7 @@ package api import ( "bytes" "encoding/json" - "net" + "fmt" "net/netip" "github.com/google/uuid" @@ -15,24 +15,38 @@ type PortRange struct { } type AgentTunnel struct { - ID uuid.UUID `json:"id"` - Name string `json:"name"` - IpNum uint16 `json:"ip_num"` - RegionNum uint16 `json:"region_num"` - Port PortRange `json:"port"` - Proto string `json:"proto"` - LocalIp net.IP `json:"local_ip"` - LocalPort uint16 `json:"local_port"` - TunnelType string `json:"tunnel_type"` - AssignedDomain string `json:"assigned_domain"` - CustomDomain string `json:"custom_domain"` - Disabled *any `json:"disabled"` + ID uuid.UUID `json:"id"` + Name string `json:"name"` + IpNum uint16 `json:"ip_num"` + RegionNum uint16 `json:"region_num"` + Port PortRange `json:"port"` + Proto string `json:"proto"` + LocalIp netip.Addr `json:"local_ip"` + LocalPort uint16 `json:"local_port"` + TunnelType string `json:"tunnel_type"` + AssignedDomain string `json:"assigned_domain"` + CustomDomain string `json:"custom_domain"` + Disabled *any `json:"disabled"` +} + +func (tun *AgentTunnel) DestString() string { + return fmt.Sprintf("%s:%d", tun.LocalIp, tun.LocalPort) +} +func (tun *AgentTunnel) SourceString() string { + var addr string + if addr = tun.CustomDomain; addr == "" { + addr = tun.AssignedDomain + } + if tun.TunnelType == "minecraft-java" { + return addr + } + return fmt.Sprintf("%s:%d", addr, tun.Port.From) } type AgentPendingTunnel struct { ID uuid.UUID `json:"id"` // Agent ID Name string `json:"name"` // Agent Name - PortType string `json:"proto"` // Port type + PortType PortProto `json:"proto"` // Port type PortCount uint16 `json:"port_count"` // Port count TunnelType string `json:"tunnel_type"` // Tunnel type Disabled bool `json:"is_disabled"` // Tunnel is disabled diff --git a/api/request.go b/api/request.go index 6b7be2e..5d08847 100644 --- a/api/request.go +++ b/api/request.go @@ -17,6 +17,18 @@ func recodeJson(from, to any) error { return nil } +func prettyJSON(from string) string { + var data any + if err := json.Unmarshal([]byte(from), &data); err != nil { + return from + } + marData, err := json.MarshalIndent(data, "", " ") + if err != nil { + return from + } + return string(marData) +} + func (w *Api) requestToApi(Path string, Body io.Reader, Response any, Headers map[string]string) (*http.Response, error) { req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", PlayitAPI, Path), Body) if err != nil { @@ -38,30 +50,38 @@ func (w *Api) requestToApi(Path string, Body io.Reader, Response any, Headers ma return nil, err } defer res.Body.Close() + data, err := io.ReadAll(res.Body) + if err != nil { + return res, err + } + debug.Printf("(%q %d): %s\n", Path, res.StatusCode, prettyJSON(string(data))) var ResBody struct { + Error string `json:"error"` Status string `json:"status"` Data any `json:"data"` } - if err = json.NewDecoder(res.Body).Decode(&ResBody); err != nil { + if err = json.Unmarshal(data, &ResBody); err != nil { return res, err } - if res.StatusCode >= 300 { + if ResBody.Error != "" { + return res, fmt.Errorf("api.playit.gg: %s", ResBody.Error) + } var errStatus struct { Type string `json:"type"` Message string `json:"message"` } if data, is := ResBody.Data.(string); is { - return res, fmt.Errorf(data) + return res, fmt.Errorf("api.playit.gg: %s", data) } else if err = recodeJson(&ResBody.Data, &errStatus); err != nil { return res, err } if len(errStatus.Message) > 0 { - return res, fmt.Errorf("%s: %s", errStatus.Type, errStatus.Message) + return res, fmt.Errorf("api.playit.gg: %s %s", errStatus.Type, errStatus.Message) } - return res, fmt.Errorf("%s", errStatus.Type) + return res, fmt.Errorf("api.playit.gg: %s", errStatus.Type) } if Response != nil { if err = recodeJson(&ResBody.Data, Response); err != nil { diff --git a/api/tunnels.go b/api/tunnels.go index 71742f6..9c58dca 100644 --- a/api/tunnels.go +++ b/api/tunnels.go @@ -54,23 +54,26 @@ type UseAllocPortAlloc struct { type UseRegion struct { Region string `json:"region"` } -/** + +/* +* "status": "allocated", -"data": { - "assigned_domain": "going-scales.gl.at.ply.gg", - "assigned_srv": null, - "assignment": { - "type": "shared-ip" - }, - "id": "f667b538-0294-4817-9332-5cba5e94d79e", - "ip_hostname": "19.ip.gl.ply.gg", - "ip_type": "both", - "port_end": 49913, - "port_start": 49912, - "region": "global", - "static_ip4": "147.185.221.19", - "tunnel_ip": "2602:fbaf:0:1::13" -} + + "data": { + "assigned_domain": "going-scales.gl.at.ply.gg", + "assigned_srv": null, + "assignment": { + "type": "shared-ip" + }, + "id": "f667b538-0294-4817-9332-5cba5e94d79e", + "ip_hostname": "19.ip.gl.ply.gg", + "ip_type": "both", + "port_end": 49913, + "port_start": 49912, + "region": "global", + "static_ip4": "147.185.221.19", + "tunnel_ip": "2602:fbaf:0:1::13" + } */ type TunnelCreateUseAllocation struct { Status string `json:"status"` // For tunnel list @@ -99,7 +102,7 @@ type Tunnel struct { ID *uuid.UUID `json:"tunnel_id,omitempty"` // Tunnel UUID Name string `json:"name,omitempty"` // Tunnel name TunnelType string `json:"tunnel_type,omitempty"` // Tunnel type from TunnelType const's - PortType string `json:"port_type"` // tcp, udp or both + PortType PortProto `json:"port_type"` // tcp, udp or both PortCount uint16 `json:"port_count"` // Port count to assign to connect Origin TunnelOriginCreate `json:"origin"` Enabled bool `json:"enabled"` @@ -167,14 +170,14 @@ func (w *Api) DeleteTunnel(TunnelID *uuid.UUID) error { } type AccountTunnel struct { - ID uuid.UUID `json:"id"` - TunnelType string `json:"tunnel_type"` - CreatedAt time.Time `json:"created_at"` - Name string `json:"name"` - PortType string `json:"port_type"` - PortCount int32 `json:"port_count"` - Alloc TunnelCreateUseAllocation `json:"alloc"` - Origin TunnelOriginCreate `json:"origin"` + ID uuid.UUID `json:"id"` + TunnelType string `json:"tunnel_type"` + CreatedAt time.Time `json:"created_at"` + Name string `json:"name"` + PortType PortProto `json:"port_type"` + PortCount int32 `json:"port_count"` + Alloc TunnelCreateUseAllocation `json:"alloc"` + Origin TunnelOriginCreate `json:"origin"` Domain *struct { ID uuid.UUID `json:"id"` Name string `json:"name"` diff --git a/enc/enc.go b/enc/enc.go new file mode 100644 index 0000000..ca0a08c --- /dev/null +++ b/enc/enc.go @@ -0,0 +1,195 @@ +package enc + +import ( + "encoding/binary" + "fmt" + "io" + "net/netip" +) + +func ReadU8(r io.Reader) uint8 { + var d uint8 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func ReadU16(r io.Reader) uint16 { + var d uint16 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func ReadU32(r io.Reader) uint32 { + var d uint32 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func ReadU64(r io.Reader) uint64 { + var d uint64 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} + +func WriteU8(w io.Writer, d uint8) error { + return binary.Write(w, binary.BigEndian, d) +} +func WriteU16(w io.Writer, d uint16) error { + return binary.Write(w, binary.BigEndian, d) +} +func WriteU32(w io.Writer, d uint32) error { + return binary.Write(w, binary.BigEndian, d) +} +func WriteU64(w io.Writer, d uint64) error { + return binary.Write(w, binary.BigEndian, d) +} + +func Read8(r io.Reader) int8 { + var d int8 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func Read16(r io.Reader) int16 { + var d int16 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func Read32(r io.Reader) int32 { + var d int32 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func Read64(r io.Reader) int64 { + var d int64 + err := binary.Read(r, binary.BigEndian, &d) + if err != nil { + panic(err) + } + return d +} +func Write8(w io.Writer, d int8) error { + return binary.Write(w, binary.BigEndian, &d) +} +func Write16(w io.Writer, d int16) error { + return binary.Write(w, binary.BigEndian, &d) +} +func Write32(w io.Writer, d int32) error { + return binary.Write(w, binary.BigEndian, &d) +} +func Write64(w io.Writer, d int64) error { + return binary.Write(w, binary.BigEndian, &d) +} + +func ReadByteN(r io.Reader, size int) (buff []byte, err error) { + buff = make([]byte, size) + for index := range buff { + if err = binary.Read(r, binary.BigEndian, &buff[index]); err != nil { + buff = buff[:index] + return + } + } + return +} +func WriteBytes(w io.Writer, buff []byte) error { + return binary.Write(w, binary.BigEndian, &buff) +} + +func AddrWrite(w io.Writer, addr netip.Addr) error { + if addr.Is6() { + if err := WriteU8(w, 6); err != nil { + return err + } else if err = WriteBytes(w, addr.AsSlice()); err != nil { + return err + } + return nil + } + if err := WriteU8(w, 4); err != nil { + return err + } else if err = WriteBytes(w, addr.AsSlice()); err != nil { + return err + } + return nil +} +func AddrRead(r io.Reader) (addr netip.Addr, err error) { + var buff []byte + switch ReadU8(r) { + case 4: + if buff, err = ReadByteN(r, 4); err != nil { + return + } + addr = netip.AddrFrom4([4]byte(buff)) + return + case 6: + if buff, err = ReadByteN(r, 16); err != nil { + return + } + netip.AddrFrom16([16]byte(buff)) + return + } + err = fmt.Errorf("connet get ip type") + return +} + +func AddrPortRead(r io.Reader) (netip.AddrPort, error) { + switch ReadU8(r) { + case 4: + buff, err := ReadByteN(r, 4) + if err != nil { + return netip.AddrPort{}, err + } + return netip.AddrPortFrom(netip.AddrFrom4([4]byte(buff)), ReadU16(r)), nil + case 6: + buff, err := ReadByteN(r, 16) + if err != nil { + return netip.AddrPort{}, err + } + return netip.AddrPortFrom(netip.AddrFrom16([16]byte(buff)), ReadU16(r)), nil + } + return netip.AddrPort{}, fmt.Errorf("connet get ip type") +} +func AddrPortWrite(w io.Writer, addr netip.AddrPort) error { + if !addr.IsValid() { + return fmt.Errorf("invalid ip address") + } else if addr.Addr().Is6() { + if err := WriteU8(w, 6); err != nil { + return err + } else if err = binary.Write(w, binary.BigEndian, addr.Addr().AsSlice()); err != nil { + return err + } + return nil + } + if err := WriteU8(w, 4); err != nil { + return err + } else if err = binary.Write(w, binary.BigEndian, addr.Addr().AsSlice()); err != nil { + return err + } + return nil +} + +func ReadOption(r io.Reader, callback func(r io.Reader) (err error)) error { + switch ReadU8(r) { + case 0: + return nil + case 1: + return callback(r) + } + return fmt.Errorf("invalid Option value") +} diff --git a/logfile/log.go b/logfile/log.go new file mode 100644 index 0000000..a8d92b6 --- /dev/null +++ b/logfile/log.go @@ -0,0 +1,19 @@ +package logfile + +import ( + "encoding/json" + "os" +) + +var DebugFile = func() *os.File { + file, err := os.Create("./debug.log") + if err != nil { + panic(err) + } + return file +}() + +func JSONString(data any) string { + d, _ := json.Marshal(data) + return string(d) +} \ No newline at end of file diff --git a/network/address_lookup.go b/network/address_lookup.go new file mode 100644 index 0000000..3093ff6 --- /dev/null +++ b/network/address_lookup.go @@ -0,0 +1,97 @@ +package network + +import ( + "encoding/binary" + "net/netip" + + "sirherobrine23.org/playit-cloud/go-playit/api" +) + +type AddressValue[V any] struct { + Value V + FromPort, ToPort uint16 +} + +type AddressLookup[Value any] interface { + Lookup(IP netip.Addr, Port uint16, Proto api.PortProto) *AddressValue[Value] +} + +type MatchIP struct { + IPNumber uint64 + RegionID *uint16 +} + +func (slef *MatchIP) Matches(ip netip.Addr) bool { + if ip.Is6() { + other := NewMatchIP(ip) + return slef.IPNumber == other.IPNumber && slef.RegionID == other.RegionID + } + octs := ip.As4() + if uint64(octs[3]) != slef.IPNumber { + return false + } + if slef.RegionID == nil { + return true + } + return RegionNumberV4(ip) == *slef.RegionID +} + +func NewMatchIP(ip netip.Addr) MatchIP { + parts := ip.As16() + regionID := binary.BigEndian.Uint16([]byte{parts[6], parts[7]}) + ipNumber := binary.BigEndian.Uint64([]byte{ + parts[8], + parts[9], + parts[10], + parts[11], + parts[12], + parts[13], + parts[14], + parts[15], + }) + info := MatchIP{IPNumber: ipNumber} + if regionID != 0 { + info.RegionID = new(uint16) + *info.RegionID = regionID + } + return info +} + +func RegionNumberV4(ip netip.Addr) uint16 { + octs := ip.As4(); + if octs[0] == 147 && octs[1] == 185 && octs[2] == 221 { /* 147.185.221.0/24 (1) */ + return 1 + } else if octs[0] == 209 && octs[1] == 25 && octs[2] >= 140 && octs[2] <= 143 { /* 209.25.140.0/22 (2 to 5) */ + return uint16(2 + (octs[2] - 140)) + } else if octs[0] == 23 && octs[1] == 133 && octs[2] == 216 { /* 23.133.216.0/24 (6) */ + return 6 + } + /* global IP */ + return 0 +} + +type MappingOverride struct { + Proto api.PortProto + Port api.PortRange + LocalAddr netip.AddrPort + MatchIP MatchIP +} + +type LookupWithOverrides []MappingOverride + +func (look *LookupWithOverrides) Lookup(IP netip.Addr, Port uint16, Proto api.PortProto) *AddressValue[netip.AddrPort] { + for _, over := range *look { + if (over.Port.From <= Port && Port < over.Port.To) && (over.Proto == "both" || over.Proto == Proto) { + return &AddressValue[netip.AddrPort]{ + Value: over.LocalAddr, + FromPort: over.Port.From, + ToPort: over.Port.To, + } + } + } + return &AddressValue[netip.AddrPort]{ + Value: netip.AddrPortFrom(netip.IPv4Unspecified(), Port), + FromPort: Port, + ToPort: Port+1, + } +} \ No newline at end of file diff --git a/network/lan_address.go b/network/lan_address.go new file mode 100644 index 0000000..8988969 --- /dev/null +++ b/network/lan_address.go @@ -0,0 +1,77 @@ +package network + +import ( + "encoding/binary" + "net" + "net/netip" +) + +func shuffle(v uint32) uint32 { + v = ((v >> 16) ^ v) * 0x45d9f3 + v = ((v >> 16) ^ v) * 0x45d9f3 + v = (v >> 16) ^ v + return v +} + +func asLocalMasked(ip uint32) uint32 { + ip = shuffle(ip) & 0x00FFFFFF + if ip == 0 { + ip = 1 + } + return ip | 0x7F000000 +} + +func mapToLocalIP4(ip net.IP) netip.Addr { + var ipUint32 uint32 + if ip.To4() != nil { // Check if it's already IPv4 + ipUint32 = binary.BigEndian.Uint32(ip.To4()) + } else { // Handle IPv6 + bytes := ip.To16() // Convert to IPv6 bytes + ipUint32 = shuffle(binary.BigEndian.Uint32(bytes[0:4])) ^ + shuffle(binary.BigEndian.Uint32(bytes[4:8])) ^ + shuffle(binary.BigEndian.Uint32(bytes[8:12])) ^ + shuffle(binary.BigEndian.Uint32(bytes[12:16])) + } + + return netip.AddrFrom4([4]byte{ + byte(asLocalMasked(ipUint32)>>24), + byte(asLocalMasked(ipUint32)>>16), + byte(asLocalMasked(ipUint32)>>8), + byte(asLocalMasked(ipUint32)), + }) +} + +func TcpSocket(SpecialLan bool, Peer, Host netip.AddrPort) (*net.TCPConn, error) { + isLoopback := Host.Addr().IsLoopback() + if isLoopback && SpecialLan { + stream, err := net.DialTCP("tcp", net.TCPAddrFromAddrPort(netip.AddrPortFrom(mapToLocalIP4(Peer.Addr().AsSlice()), 0)), net.TCPAddrFromAddrPort(Host)) + if err == nil { + return stream, nil + } + // logDebug.Printf("Failed to establish connection using special lan %s for flow %s -> %s\n", local_ip, Peer.String(), Host.String()) + } + // logDebug.Printf("Failed to bind connection to special local address to support IP based banning") + stream, err := net.DialTCP("tcp", nil, net.TCPAddrFromAddrPort(Host)) + if err != nil { + // logDebug.Printf("Failed to establish connection for flow %s -> %s. Is your server running? %q", Peer.String(), Host.String(), err.Error()) + return nil, err + } + return stream, nil +} + +func UdpSocket(SpecialLan bool, Peer, Host netip.AddrPort) (*net.UDPConn, error) { + isLoopback := Host.Addr().IsLoopback() + if isLoopback && SpecialLan { + local_ip := mapToLocalIP4(Peer.Addr().AsSlice()); + local_port := 40000 + (Peer.Port() % 24000); + stream, err := net.ListenUDP("udp4", net.UDPAddrFromAddrPort(netip.AddrPortFrom(local_ip, local_port))) + if err != nil { + stream, err = net.ListenUDP("udp4", net.UDPAddrFromAddrPort(netip.AddrPortFrom(local_ip, 0))) + if err != nil { + stream, err = net.ListenUDP("udp4", nil) + } + } + return stream, err + } + return net.ListenUDP("udp", nil) +} \ No newline at end of file diff --git a/network/network.go b/network/network.go new file mode 100644 index 0000000..ee936fd --- /dev/null +++ b/network/network.go @@ -0,0 +1,9 @@ +package network + +import ( + "log" + + "sirherobrine23.org/playit-cloud/go-playit/logfile" +) + +var debug = log.New(logfile.DebugFile, "network.playit.gg: ", log.Ldate) \ No newline at end of file diff --git a/network/tcp_clients.go b/network/tcp_clients.go new file mode 100644 index 0000000..917d557 --- /dev/null +++ b/network/tcp_clients.go @@ -0,0 +1,98 @@ +package network + +import ( + "net" + "net/netip" + "sync" + + "sirherobrine23.org/playit-cloud/go-playit/tunnel" + "sirherobrine23.org/playit-cloud/go-playit/proto" +) + +type ActiveClients struct { + locked sync.Mutex + // active: Arc>>, + active map[[2]netip.AddrPort]proto.NewClient +} + +func NewActiveClients() ActiveClients { + return ActiveClients{ + locked: sync.Mutex{}, + active: make(map[[2]netip.AddrPort]proto.NewClient), + } +} + +func (clients *ActiveClients) Len() int { + return len(clients.active) +} + +func (clients *ActiveClients) GetClients() []proto.NewClient { + clientsArr := []proto.NewClient{} + for _, cl := range clients.active { + clientsArr = append(clientsArr, cl) + } + return clientsArr +} + +func (clients *ActiveClients) AddNew(client proto.NewClient) *Dropper { + clients.locked.Lock() + defer clients.locked.Unlock() + for actClient := range clients.active { + if client.PeerAddr.Compare(actClient[0]) == 0 && client.ConnectAddr.Compare(actClient[1]) == 0 { + return nil + } + } + key := [2]netip.AddrPort{client.PeerAddr, client.ConnectAddr} + clients.active[key] = client + return &Dropper{key, *clients} +} + +type Dropper struct { + key [2]netip.AddrPort + inner ActiveClients +} + +func (dr *Dropper) Drop() { + PeerAddr, ConnectAddr := dr.key[0], dr.key[1] + dr.inner.locked.Lock() + defer dr.inner.locked.Unlock() + for client := range dr.inner.active { + if client[0].Compare(PeerAddr) == 0 && client[1].Compare(ConnectAddr) == 0 { + delete(dr.inner.active, client) + break + } + } +} + +func NewTcpClients() TcpClients { + return TcpClients{NewActiveClients(), true} +} + +type TcpClients struct { + active ActiveClients + UseSpecialLAN bool +} + +func (tcp *TcpClients) ActiveClients() ActiveClients { + return tcp.active +} + +func (tcp *TcpClients) Connect(newClient proto.NewClient) (*TcpClient, error) { + claimInstruction := newClient.ClaimInstructions + droppe := tcp.active.AddNew(newClient) + if droppe == nil { + return nil, nil + } + + stream, err := (&tunnel.TcpTunnel{claimInstruction}).Connect() + if err != nil { + return nil, err + } + + return &TcpClient{stream, droppe}, nil +} + +type TcpClient struct { + Stream *net.TCPConn + Dropper *Dropper +} diff --git a/network/udp_clients.go b/network/udp_clients.go new file mode 100644 index 0000000..d941523 --- /dev/null +++ b/network/udp_clients.go @@ -0,0 +1,161 @@ +package network + +import ( + "fmt" + "net" + "net/netip" + "reflect" + "sync/atomic" + "time" + + "sirherobrine23.org/playit-cloud/go-playit/api" + "sirherobrine23.org/playit-cloud/go-playit/tunnel" +) + +type UdpClient struct { + clientKey ClientKey + sendFlow tunnel.UdpFlow + udpTunnel tunnel.UdpTunnel + localUdp *net.UDPConn + localStartAddr netip.AddrPort + tunnelFromPort uint16 + tunnelToPort uint16 + udpClients map[ClientKey]UdpClient + lastActivity atomic.Uint32 +} + +func (self *UdpClient) SendLocal(dstPort uint16, data []byte) error { + portOffset := dstPort - self.tunnelFromPort + self.lastActivity.Store(uint32(time.Now().UnixMilli() / 1_000)) + if portOffset == 0 { + _, err := self.localUdp.WriteToUDP(data, net.UDPAddrFromAddrPort(self.localStartAddr)) + return err + } + _, err := self.localUdp.WriteToUDP(data, net.UDPAddrFromAddrPort(netip.AddrPortFrom(self.localStartAddr.Addr(), self.localStartAddr.Port()+portOffset))) + return err +} + +type HostToTunnelForwarder struct{ UdpClient } + +func (self *HostToTunnelForwarder) Run() { + buffer := make([]byte, 2048) + for { + buffer = make([]byte, 2048) + self.localUdp.SetReadDeadline(time.Now().Add(time.Second * 30)) + size, source, err := self.localUdp.ReadFromUDPAddrPort(buffer) + if err != nil { + debug.Println(err) + break + } else if source.Addr().Compare(self.localStartAddr.Addr()) != 0 { + // "dropping packet from different unexpected source" + continue + } + + portCount := self.tunnelToPort - self.tunnelFromPort + localFrom := self.localStartAddr.Port() + localTo := localFrom + portCount + if source.Port() < localFrom || localTo <= source.Port() { + // "dropping packet outside of expected port range" + continue + } + buffer = buffer[:size] + portOffset := source.Port() - localFrom + flow := self.sendFlow.WithSrcPort(self.tunnelFromPort + portOffset) + if _, err = self.udpTunnel.Send(buffer, flow); err != nil { + // "failed to send packet to through tunnel" + } + } + + if _, is := self.UdpClient.udpClients[self.clientKey]; is { + // if !reflect.DeepEqual(v, self) {} else {} + delete(self.UdpClient.udpClients, self.clientKey) + } +} + +type ClientKey struct { + ClientAddr, TunnelAddr netip.AddrPort +} + +type UdpClients struct { + udpTunnel tunnel.UdpTunnel + lookup AddressLookup[netip.AddrPort] + udpClients map[ClientKey]UdpClient + UseSpecialLan bool +} + +func NewUdpClients(Tunnel tunnel.UdpTunnel, Lookup AddressLookup[netip.AddrPort]) UdpClients { + return UdpClients{ + udpTunnel: Tunnel, + lookup: Lookup, + udpClients: make(map[ClientKey]UdpClient), + UseSpecialLan: true, + } +} + +func (self *UdpClients) ClientCount() int { + return len(self.udpClients) +} + +func (self *UdpClients) ForwardPacket(Flow tunnel.UdpFlow, data []byte) error { + flowDst := Flow.Dst() + found := self.lookup.Lookup(flowDst.Addr(), flowDst.Port(), api.PortProto("udp")) + if found == nil { + return fmt.Errorf("could not find tunnel") + } + + key := ClientKey{ClientAddr: Flow.Src(), TunnelAddr: netip.AddrPortFrom(flowDst.Addr(), found.FromPort)} + for kkey, client := range self.udpClients { + if reflect.DeepEqual(kkey, key) { + return client.SendLocal(flowDst.Port(), data) + } + } + + client, err := func() (*UdpClient, error) { + for kkey, client := range self.udpClients { + if reflect.DeepEqual(kkey, key) { + return &client, nil + } + } + localAddr := found.Value + var sendFlow tunnel.UdpFlow + var clientAddr netip.AddrPort + if Flow.IPSrc.Addr().Is4() { + clientAddr = netip.AddrPortFrom(Flow.IPSrc.Addr(), Flow.IPSrc.Port()) + sendFlow = tunnel.UdpFlow{ + IPSrc: netip.AddrPortFrom(Flow.IPDst.Addr(), found.FromPort), + IPDst: Flow.Src(), + } + } else { + clientAddr = netip.AddrPortFrom(Flow.IPSrc.Addr(), Flow.IPSrc.Port()) + sendFlow = tunnel.UdpFlow{ + IPSrc: netip.AddrPortFrom(Flow.IPDst.Addr(), found.FromPort), + IPDst: Flow.Src(), + Flow: sendFlow.Flow, + } + } + + usock, err := UdpSocket(self.UseSpecialLan, clientAddr, localAddr) + if err != nil { + return nil, err + } + client := UdpClient{ + clientKey: key, + sendFlow: sendFlow, + localUdp: usock, + udpTunnel: self.udpTunnel, + localStartAddr: localAddr, + tunnelFromPort: found.FromPort, + tunnelToPort: found.ToPort, + udpClients: self.udpClients, + lastActivity: atomic.Uint32{}, + } + + self.udpClients[key] = client + go (&HostToTunnelForwarder{client}).Run() + return &client, nil + }() + if err != nil { + return err + } + return client.SendLocal(flowDst.Port(), data) +} diff --git a/proto/control_feed.go b/proto/control_feed.go new file mode 100644 index 0000000..1f1e95d --- /dev/null +++ b/proto/control_feed.go @@ -0,0 +1,109 @@ +package proto + +import ( + "fmt" + "io" + "net/netip" + + "sirherobrine23.org/playit-cloud/go-playit/enc" + "sirherobrine23.org/playit-cloud/go-playit/logfile" +) + +var ( + ErrFeedRead error = fmt.Errorf("invalid controlFeed id") +) + +type ControlFeed struct { + Response *ControlRpcMessage[*ControlResponse] + NewClient *NewClient +} + +func (Feed *ControlFeed) ReadFrom(r io.Reader) (err error) { + id := enc.ReadU32(r) + if id == 1 { + Feed.Response = new(ControlRpcMessage[*ControlResponse]) + Feed.Response.Content = new(ControlResponse) + err = Feed.Response.ReadFrom(r) + debug.Printf("Read Feed (id %d): %s\n", id, logfile.JSONString(Feed)) + } else if id == 2 { + Feed.NewClient = &NewClient{} + err = Feed.NewClient.ReadFrom(r) + debug.Printf("Read Feed (id %d): %s\n", id, logfile.JSONString(Feed)) + } else { + err = ErrFeedRead + } + return +} +func (Feed *ControlFeed) WriteTo(w io.Writer) error { + defer debug.Printf("Write Feed: %s\n", logfile.JSONString(Feed)) + if Feed.Response != nil { + if err := enc.WriteU32(w, 1); err != nil { + return err + } + return Feed.Response.WriteTo(w) + } else if Feed.NewClient != nil { + if err := enc.WriteU32(w, 2); err != nil { + return err + } + return Feed.NewClient.WriteTo(w) + } + return fmt.Errorf("set Response or NewClient") +} + +type NewClient struct { + ConnectAddr netip.AddrPort + PeerAddr netip.AddrPort + ClaimInstructions ClaimInstructions + TunnelServerId uint64 + DataCenterId uint32 +} + +func (client *NewClient) ReadFrom(r io.Reader) error { + var err error + if client.ConnectAddr, err = enc.AddrPortRead(r); err != nil { + return err + } else if client.PeerAddr, err = enc.AddrPortRead(r); err != nil { + return err + } else if err = client.ClaimInstructions.ReadFrom(r); err != nil { + return err + } + client.TunnelServerId, client.DataCenterId = enc.ReadU64(r), enc.ReadU32(r) + return nil +} +func (client *NewClient) WriteTo(w io.Writer) error { + if err := enc.AddrPortWrite(w, client.ConnectAddr); err != nil { + return err + } else if err := enc.AddrPortWrite(w, client.PeerAddr); err != nil { + return err + } else if err := client.ClaimInstructions.WriteTo(w); err != nil { + return err + } else if err := enc.WriteU64(w, client.TunnelServerId); err != nil { + return err + } else if err := enc.WriteU32(w, client.DataCenterId); err != nil { + return err + } + return nil +} + +type ClaimInstructions struct { + Address netip.AddrPort + Token []byte +} + +func (claim *ClaimInstructions) ReadFrom(r io.Reader) (err error) { + if claim.Address, err = enc.AddrPortRead(r); err != nil { + return err + } + claim.Token, err = enc.ReadByteN(r, int(enc.ReadU64(r))) + return +} +func (claim *ClaimInstructions) WriteTo(w io.Writer) error { + if err := enc.AddrPortWrite(w, claim.Address); err != nil { + return err + } else if err := enc.WriteU64(w, uint64(len(claim.Token))); err != nil { + return err + } else if err = enc.WriteBytes(w, claim.Token); err != nil { + return err + } + return nil +} diff --git a/proto/control_messages.go b/proto/control_messages.go new file mode 100644 index 0000000..70737f6 --- /dev/null +++ b/proto/control_messages.go @@ -0,0 +1,436 @@ +package proto + +import ( + "bytes" + "fmt" + "io" + "net/netip" + "time" + + "sirherobrine23.org/playit-cloud/go-playit/enc" + "sirherobrine23.org/playit-cloud/go-playit/logfile" +) + +type ControlRequest struct { + Ping *Ping + AgentRegister *AgentRegister + AgentKeepAlive *AgentSessionId + SetupUdpChannel *AgentSessionId + AgentCheckPortMapping *AgentCheckPortMapping +} + +func (Control *ControlRequest) WriteTo(w io.Writer) error { + defer debug.Printf("Write ControlRequest: %s\n", logfile.JSONString(Control)) + if Control.Ping != nil { + if err := enc.WriteU32(w, 6); err != nil { + debug.Printf("Write ControlRequest error: %s\n", err.Error()) + return err + } + return Control.Ping.WriteTo(w) + } else if Control.AgentRegister != nil { + if err := enc.WriteU32(w, 2); err != nil { + debug.Printf("Write ControlRequest error: %s\n", err.Error()) + return err + } + return Control.AgentRegister.WriteTo(w) + } else if Control.AgentKeepAlive != nil { + if err := enc.WriteU32(w, 3); err != nil { + debug.Printf("Write ControlRequest error: %s\n", err.Error()) + return err + } + return Control.AgentKeepAlive.WriteTo(w) + } else if Control.SetupUdpChannel != nil { + if err := enc.WriteU32(w, 4); err != nil { + debug.Printf("Write ControlRequest error: %s\n", err.Error()) + return err + } + return Control.SetupUdpChannel.WriteTo(w) + } else if Control.AgentCheckPortMapping != nil { + if err := enc.WriteU32(w, 5); err != nil { + debug.Printf("Write ControlRequest error: %s\n", err.Error()) + return err + } + return Control.AgentCheckPortMapping.WriteTo(w) + } + return fmt.Errorf("set ControlRequest") +} +func (Control *ControlRequest) ReadFrom(r io.Reader) (err error) { + switch enc.ReadU32(r) { + case 1: + Control.Ping = new(Ping) + err = Control.Ping.ReadFrom(r) + case 2: + Control.AgentRegister = new(AgentRegister) + err = Control.AgentRegister.ReadFrom(r) + case 3: + Control.AgentKeepAlive = new(AgentSessionId) + err = Control.AgentKeepAlive.ReadFrom(r) + case 4: + Control.SetupUdpChannel = new(AgentSessionId) + err = Control.SetupUdpChannel.ReadFrom(r) + case 5: + Control.AgentCheckPortMapping = new(AgentCheckPortMapping) + err = Control.AgentCheckPortMapping.ReadFrom(r) + default: + err = fmt.Errorf("invalid ControlRequest id") + } + debug.Printf("Read ControlRequest: %s\n", logfile.JSONString(Control)) + if err != nil { + debug.Printf("Read ControlRequest error: %s\n", err.Error()) + } + return +} + +type AgentCheckPortMapping struct { + AgentSessionId AgentSessionId + PortRange PortRange +} + +func (Agent *AgentCheckPortMapping) WriteTo(w io.Writer) error { + if err := Agent.AgentSessionId.WriteTo(w); err != nil { + return err + } + return Agent.PortRange.WriteTo(w) +} +func (Agent *AgentCheckPortMapping) ReadFrom(r io.Reader) error { + if err := Agent.AgentSessionId.ReadFrom(r); err != nil { + return err + } + return Agent.AgentSessionId.ReadFrom(r) +} + +type Ping struct { + Now time.Time + CurrentPing *uint32 + SessionId *AgentSessionId +} + +func (ping *Ping) WriteTo(w io.Writer) error { + if err := enc.WriteU64(w, uint64(ping.Now.UnixMilli())); err != nil { + return err + } + + if ping.CurrentPing == nil { + if err := enc.WriteU8(w, 0); err != nil { + return err + } + } else { + if err := enc.WriteU8(w, 1); err != nil { + return err + } else if err := enc.WriteU32(w, *ping.CurrentPing); err != nil { + return err + } + } + + if ping.SessionId == nil { + if err := enc.WriteU8(w, 0); err != nil { + return err + } + } else { + if err := enc.WriteU8(w, 1); err != nil { + return err + } else if err := ping.SessionId.WriteTo(w); err != nil { + return err + } + } + return nil +} +func (ping *Ping) ReadFrom(r io.Reader) error { + ping.Now = time.UnixMilli(int64(enc.ReadU64(r))) + if err := enc.ReadOption(r, func(r io.Reader) error { + *ping.CurrentPing = enc.ReadU32(r) + return nil + }); err != nil { + return err + } + + if err := enc.ReadOption(r, func(r io.Reader) error { + return ping.SessionId.ReadFrom(r) + }); err != nil { + return err + } + return nil +} + +type AgentRegister struct { + AccountID, AgentId, AgentVersion, Timestamp uint64 + ClientAddr, TunnelAddr netip.AddrPort + Signature [32]byte +} + +func (agent *AgentRegister) writePlain() *bytes.Buffer { + buff := new(bytes.Buffer) + enc.WriteU64(buff, agent.AccountID) + enc.WriteU64(buff, agent.AgentId) + enc.WriteU64(buff, agent.AgentVersion) + enc.WriteU64(buff, agent.Timestamp) + enc.AddrPortWrite(buff, agent.ClientAddr) + enc.AddrPortWrite(buff, agent.TunnelAddr) + return buff +} +func (agent *AgentRegister) UpdateSignature(hmac HmacSha256) { + agent.Signature = hmac.Sign(agent.writePlain().Bytes()) +} +func (agent *AgentRegister) VerifySignature(hmac HmacSha256) bool { + return hmac.Verify(agent.writePlain().Bytes(), agent.Signature[:]) +} + +func (AgentReg *AgentRegister) WriteTo(w io.Writer) error { + if err := enc.WriteU64(w, AgentReg.AccountID); err != nil { + return err + } else if err := enc.WriteU64(w, AgentReg.AgentId); err != nil { + return err + } else if err := enc.WriteU64(w, AgentReg.AgentVersion); err != nil { + return err + } else if err := enc.WriteU64(w, AgentReg.Timestamp); err != nil { + return err + } + if err := enc.AddrPortWrite(w, AgentReg.ClientAddr); err != nil { + return err + } else if err := enc.AddrPortWrite(w, AgentReg.TunnelAddr); err != nil { + return err + } + if _, err := w.Write(AgentReg.Signature[:]); err != nil { + return err + } + return nil +} +func (AgentReg *AgentRegister) ReadFrom(r io.Reader) error { + AgentReg.AccountID, AgentReg.AccountID, AgentReg.AgentVersion, AgentReg.Timestamp = enc.ReadU64(r), enc.ReadU64(r), enc.ReadU64(r), enc.ReadU64(r) + var err error + if AgentReg.ClientAddr, err = enc.AddrPortRead(r); err != nil { + return err + } else if AgentReg.TunnelAddr, err = enc.AddrPortRead(r); err != nil { + return err + } + AgentReg.Signature = [32]byte(make([]byte, 32)) + if n, _ := r.Read(AgentReg.Signature[:]); n != 32 { + return fmt.Errorf("missing signature") + } + return nil +} + +type ControlResponse struct { + InvalidSignature, Unauthorized, RequestQueued, TryAgainLater bool + Pong *Pong + AgentRegistered *AgentRegistered + AgentPortMapping *AgentPortMapping + UdpChannelDetails *UdpChannelDetails +} + +func (Control *ControlResponse) WriteTo(w io.Writer) error { + defer debug.Printf("Write Feed: %s\n", logfile.JSONString(&Control)) + if Control.Pong != nil { + if err := enc.WriteU32(w, 1); err != nil { + return err + } + return Control.Pong.WriteTo(w) + } else if Control.InvalidSignature { + return enc.WriteU32(w, 2) + } else if Control.Unauthorized { + return enc.WriteU32(w, 3) + } else if Control.RequestQueued { + return enc.WriteU32(w, 4) + } else if Control.TryAgainLater { + return enc.WriteU32(w, 5) + } else if Control.AgentRegistered != nil { + if err := enc.WriteU32(w, 6); err != nil { + return err + } + return Control.AgentRegistered.WriteTo(w) + } else if Control.AgentPortMapping != nil { + if err := enc.WriteU32(w, 7); err != nil { + return err + } + return Control.AgentPortMapping.WriteTo(w) + } else if Control.UdpChannelDetails != nil { + if err := enc.WriteU32(w, 8); err != nil { + return err + } + return Control.UdpChannelDetails.WriteTo(w) + } + return fmt.Errorf("insert any options to write") +} +func (Control *ControlResponse) ReadFrom(r io.Reader) (err error) { + code := enc.ReadU32(r) + switch code { + case 1: + Control.Pong = new(Pong) + err = Control.Pong.ReadFrom(r) + case 2: + Control.InvalidSignature = true + err = nil + case 3: + Control.Unauthorized = true + err = nil + case 4: + Control.RequestQueued = true + err = nil + case 5: + Control.TryAgainLater = true + err = nil + case 6: + Control.AgentRegistered = new(AgentRegistered) + err = Control.AgentRegistered.ReadFrom(r) + case 7: + Control.AgentPortMapping = new(AgentPortMapping) + err = Control.AgentPortMapping.ReadFrom(r) + case 8: + Control.UdpChannelDetails = new(UdpChannelDetails) + err = Control.UdpChannelDetails.ReadFrom(r) + default: + err = fmt.Errorf("invalid ControlResponse id") + } + debug.Printf("Read ControlResponse (Code %d): %s\n", code, logfile.JSONString(Control)) + if err != nil { + debug.Printf("Read ControlResponse (Code %d) error: %s\n", code, err.Error()) + } + return +} + +type AgentPortMapping struct { + Range PortRange + Found *AgentPortMappingFound +} + +func (Agent *AgentPortMapping) WriteTo(w io.Writer) error { + if err := Agent.Range.WriteTo(w); err != nil { + return err + } else if err := Agent.Found.WriteTo(w); err != nil { + return err + } + return nil +} +func (Agent *AgentPortMapping) ReadFrom(r io.Reader) error { + if err := Agent.Range.ReadFrom(r); err != nil { + return err + } else if err := Agent.Found.ReadFrom(r); err != nil { + return err + } + return nil +} + +type AgentPortMappingFound struct { + ToAgent *AgentSessionId +} + +func (Agent *AgentPortMappingFound) WriteTo(w io.Writer) error { + if Agent.ToAgent != nil { + if err := enc.WriteU32(w, 1); err != nil { + return err + } else if err := Agent.ToAgent.WriteTo(w); err != nil { + return err + } + return nil + } + return nil +} +func (Agent *AgentPortMappingFound) ReadFrom(r io.Reader) error { + if enc.ReadU32(r) == 1 { + Agent.ToAgent = new(AgentSessionId) + return Agent.ToAgent.ReadFrom(r) + } + return fmt.Errorf("unknown AgentPortMappingFound id") +} + +type UdpChannelDetails struct { + TunnelAddr netip.AddrPort + Token []byte +} + +func (UdpChannel *UdpChannelDetails) WriteTo(w io.Writer) error { + if err := enc.AddrPortWrite(w, UdpChannel.TunnelAddr); err != nil { + return err + } else if err := enc.WriteU64(w, uint64(len(UdpChannel.Token))); err != nil { + return err + } else if err := enc.WriteBytes(w, UdpChannel.Token); err != nil { + return err + } + return nil +} +func (UdpChannel *UdpChannelDetails) ReadFrom(r io.Reader) error { + var err error + if UdpChannel.TunnelAddr, err = enc.AddrPortRead(r); err != nil { + return err + } else if UdpChannel.Token, err = enc.ReadByteN(r, int(enc.ReadU64(r))); err != nil { + return err + } + return nil +} + +type Pong struct { + RequestNow, ServerNow time.Time + ServerId uint64 + DataCenterId uint32 + ClientAddr, TunnelAddr netip.AddrPort + SessionExpireAt *time.Time +} + +func (pong *Pong) WriteTo(w io.Writer) error { + if err := enc.Write64(w, pong.RequestNow.UnixMilli()); err != nil { + return err + } else if err := enc.Write64(w, pong.ServerNow.UnixMilli()); err != nil { + return err + } else if err := enc.WriteU64(w, pong.ServerId); err != nil { + return err + } else if err := enc.WriteU32(w, pong.DataCenterId); err != nil { + return err + } else if err := enc.AddrPortWrite(w, pong.ClientAddr); err != nil { + return err + } else if err := enc.AddrPortWrite(w, pong.TunnelAddr); err != nil { + return err + } + + if pong.SessionExpireAt == nil { + if err := enc.Write8(w, 0); err != nil { + return err + } + } else { + if err := enc.Write8(w, 1); err != nil { + return err + } else if err := enc.Write64(w, pong.SessionExpireAt.UnixMilli()); err != nil { + return err + } + } + return nil +} +func (pong *Pong) ReadFrom(r io.Reader) error { + pong.RequestNow = time.UnixMilli(enc.Read64(r)) + pong.ServerNow = time.UnixMilli(enc.Read64(r)) + pong.ServerId = enc.ReadU64(r) + pong.DataCenterId = enc.ReadU32(r) + var err error + if pong.ClientAddr, err = enc.AddrPortRead(r); err != nil { + return err + } else if pong.TunnelAddr, err = enc.AddrPortRead(r); err != nil { + return err + } else if err := enc.ReadOption(r, func(r io.Reader) (err error) { + pong.SessionExpireAt = new(time.Time) + *pong.SessionExpireAt = time.UnixMilli(enc.Read64(r)) // Fix set SessionExpireAt + return nil + }); err != nil { + return err + } + return nil +} + +type AgentRegistered struct { + Id AgentSessionId + ExpiresAt time.Time +} + +func (agent *AgentRegistered) WriteTo(w io.Writer) error { + if err := agent.Id.WriteTo(w); err != nil { + return err + } else if err := enc.Write64(w, agent.ExpiresAt.UnixMilli()); err != nil { + return err + } + return nil +} +func (agent *AgentRegistered) ReadFrom(r io.Reader) error { + if err := agent.Id.ReadFrom(r); err != nil { + return err + } + agent.ExpiresAt = time.UnixMilli(enc.Read64(r)) + return nil +} diff --git a/proto/encoding.go b/proto/encoding.go new file mode 100644 index 0000000..bf87813 --- /dev/null +++ b/proto/encoding.go @@ -0,0 +1,10 @@ +package proto + +import ( + "io" +) + +type MessageEncoding interface { + ReadFrom(r io.Reader) error + WriteTo(w io.Writer) error +} diff --git a/proto/hmac.go b/proto/hmac.go new file mode 100644 index 0000000..b162690 --- /dev/null +++ b/proto/hmac.go @@ -0,0 +1,29 @@ +package proto + +import ( + "crypto/hmac" + "crypto/sha256" + "hash" +) + +type HmacSha256 struct { + mac hash.Hash +} + +func NewHmacSha256(secret []byte) *HmacSha256 { + mac := hmac.New(sha256.New, secret) + return &HmacSha256{mac} +} + +func (h *HmacSha256) Verify(data, sig []byte) bool { + expectedMAC := h.mac.Sum(nil) + h.mac.Reset() + h.mac.Write(data) + return hmac.Equal(expectedMAC, sig) +} + +func (h *HmacSha256) Sign(data []byte) [32]byte { + h.mac.Reset() + h.mac.Write(data) + return [32]byte(h.mac.Sum(nil)) +} diff --git a/proto/lib.go b/proto/lib.go new file mode 100644 index 0000000..04be3c7 --- /dev/null +++ b/proto/lib.go @@ -0,0 +1,87 @@ +package proto + +import ( + "fmt" + "io" + "net/netip" + + "sirherobrine23.org/playit-cloud/go-playit/enc" +) + +type AgentSessionId struct { + SessionID, AccountID, AgentID uint64 +} + +type PortRange struct { + IP netip.Addr + PortStart, PortEnd uint16 + PortProto PortProto +} + +type PortProto string + +func (AgentSession *AgentSessionId) WriteTo(w io.Writer) error { + if err := enc.WriteU64(w, AgentSession.SessionID); err != nil { + return err + } else if err := enc.WriteU64(w, AgentSession.AccountID); err != nil { + return err + } else if err := enc.WriteU64(w, AgentSession.AgentID); err != nil { + return err + } + return nil +} +func (AgentSession *AgentSessionId) ReadFrom(r io.Reader) error { + AgentSession.SessionID, AgentSession.AccountID, AgentSession.AgentID = enc.ReadU64(r), enc.ReadU64(r), enc.ReadU64(r) + return nil +} + +func (portRange *PortRange) WriteTo(w io.Writer) error { + if err := enc.AddrWrite(w, portRange.IP); err != nil { + return err + } else if err := enc.WriteU16(w, portRange.PortStart); err != nil { + return err + } else if err := enc.WriteU16(w, portRange.PortEnd); err != nil { + return err + } else if err := portRange.PortProto.WriteTo(w); err != nil { + return err + } + return nil +} +func (portRange *PortRange) ReadFrom(r io.Reader) error { + var err error + portRange.IP, err = enc.AddrRead(r) + if err != nil { + return err + } + portRange.PortStart, portRange.PortEnd = enc.ReadU16(r), enc.ReadU16(r) + portRange.PortProto = PortProto("") + if err := portRange.PortProto.ReadFrom(r); err != nil { + return err + } + return nil +} + +func (proto PortProto) WriteTo(w io.Writer) error { + switch proto { + case "tcp": + return enc.WriteU8(w, 1) + case "udp": + return enc.WriteU8(w, 2) + case "both": + return enc.WriteU8(w, 3) + } + return fmt.Errorf("invalid port proto") +} +func (proto PortProto) ReadFrom(r io.Reader) error { + switch enc.ReadU8(r) { + case 1: + proto = PortProto("tcp") + case 2: + proto = PortProto("udp") + case 3: + proto = PortProto("both") + default: + return fmt.Errorf("invalid port proto") + } + return nil +} diff --git a/proto/proto.go b/proto/proto.go new file mode 100644 index 0000000..c4c25ce --- /dev/null +++ b/proto/proto.go @@ -0,0 +1,9 @@ +package proto + +import ( + "log" + + "sirherobrine23.org/playit-cloud/go-playit/logfile" +) + +var debug = log.New(logfile.DebugFile, "proto.playit.gg: ", log.Ldate) \ No newline at end of file diff --git a/proto/raw_slice.go b/proto/raw_slice.go new file mode 100644 index 0000000..174ed64 --- /dev/null +++ b/proto/raw_slice.go @@ -0,0 +1,21 @@ +package proto + +import ( + "fmt" + "io" +) + +type RawSlice []byte + +func (buff RawSlice) ReadFrom(r io.Reader) error { + return fmt.Errorf("cannot read for RawSlice") +} +func (buff RawSlice) WriteTo(w io.Writer) error { + size, err := w.Write(buff) + if err != nil { + return err + } else if size != len(buff) { + return fmt.Errorf("not enough space to write raw slice") + } + return nil +} diff --git a/proto/rpc.go b/proto/rpc.go new file mode 100644 index 0000000..03faf68 --- /dev/null +++ b/proto/rpc.go @@ -0,0 +1,34 @@ +package proto + +import ( + "io" + "reflect" + + "sirherobrine23.org/playit-cloud/go-playit/enc" + "sirherobrine23.org/playit-cloud/go-playit/logfile" +) + +type ControlRpcMessage[T MessageEncoding] struct { + RequestID uint64 + Content T // Convert with .(*type) +} + +func (rpc *ControlRpcMessage[T]) WriteTo(w io.Writer) error { + + defer debug.Printf("Write ControlRpcMessage[%s]: %s\n", reflect.TypeOf(rpc.Content).String(), logfile.JSONString(rpc)) + if err := enc.WriteU64(w, rpc.RequestID); err != nil { + return err + } else if err = rpc.Content.WriteTo(w); err != nil { + return err + } + return nil +} +func (rpc *ControlRpcMessage[T]) ReadFrom(r io.Reader) error { + rpc.RequestID = enc.ReadU64(r) + if err := rpc.Content.ReadFrom(r); err != nil { + debug.Printf("Read ControlRpcMessage[%s] error: %s\n", reflect.TypeOf(rpc.Content).String(), err.Error()) + return err + } + debug.Printf("Read ControlRpcMessage[%s]: %s\n", reflect.TypeOf(rpc.Content).String(), logfile.JSONString(rpc)) + return nil +} diff --git a/runner/autorun.go b/runner/autorun.go new file mode 100644 index 0000000..8a058fe --- /dev/null +++ b/runner/autorun.go @@ -0,0 +1,107 @@ +package runner + +import ( + "fmt" + "net/netip" + "sync" + "time" + + "sirherobrine23.org/playit-cloud/go-playit/api" + "sirherobrine23.org/playit-cloud/go-playit/network" +) + +type TunnelEntry struct { + PubAddress string + MatchIP network.MatchIP + PortType api.PortProto + FromPort, ToPort uint16 + LocalStartAdress netip.AddrPort +} + +type LocalLookup struct { + AdreessLock sync.Mutex + Adreess []TunnelEntry +} + +func (look *LocalLookup) Lookup(IP netip.Addr, Port uint16, Proto api.PortProto) *network.AddressValue[netip.AddrPort] { + // look.AdreessLock.Lock() + // defer look.AdreessLock.Unlock() + for _, tunnel := range look.Adreess { + if tunnel.PortType != Proto && tunnel.PortType != "both" { + continue + } else if !tunnel.MatchIP.Matches(IP) { + continue + } else if tunnel.FromPort <= Port && Port < tunnel.ToPort { + return &network.AddressValue[netip.AddrPort]{ + Value: tunnel.LocalStartAdress, + FromPort: tunnel.FromPort, + ToPort: tunnel.ToPort, + } + } + } + return nil +} + +func (look *LocalLookup) Update(tunnels []api.AgentTunnel) { + entries := []TunnelEntry{} + for _, tunnel := range tunnels { + tun := TunnelEntry{ + PortType: api.PortProto(tunnel.Proto), + FromPort: tunnel.Port.From, + ToPort: tunnel.Port.To, + LocalStartAdress: netip.AddrPortFrom(tunnel.LocalIp, tunnel.LocalPort), + MatchIP: network.MatchIP{IPNumber: uint64(tunnel.IpNum)}, + } + if tunnel.RegionNum != 0 { + tun.MatchIP.RegionID = new(uint16) + *tun.MatchIP.RegionID = tunnel.RegionNum + } + entries = append(entries, tun) + } + look.AdreessLock.Lock() + defer look.AdreessLock.Unlock() + look.Adreess = entries +} + +func Autorun(Api api.Api) error { + lookup := LocalLookup{Adreess: []TunnelEntry{}, AdreessLock: sync.Mutex{}} + tuns, err := Api.AgentInfo() + if err != nil { + return err + } + lookup.Update(tuns.Tunnels) + for _, tun := range tuns.Tunnels { + src, dst := tun.SourceString(), tun.DestString() + if tun.Disabled != nil { + fmt.Printf("%s -> %s (Disabled)\n", src, dst) + } else if tun.TunnelType != "" { + fmt.Printf("%s -> %s (%s)\n", src, dst, tun.TunnelType) + } else { + fmt.Printf("%s -> %s (Proto: %s, Port Count %d)\n", src, dst, tun.Proto, tun.Port.To - tun.Port.From) + } + } + var runner TunnelRunner + errorCount := 0 + for { + runner, err = NewTunnelRunner(Api, &lookup) + if err == nil { + break + } else if errorCount++; errorCount > 5 { + return err + } + <-time.After(time.Second*2) + } + runing := runner.Run() + go func(){ + for runner.KeepRunning.Load() { + if tuns, err = Api.AgentInfo(); err != nil { + <-time.After(time.Second*3) + continue + } + lookup.Update(tuns.Tunnels) + <-time.After(time.Second*3) + } + }() + defer runner.KeepRunning.Store(false) + return <- runing +} diff --git a/runner/runner.go b/runner/runner.go new file mode 100644 index 0000000..56be7bc --- /dev/null +++ b/runner/runner.go @@ -0,0 +1,9 @@ +package runner + +import ( + "log" + + "sirherobrine23.org/playit-cloud/go-playit/logfile" +) + +var debug = log.New(logfile.DebugFile, "runner.playit.gg: ", log.Ldate) \ No newline at end of file diff --git a/runner/tunnel_runner.go b/runner/tunnel_runner.go new file mode 100644 index 0000000..f4c0cc9 --- /dev/null +++ b/runner/tunnel_runner.go @@ -0,0 +1,132 @@ +package runner + +import ( + "io" + "net" + "net/netip" + "sync/atomic" + "time" + + "sirherobrine23.org/playit-cloud/go-playit/api" + "sirherobrine23.org/playit-cloud/go-playit/logfile" + "sirherobrine23.org/playit-cloud/go-playit/network" + "sirherobrine23.org/playit-cloud/go-playit/tunnel" +) + +type TunnelRunner struct { + Lookup network.AddressLookup[netip.AddrPort] + Tunnel tunnel.SimpleTunnel + UdpClients network.UdpClients + TcpClients network.TcpClients + KeepRunning atomic.Bool +} + +func NewTunnelRunner(Api api.Api, Lookup network.AddressLookup[netip.AddrPort]) (TunnelRunner, error) { + tunnel := tunnel.NewSimpleTunnel(Api) + if err := tunnel.Setup(); err != nil { + return TunnelRunner{}, err + } + udp_clients := network.NewUdpClients(*tunnel.UdpTunnel(), Lookup) + var keep atomic.Bool + keep.Store(true) + return TunnelRunner{ + Lookup: Lookup, + Tunnel: tunnel, + UdpClients: udp_clients, + TcpClients: network.NewTcpClients(), + KeepRunning: keep, + }, nil +} + +func (self *TunnelRunner) SetSpecialLan(setUse bool) { + self.TcpClients.UseSpecialLAN = setUse + self.UdpClients.UseSpecialLan = setUse +} + +func (self *TunnelRunner) Run() chan error { + end := make(chan error) + tunnel := self.Tunnel + go func() { + lastControlUpdate := time.Now().UnixMilli() + for self.KeepRunning.Load() { + now := time.Now().UnixMilli() + if 30_000 < time.Now().UnixMilli()-lastControlUpdate { + lastControlUpdate = now + if _, err := tunnel.ReloadControlAddr(); err != nil { + <-time.After(time.Second * 3) + continue + } + } + new_client, err := tunnel.Update() + if err != nil { + debug.Printf("Error recived: %s\n", err.Error()) + <-time.After(time.Second) + continue + } else if new_client == nil { + <-time.After(time.Second) + continue + } + debug.Println("New TCP Client") + var found *network.AddressValue[netip.AddrPort] + if found = self.Lookup.Lookup(new_client.ConnectAddr.Addr(), new_client.ConnectAddr.Port(), api.PortProto("tcp")); found == nil { + debug.Println("could not find local address for connection") + continue + } + go func() { + var ( + tunnel_conn *network.TcpClient + local_conn *net.TCPConn + err error + ) + + if tunnel_conn, err = self.TcpClients.Connect(*new_client); err != nil { + return + } + if tunnel_conn.Stream != nil { + defer tunnel_conn.Stream.Close() + } + + if local_conn, err = network.TcpSocket(self.TcpClients.UseSpecialLAN, new_client.PeerAddr, netip.AddrPortFrom(found.Value.Addr(), (new_client.ConnectAddr.Port()-found.FromPort)+found.Value.Port())); err != nil { + debug.Println(err) + return + } + defer local_conn.Close() + done := make(chan struct{}) + defer close(done) + go func() { + io.Copy(tunnel_conn.Stream, local_conn) + done <- struct{}{} + }() + go func() { + io.Copy(local_conn, tunnel_conn.Stream) + done <- struct{}{} + }() + <-done + <-done + }() + } + end <- nil + }() + + go func() { + udp := tunnel.UdpTunnel() + for self.KeepRunning.Load() { + buffer, rx, err := udp.ReceiveFrom() + if err != nil { + // if et, is := err.(net.Error); is && !et.Timeout() { + debug.Printf("UdpTunnel Error: %s\n", err.Error()) + // } + time.Sleep(time.Second) + continue + } + debug.Printf("UdpTunnel: %s\n", logfile.JSONString(rx)) + if rx.ConfirmerdConnection { + continue + } else if err := self.UdpClients.ForwardPacket(rx.ReceivedPacket.Flow, buffer); err != nil { + debug.Println(err) + panic(err) + } + } + }() + return end +} diff --git a/tunnel/address_lookup.go b/tunnel/address_lookup.go deleted file mode 100644 index 5ab52a4..0000000 --- a/tunnel/address_lookup.go +++ /dev/null @@ -1,68 +0,0 @@ -package tunnel - -import ( - "net/netip" - "slices" - - "sirherobrine23.org/playit-cloud/go-playit/api" -) - -type PortType struct { - Value string -} -func (w *PortType) IsValid() bool { - return slices.Contains(api.PortType, w.Value) -} -func (proto *PortProto) SetBoth() { - proto.Value = "both" -} -func (proto *PortProto) SetTcp() { - proto.Value = "tcp" -} -func (proto *PortProto) SetUdp() { - proto.Value = "udp" -} - -type AddressValue[T any] struct { - Value T - FromPort, ToPort uint16 -} - -type AddressLookup[T any] interface { - // Resolve address if exist return value else return nil point - Lookup(IpPort netip.AddrPort, Proto PortType) *AddressValue[T] -} - -type MatchIp struct { - IP netip.AddrPort - RegionID *uint16 -} -func (mat *MatchIp) Matches(ip netip.AddrPort) bool { - return mat.IP.Compare(ip) == 0 -} - -type MappingOverride struct { - MatchIP MatchIp - Proto PortType - Port api.PortRange - LocalAddr netip.AddrPort -} - -type LookupWithOverrides []MappingOverride - -func (Look *LookupWithOverrides) Lookup(IpPort netip.AddrPort, Proto PortType) *AddressValue[netip.AddrPort] { - for _, Over := range *Look { - if Over.Proto.Value == Proto.Value && Over.MatchIP.Matches(IpPort) { - return &AddressValue[netip.AddrPort]{ - Value: Over.LocalAddr, - FromPort: Over.Port.From, - ToPort: Over.Port.To, - } - } - } - return &AddressValue[netip.AddrPort]{ - Value: netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), IpPort.Port()), - FromPort: IpPort.Port(), - ToPort: IpPort.Port() + 1, - } -} diff --git a/tunnel/bigint.go b/tunnel/bigint.go deleted file mode 100644 index 5cbf8e7..0000000 --- a/tunnel/bigint.go +++ /dev/null @@ -1,219 +0,0 @@ -package tunnel - -import ( - "encoding/binary" - "fmt" - "io" - "net/netip" -) - -type RawSlice struct { - Buff []byte -} - -func (w *RawSlice) WriteTo(I io.Writer) error { - _, err := I.Write(w.Buff) - return err -} -func (w *RawSlice) ReadFrom(I io.Reader) error { - _, err := I.Read(w.Buff) - return err -} - -func ReadU8(w io.Reader) uint8 { - var value uint8 - binary.Read(w, binary.BigEndian, &value) - return value -} - -func WriteU8(w io.Writer, value uint8) error { - return binary.Write(w, binary.BigEndian, value) -} - -func ReadU16(w io.Reader) uint16 { - var value uint16 - binary.Read(w, binary.BigEndian, &value) - return value -} - -func WriteU16(w io.Writer, value uint16) error { - return binary.Write(w, binary.BigEndian, value) -} - -func ReadU32(w io.Reader) uint32 { - var value uint32 - binary.Read(w, binary.BigEndian, &value) - return value -} - -func WriteU32(w io.Writer, value uint32) error { - return binary.Write(w, binary.BigEndian, value) -} - -func ReadU64(w io.Reader) uint64 { - var value uint64 - binary.Read(w, binary.BigEndian, &value) - return value -} - -func WriteU64(w io.Writer, value uint64) error { - return binary.Write(w, binary.BigEndian, value) -} - -func WriteData(w io.Writer, val any) error { - return binary.Write(w, binary.BigEndian, val) -} - -func ReadBuff(w io.Reader, buff []byte) error { - for index := range buff { - if err := binary.Read(w, binary.BigEndian, &buff[index]); err != nil { - return err - } - } - return nil -} - -func ReadBuffN(w io.Reader, size int) ([]byte, error) { - buff := make([]byte, size) - return buff, ReadBuff(w, buff) -} - -func ReadU8Buff(w io.Reader, buff []uint8) error { - for index := range buff { - if err := binary.Read(w, binary.BigEndian, &buff[index]); err != nil { - return err - } - } - return nil -} - -func ReadU16Buff(w io.Reader, buff []uint16) error { - for index := range buff { - if err := binary.Read(w, binary.BigEndian, &buff[index]); err != nil { - return err - } - } - return nil -} - -func ReadU32Buff(w io.Reader, buff []uint32) error { - for index := range buff { - if err := binary.Read(w, binary.BigEndian, &buff[index]); err != nil { - return err - } - } - return nil -} - -func ReadU64Buff(w io.Reader, buff []uint64) error { - for index := range buff { - if err := binary.Read(w, binary.BigEndian, &buff[index]); err != nil { - return err - } - } - return nil -} - -func ReadOption(w io.Reader, callback func(reader io.Reader) error) error { - code := ReadU8(w) - if code == 1 { - return callback(w) - } - return nil -} - -func WriteOption(w io.Writer, value MessageEncoding) error { - fmt.Printf("%+v\n", value) - if value != nil { - if err := binary.Write(w, binary.BigEndian, uint8(1)); err != nil { - return err - } - return value.WriteTo(w) - } - return binary.Write(w, binary.BigEndian, uint8(0)) -} - -func WriteOptionU8(w io.Writer, value *uint8) error { - if value == nil { - return binary.Write(w, binary.BigEndian, uint8(0)) - } - if err := binary.Write(w, binary.BigEndian, uint8(1)); err != nil { - return err - } - return WriteU8(w, *value) -} - -func WriteOptionU16(w io.Writer, value *uint16) error { - if value == nil { - return binary.Write(w, binary.BigEndian, uint8(0)) - } - if err := binary.Write(w, binary.BigEndian, uint8(1)); err != nil { - return err - } - return WriteU16(w, *value) -} - -func WriteOptionU32(w io.Writer, value *uint32) error { - if value == nil { - return binary.Write(w, binary.BigEndian, uint8(0)) - } - if err := binary.Write(w, binary.BigEndian, uint8(1)); err != nil { - return err - } - return WriteU32(w, *value) -} - -func WriteOptionU64(w io.Writer, value *uint64) error { - if value == nil { - return binary.Write(w, binary.BigEndian, uint8(0)) - } - if err := binary.Write(w, binary.BigEndian, uint8(1)); err != nil { - return err - } - return WriteU64(w, *value) -} - -type AddressPort struct { - netip.AddrPort -} - -func (sock *AddressPort) WriteTo(w io.Writer) error { - addr := sock.Addr() - ip, _ := addr.MarshalBinary() - if addr.Is6() { - if err := WriteU8(w, uint8(6)); err != nil { - return err - } else if _, err = w.Write(ip); err != nil { - return err - } - } else { - if err := WriteU8(w, uint8(4)); err != nil { - return err - } else if _, err = w.Write(ip); err != nil { - return err - } - } - if err := WriteU16(w, sock.Port()); err != nil { - return err - } - return nil -} -func (sock *AddressPort) ReadFrom(w io.Reader) error { - switch ReadU8(w) { - case 4: - buff, err := ReadBuffN(w, 4) - if err != nil { - return err - } - sock.AddrPort = netip.AddrPortFrom(netip.AddrFrom4([4]byte(buff)), ReadU16(w)) - return nil - case 6: - buff, err := ReadBuffN(w, 16) - if err != nil { - return err - } - sock.AddrPort = netip.AddrPortFrom(netip.AddrFrom16([16]byte(buff)), ReadU16(w)) - return nil - } - return fmt.Errorf("cannot get IP type") -} diff --git a/tunnel/control.go b/tunnel/control.go index a891b72..c2b09bd 100644 --- a/tunnel/control.go +++ b/tunnel/control.go @@ -3,139 +3,129 @@ package tunnel import ( "bytes" "fmt" - "net/netip" + "net" "time" "sirherobrine23.org/playit-cloud/go-playit/api" + "sirherobrine23.org/playit-cloud/go-playit/proto" ) type AuthenticatedControl struct { - ApiClient api.Api + Api api.Api Conn ConnectedControl + LastPong proto.Pong + Registered proto.AgentRegistered + buffer *bytes.Buffer + ForceExpire bool CurrentPing *uint32 - LastPong Pong - ForceEpired bool - Registered AgentRegistered - Buff []byte } -func (Auth *AuthenticatedControl) Send(Req ControlRpcMessage[MessageEncoding]) error { - Auth.Buff = []byte{} - bufio := bytes.NewBuffer(Auth.Buff) - if err := Req.WriteTo(bufio); err != nil { +func (control *AuthenticatedControl) SendKeepAlive(requestID uint64) error { + return control.Send(proto.ControlRpcMessage[*proto.ControlRequest]{ + RequestID: requestID, + Content: &proto.ControlRequest{ + AgentKeepAlive: &control.Registered.Id, + }, + }) +} + +func (self *AuthenticatedControl) SendSetupUdpChannel(requestId uint64) error { + return self.Send(proto.ControlRpcMessage[*proto.ControlRequest]{ + RequestID: requestId, + Content: &proto.ControlRequest{ + SetupUdpChannel: &self.Registered.Id, + }, + }) +} + +func (control *AuthenticatedControl) SetupUdpChannel(requestID uint64) error { + return control.Send(proto.ControlRpcMessage[*proto.ControlRequest]{ + RequestID: requestID, + Content: &proto.ControlRequest{ + SetupUdpChannel: &control.Registered.Id, + }, + }) +} + +func (control *AuthenticatedControl) Ping(requestID uint64, Now time.Time) error { + return control.Send(proto.ControlRpcMessage[*proto.ControlRequest]{ + RequestID: requestID, + Content: &proto.ControlRequest{ + Ping: &proto.Ping{Now: Now, CurrentPing: control.CurrentPing, SessionId: &control.Registered.Id}, + }, + }) +} + +func (self *AuthenticatedControl) GetExpireAt() time.Time { + return self.Registered.ExpiresAt +} + +func (self *AuthenticatedControl) IsExpired() bool { + return self.ForceExpire || self.LastPong.SessionExpireAt == nil || self.FlowChanged() +} + +func (self *AuthenticatedControl) SetExpired() { + self.ForceExpire = true +} + +func (self *AuthenticatedControl) FlowChanged() bool { + return self.Conn.Pong.ClientAddr.Compare(self.LastPong.ClientAddr) != 0 +} + +func (self *AuthenticatedControl) Send(req proto.ControlRpcMessage[*proto.ControlRequest]) error { + self.buffer.Reset() + if err := req.WriteTo(self.buffer); err != nil { return err - } - Auth.Buff = bufio.Bytes() - _, err := Auth.Conn.Udp.WriteToUDPAddrPort(Auth.Buff, Auth.Conn.ControlAddr) - if err != nil { + } else if _, err := self.Conn.Udp.WriteTo(self.buffer.Bytes(), net.UDPAddrFromAddrPort(self.Conn.ControlAddr)); err != nil { return err } return nil } -func (Auth *AuthenticatedControl) SendKeepAlive(RequestID uint64) error { - return Auth.Send(ControlRpcMessage[MessageEncoding]{ - RequestID: RequestID, - Content: &ControlRequest{ - AgentKeepAlive: &Auth.Registered.ID, - }, - }) -} - -func (Auth *AuthenticatedControl) SendSetupUDPChannel(RequestID uint64) error { - return Auth.Send(ControlRpcMessage[MessageEncoding]{ - RequestID: RequestID, - Content: &ControlRequest{ - SetupUdpChannel: &Auth.Registered.ID, - }, - }) -} - -func (Auth *AuthenticatedControl) SendPing(RequestID uint64, Now time.Time) error { - return Auth.Send(ControlRpcMessage[MessageEncoding]{ - RequestID: RequestID, - Content: &ControlRequest{ - Ping: &Ping{ - Now: Now, - CurrentPing: Auth.CurrentPing, - SessionID: &Auth.Registered.ID, - }, - }, - }) -} - -func (Auth *AuthenticatedControl) FlowChanged() bool { - return Auth.LastPong.ClientAddr.Compare(Auth.LastPong.ClientAddr.AddrPort) != 0 -} - -func (Auth *AuthenticatedControl) IsIspired() bool { - return Auth.ForceEpired || Auth.LastPong.SessionExpireAt == nil || Auth.FlowChanged() -} - -func (Auth *AuthenticatedControl) IntoRequiresAuth() *ConnectedControl { - return &ConnectedControl{ - ControlAddr: Auth.Conn.ControlAddr, - Udp: Auth.Conn.Udp, - Pong: &Auth.LastPong, +func (self *AuthenticatedControl) IntoRequireAuth() ConnectedControl { + return ConnectedControl{ + ControlAddr: self.Conn.ControlAddr, + Udp: self.Conn.Udp, + Pong: self.LastPong, } } -type InvalidRemote struct { - Expected, Got netip.AddrPort +func (self *AuthenticatedControl) Authenticate() (AuthenticatedControl, error) { + conn := self.IntoRequireAuth() + return conn.Authenticate(self.Api) } -func (a InvalidRemote) Error() string { - return fmt.Sprintf("expected %s, got %s", a.Expected.String(), a.Got.String()) -} - -func (Auth *AuthenticatedControl) RecFeedMsg() (*ControlFeed, error) { - Auth.Buff = append(Auth.Buff, make([]byte, 1024)...) - size, remote, err := Auth.Conn.Udp.ReadFromUDP(Auth.Buff) - LogDebug.Println(size, remote, err) +func (self *AuthenticatedControl) RecvFeedMsg() (proto.ControlFeed, error) { + buff := make([]byte, 1024) + // self.Conn.Udp.SetReadDeadline(*new(time.Time)) // Remove deadline + self.Conn.Udp.SetReadDeadline(time.Now().Add(time.Microsecond * 5)) + size, remote, err := self.Conn.Udp.ReadFromUDPAddrPort(buff) if err != nil { - return nil, err - } else if remote.AddrPort().Compare(Auth.Conn.ControlAddr) != 0 { - return nil, InvalidRemote{Expected: Auth.Conn.ControlAddr, Got: remote.AddrPort()} + if et, is := err.(net.Error); is && !et.Timeout() { + debug.Printf("control reader UDP control: %s", err.Error()) + } + return proto.ControlFeed{}, err + } else if remote.Compare(self.Conn.ControlAddr) != 0 { + return proto.ControlFeed{}, fmt.Errorf("invalid remote, expected %q got %q", remote.String(), self.Conn.ControlAddr.String()) } - - var feed ControlFeed - if err := feed.ReadFrom(bytes.NewBuffer(Auth.Buff[size:])); err != nil { - return nil, err + self.buffer.Reset() + self.buffer.Write(buff[:size]) + feed := proto.ControlFeed{} + if err := feed.ReadFrom(self.buffer); err != nil { + debug.Printf("control feed reader: %s", err.Error()) + return proto.ControlFeed{}, err } - - if feed.Response != nil { - if feed.Response.Content != nil { - if feed.Response.Content.AgentRegistered != nil { - LogDebug.Println("agent registred") - LogDebug.Printf("%+v\n", feed.Response.Content.AgentRegistered) - Auth.Registered = *feed.Response.Content.AgentRegistered - } else if feed.Response.Content.Pong != nil { - CurrentPing := uint32(feed.Response.Content.Pong.RequestNow - uint64(time.Now().UnixMilli())) - Auth.CurrentPing = &CurrentPing - Auth.LastPong = *feed.Response.Content.Pong - if feed.Response.Content.Pong.SessionExpireAt != nil { - Auth.Registered.ExpiresAt = time.UnixMilli(int64(*feed.Response.Content.Pong.SessionExpireAt)) - } + if res := feed.Response; res != nil { + if registered := res.Content.AgentRegistered; registered != nil { + self.Registered = *registered + } else if pong := res.Content.Pong; pong != nil { + self.CurrentPing = new(uint32) + *self.CurrentPing = uint32(time.Now().UnixMilli() - pong.RequestNow.UnixMilli()) + self.LastPong = *pong + if expires_at := pong.SessionExpireAt; expires_at != nil { + self.Registered.ExpiresAt = *expires_at } } - } - return &feed, nil -} - -func (Auth *AuthenticatedControl) Authenticate() error { - conn, err := (&ConnectedControl{ - ControlAddr: Auth.Conn.ControlAddr, - Udp: Auth.Conn.Udp, - Pong: &Auth.LastPong, - }).Authenticate(Auth.ApiClient) - if err != nil { - return err - } - Auth.Buff = conn.Buff - Auth.Conn = conn.Conn - Auth.CurrentPing = conn.CurrentPing - Auth.LastPong = conn.LastPong - Auth.Registered = conn.Registered - return nil + return feed, nil } diff --git a/tunnel/control_feed.go b/tunnel/control_feed.go deleted file mode 100644 index 44aa202..0000000 --- a/tunnel/control_feed.go +++ /dev/null @@ -1,110 +0,0 @@ -package tunnel - -import ( - "encoding/binary" - "encoding/json" - "fmt" - "io" -) - -type ClaimInstructions struct { - Address AddressPort - Token []byte -} - -func (w *ClaimInstructions) WriteTo(I io.Writer) error { - if err := w.Address.WriteTo(I); err != nil { - return err - } else if err := WriteU64(I, uint64(len(w.Token))); err != nil { - return err - } else if err = binary.Write(I, binary.BigEndian, w.Token); err != nil { - return err - } - return nil -} -func (w *ClaimInstructions) ReadFrom(I io.Reader) error { - w.Address = AddressPort{} - if err := w.Address.ReadFrom(I); err != nil { - return err - } - w.Token = make([]byte, ReadU64(I)) - if err := ReadBuff(I, w.Token); err != nil { - return err - } - return nil -} - -type NewClient struct { - ConnectAddr AddressPort - PeerAddr AddressPort - ClaimInstructions ClaimInstructions - TunnelServerId uint64 - DataCenterId uint32 -} - -func (w *NewClient) WriteTo(I io.Writer) error { - if err := w.ConnectAddr.WriteTo(I); err != nil { - return err - } else if w.PeerAddr.WriteTo(I); err != nil { - return err - } else if w.ClaimInstructions.WriteTo(I); err != nil { - return err - } else if err := WriteU64(I, w.TunnelServerId); err != nil { - return err - } else if err := WriteU32(I, w.DataCenterId); err != nil { - return err - } - return nil -} -func (w *NewClient) ReadFrom(I io.Reader) error { - w.ConnectAddr, w.PeerAddr = AddressPort{}, AddressPort{} - if err := w.ConnectAddr.ReadFrom(I); err != nil { - return err - } else if err := w.PeerAddr.ReadFrom(I); err != nil { - return err - } else if err := w.ClaimInstructions.ReadFrom(I); err != nil { - return err - } - w.TunnelServerId, w.DataCenterId = ReadU64(I), ReadU32(I) - return nil -} - -type ControlFeed struct { - Response *ControlRpcMessage[*ControlResponse] - NewClient *NewClient -} - -func (w *ControlFeed) WriteTo(I io.Writer) error { - defer func(){ - d, _ := json.MarshalIndent(w, "", " ") - LogDebug.Printf("Write Feed: %s\n", string(d)) - }() - if w.Response != nil { - if err := WriteU32(I, 1); err != nil { - return err - } - return w.Response.WriteTo(I) - } else if w.NewClient != nil { - if err := WriteU32(I, 2); err != nil { - return err - } - return w.NewClient.WriteTo(I) - } - return fmt.Errorf("set ResponseControl or NewClient") -} -func (w *ControlFeed) ReadFrom(I io.Reader) error { - defer func(){ - d, _ := json.MarshalIndent(w, "", " ") - LogDebug.Printf("Read Feed: %s\n", string(d)) - }() - switch ReadU32(I) { - case 1: - w.Response = &ControlRpcMessage[*ControlResponse]{} - w.Response.Content = &ControlResponse{} - return w.Response.ReadFrom(I) - case 2: - w.NewClient = &NewClient{} - return w.NewClient.ReadFrom(I) - } - return fmt.Errorf("invalid ControlFeed id") -} diff --git a/tunnel/control_message.go b/tunnel/control_message.go deleted file mode 100644 index 6d1afb6..0000000 --- a/tunnel/control_message.go +++ /dev/null @@ -1,411 +0,0 @@ -package tunnel - -import ( - "encoding/binary" - "fmt" - "io" - "time" -) - -type Ping struct { - Now time.Time - CurrentPing *uint32 - SessionID *AgentSessionId -} - -func (w *Ping) WriteTo(I io.Writer) error { - if err := WriteU64(I, uint64(w.Now.UnixMilli())); err != nil { - return err - } - - if w.CurrentPing == nil { - if err := binary.Write(I, binary.BigEndian, uint8(0)); err != nil { - return err - } - } else { - if err := binary.Write(I, binary.BigEndian, uint8(1)); err != nil { - return err - } else if err := binary.Write(I, binary.BigEndian, w.CurrentPing); err != nil { - return err - } - } - - if w.SessionID == nil { - return binary.Write(I, binary.BigEndian, uint8(0)) - } else if err := binary.Write(I, binary.BigEndian, uint8(1)); err != nil { - return err - } - return w.SessionID.WriteTo(I) -} -func (w *Ping) ReadFrom(I io.Reader) error { - w.Now = time.UnixMilli(int64(ReadU64(I))) - - CurrentPing := ReadU32(I) - w.CurrentPing = &CurrentPing - - w.SessionID = &AgentSessionId{} - w.SessionID.ReadFrom(I) - return nil -} - -type Pong struct { - RequestNow uint64 - ServerNow uint64 - ServerId uint64 - DataCenterId uint32 - ClientAddr AddressPort - TunnelAddr AddressPort - SessionExpireAt *uint64 -} - -func (w *Pong) WriteTo(I io.Writer) error { - if err := WriteU64(I, w.RequestNow); err != nil { - return err - } else if err := WriteU64(I, w.ServerNow); err != nil { - return err - } else if err := WriteU64(I, w.ServerId); err != nil { - return err - } else if err := WriteU32(I, w.DataCenterId); err != nil { - return err - } else if err := w.ClientAddr.WriteTo(I); err != nil { - return err - } else if err := w.TunnelAddr.WriteTo(I); err != nil { - return err - } else if err := WriteOptionU64(I, w.SessionExpireAt); err != nil { - return err - } - return nil -} -func (w *Pong) ReadFrom(I io.Reader) error { - w.RequestNow, w.ServerNow, w.ServerId = ReadU64(I), ReadU64(I), ReadU64(I) - w.DataCenterId = ReadU32(I) - w.ClientAddr = AddressPort{} - w.TunnelAddr = AddressPort{} - - if err := w.ClientAddr.ReadFrom(I); err != nil { - return err - } else if err := w.TunnelAddr.ReadFrom(I); err != nil { - return err - } - - Sess := ReadU64(I) - w.SessionExpireAt = &Sess - return nil -} - -type AgentRegister struct { - AccountID, AgentId, AgentVersion, Timestamp uint64 - ClientAddr, TunnelAddr AddressPort - Signature []byte // 32 bytes -} - -func (w *AgentRegister) WritePlain(buff io.Writer) error { - if err := WriteU64(buff, w.AccountID); err != nil { - return err - } else if err := WriteU64(buff, w.AgentId); err != nil { - return err - } else if err := WriteU64(buff, w.AgentVersion); err != nil { - return err - } else if err := WriteU64(buff, w.Timestamp); err != nil { - return err - } else if err := w.ClientAddr.WriteTo(buff); err != nil { - return err - } else if err := w.TunnelAddr.WriteTo(buff); err != nil { - return err - } - return nil -} -func (w *AgentRegister) WriteTo(I io.Writer) error { - if err := WriteU64(I, w.AccountID); err != nil { - return err - } else if err := WriteU64(I, w.AgentId); err != nil { - return err - } else if err := WriteU64(I, w.AgentVersion); err != nil { - return err - } else if err := WriteU64(I, w.Timestamp); err != nil { - return err - } else if err := w.ClientAddr.WriteTo(I); err != nil { - return err - } else if err := w.TunnelAddr.WriteTo(I); err != nil { - return err - } else if err := binary.Write(I, binary.BigEndian, w.Signature); err != nil { - return err - } - return nil -} -func (w *AgentRegister) ReadFrom(I io.Reader) error { - w.AccountID = ReadU64(I) - w.AgentId = ReadU64(I) - w.AgentVersion = ReadU64(I) - w.Timestamp = ReadU64(I) - w.ClientAddr, w.TunnelAddr = AddressPort{}, AddressPort{} - if err := w.ClientAddr.ReadFrom(I); err != nil { - return err - } else if err := w.TunnelAddr.ReadFrom(I); err != nil { - return err - } - w.Signature = make([]byte, 32) - if err := ReadBuff(I, w.Signature); err != nil { - return err - } - return nil -} - -type AgentCheckPortMapping struct { - AgentSessionId AgentSessionId - PortRange PortRange -} - -func (w *AgentCheckPortMapping) WriteTo(I io.Writer) error { - if err := w.AgentSessionId.WriteTo(I); err != nil { - return err - } else if err := w.PortRange.WriteTo(I); err != nil { - return err - } - return nil -} -func (w *AgentCheckPortMapping) ReadFrom(I io.Reader) error { - w.AgentSessionId, w.PortRange = AgentSessionId{}, PortRange{} - if err := w.AgentSessionId.ReadFrom(I); err != nil { - return err - } else if err := w.PortRange.ReadFrom(I); err != nil { - return err - } - return nil -} - -type ControlRequest struct { - Ping *Ping - AgentRegister *AgentRegister - AgentKeepAlive *AgentSessionId - SetupUdpChannel *AgentSessionId - AgentCheckPortMapping *AgentCheckPortMapping -} - -func (w *ControlRequest) WriteTo(I io.Writer) error { - if w.Ping != nil { - if err := WriteU32(I, uint32(6)); err != nil { - return err - } - return w.Ping.WriteTo(I) - } else if w.AgentRegister != nil { - if err := WriteU32(I, uint32(2)); err != nil { - return err - } - return w.AgentRegister.WriteTo(I) - } else if w.AgentKeepAlive != nil { - if err := WriteU32(I, uint32(3)); err != nil { - return err - } - return w.AgentKeepAlive.WriteTo(I) - } else if w.SetupUdpChannel != nil { - if err := WriteU32(I, uint32(4)); err != nil { - return err - } - return w.SetupUdpChannel.WriteTo(I) - } else if w.AgentCheckPortMapping != nil { - if err := WriteU32(I, uint32(5)); err != nil { - return err - } - return w.AgentCheckPortMapping.WriteTo(I) - } - return fmt.Errorf("set ControlRequest") -} -func (w *ControlRequest) ReadFrom(I io.Reader) error { - switch ReadU32(I) { - case 1: - w.Ping = &Ping{} - return w.Ping.ReadFrom(I) - case 2: - w.AgentRegister = &AgentRegister{} - return w.AgentRegister.ReadFrom(I) - case 3: - w.AgentKeepAlive = &AgentSessionId{} - return w.AgentKeepAlive.ReadFrom(I) - case 4: - w.SetupUdpChannel = &AgentSessionId{} - return w.SetupUdpChannel.ReadFrom(I) - case 5: - w.AgentCheckPortMapping = &AgentCheckPortMapping{} - return w.AgentCheckPortMapping.ReadFrom(I) - } - return fmt.Errorf("invalid ControlRequest id") -} - -type AgentRegistered struct { - ID AgentSessionId - ExpiresAt time.Time -} - -func (w *AgentRegistered) WriteTo(I io.Writer) error { - if err := w.ID.WriteTo(I); err != nil { - return err - } else if err := WriteU64(I, uint64(w.ExpiresAt.UnixMilli())); err != nil { - return err - } - return nil -} -func (w *AgentRegistered) ReadFrom(I io.Reader) error { - w.ID = AgentSessionId{} - if err := w.ID.ReadFrom(I); err != nil { - return err - } - w.ExpiresAt = time.UnixMilli(int64(ReadU64(I))) - return nil -} - -type AgentPortMappingFound struct { - ToAgent *AgentSessionId -} - -func (agentPort *AgentPortMappingFound) WriteTo(I io.Writer) error { - if agentPort.ToAgent != nil { - if err := WriteU32(I, 1); err != nil { - return err - } else if err := agentPort.ToAgent.WriteTo(I); err != nil { - return err - } - } - return nil -} -func (agentPort *AgentPortMappingFound) ReadFrom(I io.Reader) error { - if ReadU32(I) == 1 { - agentPort.ToAgent = &AgentSessionId{} - return agentPort.ToAgent.ReadFrom(I) - } - return fmt.Errorf("unknown AgentPortMappingFound id") -} - -type AgentPortMapping struct { - Range PortRange - Found *AgentPortMappingFound -} - -func (w *AgentPortMapping) WriteTo(I io.Writer) error { - if err := w.Range.WriteTo(I); err != nil { - return err - } else if err := w.Found.WriteTo(I); err != nil { - return err - } - return nil -} -func (w *AgentPortMapping) ReadFrom(I io.Reader) error { - if err := w.Range.ReadFrom(I); err != nil { - return err - } else if err := w.Found.ReadFrom(I); err != nil { - return err - } - return nil -} - -type UdpChannelDetails struct { - TunnelAddr AddressPort - Token []byte -} - -func (w *UdpChannelDetails) WriteTo(I io.Writer) error { - if err := w.TunnelAddr.WriteTo(I); err != nil { - return err - } else if err := WriteU64(I, uint64(len(w.Token))); err != nil { - return err - } else if err := binary.Write(I, binary.BigEndian, w.Token); err != nil { - return err - } - return nil -} -func (w *UdpChannelDetails) ReadFrom(I io.Reader) error { - w.TunnelAddr = AddressPort{} - if err := w.TunnelAddr.ReadFrom(I); err != nil { - return err - } - w.Token = make([]byte, ReadU64(I)) - if err := ReadBuff(I, w.Token); err != nil { - return err - } - return nil -} - -type ControlResponse struct { - InvalidSignature bool - Unauthorized bool - RequestQueued bool - TryAgainLater bool - Pong *Pong - AgentRegistered *AgentRegistered - AgentPortMapping *AgentPortMapping - UdpChannelDetails *UdpChannelDetails -} - -func (w *ControlResponse) WriteTo(I io.Writer) error { - if w.Pong != nil { - if err := WriteU32(I, 1); err != nil { - return err - } - return w.Pong.WriteTo(I) - } else if w.InvalidSignature { - if err := WriteU32(I, 2); err != nil { - return err - } - return nil - } else if w.Unauthorized { - if err := WriteU32(I, 3); err != nil { - return err - } - return nil - } else if w.RequestQueued { - if err := WriteU32(I, 4); err != nil { - return err - } - return nil - } else if w.TryAgainLater { - if err := WriteU32(I, 5); err != nil { - return err - } - return nil - } else if w.AgentRegistered != nil { - if err := WriteU32(I, 6); err != nil { - return err - } - return w.AgentRegistered.WriteTo(I) - } else if w.AgentPortMapping != nil { - if err := WriteU32(I, 7); err != nil { - return err - } - return w.AgentPortMapping.WriteTo(I) - } else if w.UdpChannelDetails != nil { - if err := WriteU32(I, 8); err != nil { - return err - } - return w.UdpChannelDetails.WriteTo(I) - } - return fmt.Errorf("set one option to write") -} -func (w *ControlResponse) ReadFrom(I io.Reader) error { - switch ReadU32(I) { - case 1: - w.Pong = &Pong{} - return w.Pong.ReadFrom(I) - case 2: - w.InvalidSignature = true - return nil - case 3: - w.Unauthorized = true - return nil - case 4: - w.RequestQueued = true - return nil - case 5: - w.TryAgainLater = true - return nil - case 6: - w.AgentRegistered = &AgentRegistered{} - return w.AgentRegistered.ReadFrom(I) - case 7: - w.AgentPortMapping = &AgentPortMapping{} - return w.AgentPortMapping.ReadFrom(I) - case 8: - w.UdpChannelDetails = &UdpChannelDetails{} - return w.UdpChannelDetails.ReadFrom(I) - } - return fmt.Errorf("invalid ControlResponse id") -} diff --git a/tunnel/lan_address.go b/tunnel/lan_address.go deleted file mode 100644 index 57b3b7b..0000000 --- a/tunnel/lan_address.go +++ /dev/null @@ -1,84 +0,0 @@ -package tunnel - -import ( - "encoding/binary" - "net" - "net/netip" -) - -func shuffle(v uint32) uint32 { - v = ((v >> 16) ^ v) * 0x45d9f3 - v = ((v >> 16) ^ v) * 0x45d9f3 - v = (v >> 16) ^ v - return v -} - -func asLocalMasked(ip uint32) uint32 { - ip = shuffle(ip) & 0x00FFFFFF - if ip == 0 { - ip = 1 - } - return ip | 0x7F000000 -} - -func mapToLocalIP4(ip net.IP) net.IP { - var ipUint32 uint32 - if ip.To4() != nil { // Check if it's already IPv4 - ipUint32 = binary.BigEndian.Uint32(ip.To4()) - } else { // Handle IPv6 - bytes := ip.To16() // Convert to IPv6 bytes - ipUint32 = shuffle(binary.BigEndian.Uint32(bytes[0:4])) ^ - shuffle(binary.BigEndian.Uint32(bytes[4:8])) ^ - shuffle(binary.BigEndian.Uint32(bytes[8:12])) ^ - shuffle(binary.BigEndian.Uint32(bytes[12:16])) - } - - return net.IPv4( - byte(asLocalMasked(ipUint32)>>24), - byte(asLocalMasked(ipUint32)>>16), - byte(asLocalMasked(ipUint32)>>8), - byte(asLocalMasked(ipUint32)), - ) -} - -func TcpSocket(SpecialLan bool, Peer, Host netip.AddrPort) (*net.TCPConn, error) { - isLoopback := Host.Addr().IsLoopback() - if isLoopback && SpecialLan { - local_ip := mapToLocalIP4(Peer.Addr().AsSlice()); - stream, err := net.DialTCP("tcp4", net.TCPAddrFromAddrPort(netip.AddrPortFrom(netip.AddrFrom4([4]byte(local_ip.To4())), 0)), net.TCPAddrFromAddrPort(Host)) - if err != nil { - LogDebug.Printf("Failed to establish connection using special lan %s for flow %s -> %s\n", local_ip, Peer.String(), Host.String()) - return nil, err - } - return stream, nil - } - LogDebug.Printf("Failed to bind connection to special local address to support IP based banning") - stream, err := net.DialTCP("tcp", nil, net.TCPAddrFromAddrPort(Host)) - if err != nil { - LogDebug.Printf("Failed to establish connection for flow %s -> %s. Is your server running? %q", Peer.String(), Host.String(), err.Error()) - return nil, err - } - return stream, nil -} - -func UdpSocket(SpecialLan bool, Peer, Host netip.AddrPort) (*net.UDPConn, error) { - isLoopback := Host.Addr().IsLoopback() - if isLoopback && SpecialLan { - local_ip := mapToLocalIP4(Peer.Addr().AsSlice()); - local_port := 40000 + (Peer.Port() % 24000); - stream, err := net.DialUDP("udp4", net.UDPAddrFromAddrPort(netip.AddrPortFrom(netip.AddrFrom16([16]byte(local_ip)), local_port)), net.UDPAddrFromAddrPort(Host)) - if err != nil { - LogDebug.Printf("Failed to bind UDP port to %d to have connections survive agent restart: %s", local_port, err.Error()) - stream, err = net.DialUDP("udp4", net.UDPAddrFromAddrPort(netip.AddrPortFrom(netip.AddrFrom16([16]byte(local_ip)), 0)), net.UDPAddrFromAddrPort(Host)) - if err != nil { - stream, err = net.DialUDP("udp4", nil, nil) - if err != nil { - return nil, err - } - LogDebug.Printf("Failed to bind UDP to special local address, in-game ip banning will not work: %s", err.Error()) - } - } - return stream, nil - } - return net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(Host)) -} \ No newline at end of file diff --git a/tunnel/lib.go b/tunnel/lib.go deleted file mode 100644 index 694970f..0000000 --- a/tunnel/lib.go +++ /dev/null @@ -1,85 +0,0 @@ -package tunnel - -import ( - "encoding/binary" - "fmt" - "io" - "net" -) - -type PortProto struct { - // 1 => "tcp" - // - // 2 => "udp" - // - // 3 => "both" - Value string -} - -func (w *PortProto) WriteTo(I io.Writer) error { - switch w.Value { - case "tcp": return WriteU8(I, 1) - case "udp": return WriteU8(I, 2) - case "both": return WriteU8(I, 3) - } - return fmt.Errorf("set valid proto") -} -func (w *PortProto) ReadFrom(I io.Reader) error { - switch ReadU8(I) { - case 1: - w.Value = "tcp" - return nil - case 2: - w.Value = "udp" - return nil - case 3: - w.Value = "both" - return nil - } - return fmt.Errorf("invalid proto") -} - -type AgentSessionId struct { - SessionID, AccountID, AgentID uint64 -} - -func (w *AgentSessionId) WriteTo(I io.Writer) error { - var err error - if err = WriteU64(I, w.SessionID); err != nil { - return err - } else if err = WriteU64(I, w.AccountID); err != nil { - return err - } else if err = WriteU64(I, w.AgentID); err != nil { - return err - } - - return nil -} -func (w *AgentSessionId) ReadFrom(I io.Reader) error { - w.SessionID, w.AccountID, w.AgentID = ReadU64(I), ReadU64(I), ReadU64(I) - return nil -} - -type PortRange struct { - IP net.IP - PortStart uint16 - PortEnd uint16 - PortProto PortProto -} - -func (w *PortRange) WriteTo(I io.Writer) error { - if err := binary.Write(I, binary.BigEndian, w.IP); err != nil { - return err - } else if err := WriteU16(I, w.PortStart); err != nil { - return err - } else if err := WriteU16(I, w.PortEnd); err != nil { - return err - } else if err := w.PortProto.WriteTo(I); err != nil { - return err - } - return nil -} -func (w *PortRange) ReadFrom(I io.Reader) error { - - return nil -} \ No newline at end of file diff --git a/tunnel/rpc.go b/tunnel/rpc.go deleted file mode 100644 index 15cc5c9..0000000 --- a/tunnel/rpc.go +++ /dev/null @@ -1,36 +0,0 @@ -package tunnel - -import ( - "encoding/json" - "io" -) - -type MessageEncoding interface { - WriteTo(I io.Writer) error - ReadFrom(I io.Reader) error -} - -type ControlRpcMessage[T MessageEncoding] struct { - RequestID uint64 - Content T // Convert with .(*type) -} - -func (w *ControlRpcMessage[T]) WriteTo(I io.Writer) error { - if err := WriteU64(I, w.RequestID); err != nil { - return err - } - defer func() { - d, _ := json.MarshalIndent(w, "", " ") - LogDebug.Printf("Write RPC: %s\n", string(d)) - }() - return w.Content.WriteTo(I) -} - -func (w *ControlRpcMessage[T]) ReadFrom(I io.Reader) error { - w.RequestID = ReadU64(I) - defer func() { - d, _ := json.MarshalIndent(w, "", " ") - LogDebug.Printf("Read RPC: %s\n", string(d)) - }() - return w.Content.ReadFrom(I) -} diff --git a/tunnel/runner.go b/tunnel/runner.go deleted file mode 100644 index b766f3a..0000000 --- a/tunnel/runner.go +++ /dev/null @@ -1,94 +0,0 @@ -package tunnel - -import ( - "net/netip" - "os" - "os/signal" - "sync/atomic" - "time" -) - -type TunnelRunner struct { - Lookup AddressLookup[netip.AddrPort] - Tunnel SimplesTunnel - KeepRunning atomic.Bool -} - -func (tun *TunnelRunner) UseSpecialLan(set bool) { - panic("no implemented UseSpecialLan") -} - -func (tun *TunnelRunner) Run() error { - channel := make(chan error) - // udp := tun.Tunnel.UdpTunnel - - // Setup Tunnel - if err := tun.Tunnel.Setup(); err != nil { - return err - } - LogDebug.Println("Success Tunnel setup") - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { - <-c - LogDebug.Println("Cleaning process") - tun.KeepRunning.Store(false) - tun.Tunnel.ControlChannel.Conn.Udp.Close() - os.Exit(1) - }() - - // TCP Clients - go func() { - lastControlUpdate := time.Now().UnixMilli() - defer os.Exit(2) - for tun.KeepRunning.Load() { - now := time.Now().UnixMilli() - if 30_000 < now-lastControlUpdate { - lastControlUpdate = now - LogDebug.Println("Reloading control addr") - if _, err := tun.Tunnel.ReloadControlAddr(); err != nil { - LogDebug.Println("failed to reload control addr") - LogDebug.Println(err) - return - } - } - - newClient, err := tun.Tunnel.Update() - if err != nil { - LogDebug.Println(err.Error()) - channel <- err - return - } else if newClient == nil { - continue - } - LogDebug.Println("tcp client") - LogDebug.Printf("%#v\n", newClient) - LogDebug.Panic(newClient) - } - }() - - // go func() { - // buffer := make([]byte, 2048) - // had_success := false - // for tun.KeepRunning.Load() { - // LogDebug.Println("rec udp tun") - // rx, err := udp.ReceiveFrom(buffer) - // if err != nil { - // LogDebug.Println(err) - // if had_success { - // LogDebug.Panicln("got error") - // } - // time.Sleep(time.Second) - // continue - // } - // LogDebug.Println("success") - // had_success = true - // if rx.ConfirmerdConnection { - // continue - // } - // LogDebug.Printf("%#v\n", rx.ReceivedPacket) - // } - // }() - return <-channel -} diff --git a/tunnel/rwlock/rwlock.go b/tunnel/rwlock/rwlock.go deleted file mode 100644 index c297a61..0000000 --- a/tunnel/rwlock/rwlock.go +++ /dev/null @@ -1,22 +0,0 @@ -package rwlock - -import "sync" - -type Rwlock[T any] struct { - Value T - sync.RWMutex -} - -// Get writer value and return unlocker function -// -// if call this function before end call function -func (rw *Rwlock[T]) Write() (T, func()) { - rw.Lock() - return rw.Value, rw.Unlock -} - -// Get reader value and unlocker function -func (rw *Rwlock[T]) Read() (T, func()) { - rw.RLock() - return rw.Value, rw.RUnlock -} \ No newline at end of file diff --git a/tunnel/setup.go b/tunnel/setup.go index f07a7dc..557fecd 100644 --- a/tunnel/setup.go +++ b/tunnel/setup.go @@ -9,182 +9,155 @@ import ( "time" "sirherobrine23.org/playit-cloud/go-playit/api" + "sirherobrine23.org/playit-cloud/go-playit/proto" ) type SetupFindSuitableChannel struct { - Address []netip.AddrPort + options []netip.AddrPort } -func (Setup *SetupFindSuitableChannel) Setup() (*ConnectedControl, error) { - for _, Addr := range Setup.Address { - network := "udp6" - if Addr.Addr().Is4() && !Addr.Addr().Is4In6() { - network = "udp4" +func (self *SetupFindSuitableChannel) Setup() (ConnectedControl, error) { + for _, addr := range self.options { + var ( + err error + socket *net.UDPConn + ) + isIPv6 := addr.Addr().Is6() + if isIPv6 { + if socket, err = net.ListenUDP("udp6", nil); err != nil { + continue // Next address to listen + } + } else { + if socket, err = net.ListenUDP("udp4", nil); err != nil { + continue // Next address to listen + } } - conn, err := net.ListenUDP(network, nil) - if err != nil { - fmt.Println(err.Error()) - continue + var attempts int + if attempts = 3; isIPv6 { + attempts = 1 } - - for range 3 { - // Make initial ping - buffer := bytes.NewBuffer([]byte{}) - if err = (&ControlRpcMessage[*ControlRequest]{ + for range attempts { + buffer := new(bytes.Buffer) + if err := (&proto.ControlRpcMessage[*proto.ControlRequest]{ RequestID: 1, - Content: &ControlRequest{ - Ping: &Ping{ + Content: &proto.ControlRequest{ + Ping: &proto.Ping{ Now: time.Now(), CurrentPing: nil, - SessionID: nil, + SessionId: nil, }, }, }).WriteTo(buffer); err != nil { - conn.Close() - return nil, err + continue } - - // Write initial ping - _, err = conn.WriteToUDP(buffer.Bytes(), net.UDPAddrFromAddrPort(Addr)) - if err != nil { - conn.Close() + if _, err := socket.WriteTo(buffer.Bytes(), net.UDPAddrFromAddrPort(addr)); err != nil { break } - for range 5 { - buff := make([]byte, 2048) - if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 5)); err != nil { - return nil, err - } - bytesSize, peer, err := conn.ReadFrom(buff) + buffer.Reset() + var waits int + if waits = 5; isIPv6 { + waits = 3 + } + for range waits { + buff := make([]byte, 1024) + socket.SetReadDeadline(time.Now().Add(time.Millisecond * 500)) + size, peer, err := socket.ReadFrom(buff) if err != nil { - if netErr, isNet := err.(net.Error); isNet { - if netErr.Timeout() { - continue - } + if err, ok := err.(net.Error); ok && err.Timeout() { + continue } - return nil, err - } else if peer.String() != Addr.String() { + break + } else if peer.String() != addr.String() { continue } - - buff = buff[:bytesSize] - var feed ControlFeed - if err := feed.ReadFrom(bytes.NewReader(buff)); err != nil { - return nil, err + buffer = bytes.NewBuffer(buff[:size]) + feed := proto.ControlFeed{} + if err := feed.ReadFrom(buffer); err != nil { + break } else if feed.Response == nil { - return nil, fmt.Errorf("unexpected control feed") + break + } else if feed.Response.RequestID != 1 { + break + } else if feed.Response.Content.Pong == nil { + break } - - msg := feed.Response - if msg.RequestID != 1 { - continue - } else if msg.Content.Pong == nil { - return nil, fmt.Errorf("expected pong got other response") - } - return &ConnectedControl{ - ControlAddr: Addr, - Udp: conn, - Pong: msg.Content.Pong, - }, nil + return ConnectedControl{addr, *socket, *feed.Response.Content.Pong}, nil } } + socket.Close() } - - return nil, fmt.Errorf("cannot make UDP tunnel to playit controller, check you internet conenction") + return ConnectedControl{}, fmt.Errorf("failed to connectans setup initial connection") } type ConnectedControl struct { ControlAddr netip.AddrPort - Udp *net.UDPConn - Pong *Pong + Udp net.UDPConn + Pong proto.Pong } -func (Control *ConnectedControl) Authenticate(Api api.Api) (*AuthenticatedControl, error) { - if !Control.Pong.ClientAddr.AddrPort.IsValid() { - return nil, fmt.Errorf("invalid pong Client address") - } else if !Control.Pong.TunnelAddr.AddrPort.IsValid() { - return nil, fmt.Errorf("invalid pong Tunnel address") - } - - LogDebug.Println("Registring agent proto") - tk, err := Api.ProtoRegisterRegister(Control.Pong.ClientAddr.AddrPort, Control.Pong.TunnelAddr.AddrPort) +func (self *ConnectedControl) Authenticate(Api api.Api) (AuthenticatedControl, error) { + key, err := Api.ProtoRegisterRegister(self.Pong.ClientAddr, self.Pong.TunnelAddr) if err != nil { - LogDebug.Println("failed to sign and register") - return nil, err + return AuthenticatedControl{}, err } - - tkBytes, err := hex.DecodeString(tk) + keyBytes, err := hex.DecodeString(key) if err != nil { - return nil, err + return AuthenticatedControl{}, err } - for range 5 { - buffer := bytes.NewBuffer([]byte{}) - if err := (&ControlRpcMessage[*RawSlice]{ + buffer := new(bytes.Buffer) + if err := (&proto.ControlRpcMessage[proto.RawSlice]{ RequestID: 10, - Content: &RawSlice{ - Buff: tkBytes, - }, + Content: proto.RawSlice(keyBytes), }).WriteTo(buffer); err != nil { - return nil, err + return AuthenticatedControl{}, err + } else if _, err := self.Udp.WriteTo(buffer.Bytes(), net.UDPAddrFromAddrPort(self.ControlAddr)); err != nil { + return AuthenticatedControl{}, err } - _, err := Control.Udp.WriteTo(buffer.Bytes(), net.UDPAddrFromAddrPort(Control.ControlAddr)) - if err != nil { - return nil, err - } - for range 5 { - reciver := append(buffer.Bytes(), make([]byte, 1024)...) - Control.Udp.SetReadDeadline(time.Now().Add(time.Millisecond * 5)) - recSize, remote, err := Control.Udp.ReadFrom(reciver) + buff := make([]byte, 1024) + self.Udp.SetReadDeadline(time.Now().Add(time.Millisecond * 5)) + size, remote, err := self.Udp.ReadFromUDPAddrPort(buff) if err != nil { - if errNet, isNet := err.(net.Error); isNet { - if errNet.Timeout() { - LogDebug.Println("Timeout") - break - } + if at, ok := err.(net.Error); ok && at.Timeout() { + continue } - return nil, err - } else if remote.String() != Control.ControlAddr.String() { - LogDebug.Println("got response not from tunnel server") + break + } else if self.ControlAddr.Compare(remote) != 0 { + continue + } + buffer.Reset() + buffer.Write(buff[:size]) // Write only reader data + var feed proto.ControlFeed + if err := feed.ReadFrom(buffer); err != nil { continue } - feed := &ControlFeed{} - if err = feed.ReadFrom(bytes.NewReader(reciver[:recSize])); err != nil { - LogDebug.Println("failed to read response from tunnel") - return nil, err - } else if feed.Response.RequestID != 10 { - LogDebug.Println("got response for different request") - continue - } else if feed.Response == nil || feed.Response.Content == nil { - LogDebug.Println("feed response or Response content is empty") - return nil, fmt.Errorf("cannot get response") + if response := feed.Response; response != nil { + if response.RequestID != 10 { + continue + } + if content := response.Content; content.RequestQueued { + time.Sleep(time.Second) // Sleep to wait register + break + } else if content.InvalidSignature { + return AuthenticatedControl{}, fmt.Errorf("invalid signature") + } else if content.Unauthorized { + return AuthenticatedControl{}, fmt.Errorf("unauthorized") + } else if registered := content.AgentRegistered; registered != nil { + return AuthenticatedControl{ + Api: Api, + Conn: *self, + LastPong: self.Pong, + Registered: *registered, + buffer: buffer, + CurrentPing: nil, + ForceExpire: false, + }, nil + } } - - controlRes := feed.Response.Content - if controlRes.RequestQueued { - LogDebug.Println("register queued, waiting 1s") - time.Sleep(time.Second) - continue - } else if controlRes.InvalidSignature { - return nil, fmt.Errorf("register return invalid signature") - } else if controlRes.Unauthorized { - return nil, fmt.Errorf("unauthorized") - } else if controlRes.AgentRegistered != nil { - return &AuthenticatedControl{ - ApiClient: Api, - Conn: *Control, - LastPong: *Control.Pong, - CurrentPing: nil, - Registered: *controlRes.AgentRegistered, - Buff: reciver[recSize:], - ForceEpired: false, - }, nil - } - LogDebug.Println("expected AgentRegistered but got something else") } } - return nil, fmt.Errorf("failed1 to connect agent") + return AuthenticatedControl{}, fmt.Errorf("failed to connect and authenticate") } diff --git a/tunnel/simple_tunnel.go b/tunnel/simple_tunnel.go new file mode 100644 index 0000000..fd2d23b --- /dev/null +++ b/tunnel/simple_tunnel.go @@ -0,0 +1,191 @@ +package tunnel + +import ( + "net" + "net/netip" + "slices" + "time" + + "sirherobrine23.org/playit-cloud/go-playit/api" + "sirherobrine23.org/playit-cloud/go-playit/proto" +) + +func getControlAddresses(api api.Api) ([]netip.AddrPort, error) { + routing, err := api.AgentRoutings(nil) + if err != nil { + return nil, err + } + addresses := []netip.AddrPort{} + for _, ipd := range append(routing.Targets6, routing.Targets4...) { + addresses = append(addresses, netip.AddrPortFrom(ipd, 5525)) + } + return addresses, nil +} + +type SimpleTunnel struct { + api api.Api + controlAddr netip.AddrPort + ControlChannel AuthenticatedControl + udpTunnel *UdpTunnel + lastKeepAlive, lastPing, lastPong, lastUdpAuth time.Time + lastControlTargets []netip.AddrPort +} + +func NewSimpleTunnel(Api api.Api) SimpleTunnel { + return SimpleTunnel{ + api: Api, + } +} + +func (self *SimpleTunnel) Setup() error { + udpTunnel := new(UdpTunnel) + if err := AssignUdpTunnel(udpTunnel); err != nil { + return err + } + + addresses, err := getControlAddresses(self.api) + if err != nil { + return err + } + setup, err := (&SetupFindSuitableChannel{addresses}).Setup() + if err != nil { + return err + } + + controlChannel, err := setup.Authenticate(self.api) + if err != nil { + return err + } + + self.lastControlTargets = addresses + self.controlAddr = setup.ControlAddr + self.ControlChannel = controlChannel + self.udpTunnel = udpTunnel + self.lastKeepAlive = time.UnixMilli(0) + self.lastPing = time.UnixMilli(0) + self.lastPong = time.UnixMilli(0) + self.lastUdpAuth = time.UnixMilli(0) + return nil +} + +func (self *SimpleTunnel) ReloadControlAddr() (bool, error) { + addresses, err := getControlAddresses(self.api) + if err != nil { + return false, err + } else if slices.ContainsFunc(self.lastControlTargets, func(e1 netip.AddrPort) bool { + return !slices.Contains(addresses, e1) + }) { + return false, nil + } + setup, err := (&SetupFindSuitableChannel{addresses}).Setup() + if err != nil { + return false, err + } + updated, err := self.UpdateControlAddr(setup) + if err == nil { + self.lastControlTargets = addresses + } + return updated, err +} +func (self *SimpleTunnel) UpdateControlAddr(connected ConnectedControl) (bool, error) { + newControlAddr := connected.ControlAddr + if self.controlAddr.Compare(newControlAddr) == 0 { + return false, nil + } + controlChannel, err := connected.Authenticate(self.api) + if err != nil { + return false, err + } + self.ControlChannel = controlChannel + self.controlAddr = newControlAddr + self.lastPing, self.lastKeepAlive, self.lastUdpAuth = time.UnixMilli(0), time.UnixMilli(0), time.UnixMilli(0) + self.udpTunnel.InvalidateSession() + return true, nil +} + +func (self *SimpleTunnel) UdpTunnel() *UdpTunnel { + return self.udpTunnel +} + +func (self *SimpleTunnel) Update() (*proto.NewClient, error) { + if self.ControlChannel.IsExpired() { + auth, err := self.ControlChannel.Authenticate() + if err != nil { + time.Sleep(time.Second * 2) + return nil, err + } + self.ControlChannel = auth + } + + now := time.Now() + if now.UnixMilli()-self.lastPing.UnixMilli() > 1_000 { + self.lastPing = now + if err := self.ControlChannel.Ping(200, now); err != nil { + debug.Printf("Update: %s\n", err.Error()) + return nil, err + } + } + + if self.udpTunnel.RequiresAuth() { + if 5_000 < now.UnixMilli()-self.lastUdpAuth.UnixMilli() { + self.lastUdpAuth = now + if err := self.ControlChannel.SetupUdpChannel(9_000); err != nil { + debug.Printf("Update: %s\n", err.Error()) + return nil, err + } + } + } else if self.udpTunnel.RequireResend() { + if 1_000 < now.UnixMilli()-self.lastUdpAuth.UnixMilli() { + self.lastUdpAuth = now + if _, err := self.udpTunnel.ResendToken(); err != nil { + return nil, err + } + } + } + + timeTillExpire := max(self.ControlChannel.GetExpireAt().UnixMilli(), now.UnixMilli()) - now.UnixMilli() + if 10_000 < now.UnixMilli()-self.lastKeepAlive.UnixMilli() && timeTillExpire < 30_000 { + self.lastKeepAlive = now + if err := self.ControlChannel.SendKeepAlive(100); err != nil { + return nil, err + } else if err := self.ControlChannel.SendSetupUdpChannel(1); err != nil { + return nil, err + } + } + + for range 80 { + feed, err := self.ControlChannel.RecvFeedMsg() + if err != nil { + if es, is := err.(net.Error); is && !es.Timeout() { + debug.Printf("RecvFeedMsg error: %s\n", err.Error()) + return nil, err + } + continue + } + if newClient := feed.NewClient; newClient != nil { + return newClient, nil + } else if msg := feed.Response; msg != nil { + if content := msg.Content; content != nil { + if details := content.UdpChannelDetails; details != nil { + if err := self.udpTunnel.SetUdpTunnel(details); err != nil { + debug.Printf("Control Recive Message error: %s\n", err.Error()) + return nil, err + } + return self.Update() + } else if content.Unauthorized { + self.ControlChannel.SetExpired() + } else if pong := content.Pong; pong != nil { + self.lastPong = time.Now() + if pong.ClientAddr.Compare(self.ControlChannel.Conn.Pong.ClientAddr) != 0 { + debug.Println("client ip changed", pong.ClientAddr.String(), self.ControlChannel.Conn.Pong.ClientAddr.String()) + } + } + } + } + } + if self.lastPong.UnixMilli() != 0 && time.Now().UnixMilli()-self.lastPong.UnixMilli() > 6_000 { + self.lastPong = *new(time.Time) + self.ControlChannel.SetExpired() + } + return nil, nil +} diff --git a/tunnel/simples_tunnel.go b/tunnel/simples_tunnel.go deleted file mode 100644 index 4420950..0000000 --- a/tunnel/simples_tunnel.go +++ /dev/null @@ -1,213 +0,0 @@ -package tunnel - -import ( - "encoding/json" - "fmt" - "net/netip" - "slices" - "time" - - "sirherobrine23.org/playit-cloud/go-playit/api" -) - -type SimplesTunnel struct { - ApiClaim api.Api - ControlAddr netip.AddrPort - ControlChannel *AuthenticatedControl - UdpTunnel UdpTunnel - LastKeepAlive uint64 - LastPing uint64 - LastPong uint64 - LastUdpAuth uint64 - lastControlTargets []netip.AddrPort -} - -func ControlAddresses(Api api.Api) ([]netip.AddrPort, error) { - controls, err := Api.AgentRoutings(nil) - if err != nil { - return nil, err - } - - addrs := []netip.AddrPort{} - for _, v := range append(controls.Targets6, controls.Targets4...) { - addrs = append(addrs, netip.AddrPortFrom(v, 5525)) - } - return addrs, nil -} - -func (Tun *SimplesTunnel) Setup() error { - if err := AssignUdpTunnel(&Tun.UdpTunnel); err != nil { - return err - } - - addresses, err := ControlAddresses(Tun.ApiClaim) - if err != nil { - return err - } - - setup, err := (&SetupFindSuitableChannel{Address: addresses}).Setup() - if err != nil { - return err - } - - control_channel, err := setup.Authenticate(Tun.ApiClaim) - if err != nil { - return err - } - Tun.ControlAddr = setup.ControlAddr - Tun.ControlChannel = control_channel - return nil -} - -func (Tun *SimplesTunnel) ReloadControlAddr() (bool, error) { - addresses, err := ControlAddresses(Tun.ApiClaim) - if err != nil { - return false, err - } - - if slices.ContainsFunc(Tun.lastControlTargets, func(a netip.AddrPort) bool { - return !slices.ContainsFunc(addresses, func(b netip.AddrPort) bool { - return a.Compare(b) == 0 - }) - }) { - return false, nil - } - setup, err := (&SetupFindSuitableChannel{addresses}).Setup() - if err != nil { - return false, err - } - updated, err := Tun.UpdateControlAddr(*setup) - Tun.lastControlTargets = addresses - return updated, err -} - -func (Tun *SimplesTunnel) UpdateControlAddr(conncted ConnectedControl) (ok bool, err error) { - if conncted.ControlAddr.Compare(Tun.ControlAddr) == 0 { - LogDebug.Println("not required Update control addr") - return - } - - var controlChannel *AuthenticatedControl - controlChannel, err = conncted.Authenticate(Tun.ApiClaim) - if err != nil { - return - } - LogDebug.Printf("Update control address %s to %s\n", Tun.ControlAddr.String(), conncted.ControlAddr.String()) - - Tun.ControlChannel = controlChannel - Tun.ControlAddr = conncted.ControlAddr - Tun.LastPing = 0 - Tun.LastKeepAlive = 0 - Tun.LastUdpAuth = 0 - Tun.UdpTunnel.InvalidateSession() - ok = true - return -} - -func (Tun *SimplesTunnel) Update() (*NewClient, error) { - if Tun.ControlChannel.IsIspired() { - LogDebug.Println("Creating new controller channel...") - if err := Tun.ControlChannel.Authenticate(); err != nil { - LogDebug.Println(err) - time.Sleep(time.Second * 2) - return nil, nil - } - } - - now := uint64(time.Now().UnixMilli()) - if now-Tun.LastPing > 1_000 { - Tun.LastPing = now - if err := Tun.ControlChannel.SendPing(200, time.UnixMilli(int64(now))); err != nil { - LogDebug.Println("failed to send ping") - } - } - - // d, _ := json.MarshalIndent(Tun, "", " ") - // LogDebug.Panicf(string(d)) - - if Tun.UdpTunnel.RequiresAuth() { - if 5_000 < now-Tun.LastUdpAuth { - Tun.LastUdpAuth = now - if err := Tun.ControlChannel.SendSetupUDPChannel(9000); err != nil { - LogDebug.Println("failed to send udp setup request to control") - LogDebug.Println(err) - } - } - } else if Tun.UdpTunnel.RequireResend() { - if 1_000 < now-Tun.LastUdpAuth { - Tun.LastUdpAuth = now - if _, err := Tun.UdpTunnel.ResendToken(); err != nil { - LogDebug.Println("failed to send udp auth request") - LogDebug.Println(err) - } - } - - timeTillExpire := func(x, y uint64) uint64 { - if x > y { - return y - } - return x - }(uint64(Tun.ControlChannel.Registered.ExpiresAt.UnixMilli()), uint64(now)) - if 10_000 < now-Tun.LastKeepAlive && timeTillExpire < 30_000 { - Tun.LastKeepAlive = now - LogDebug.Println("send KeepAlive") - if err := Tun.ControlChannel.SendKeepAlive(100); err != nil { - LogDebug.Println("failed to send KeepAlive") - LogDebug.Println(err) - } - if err := Tun.ControlChannel.SendSetupUDPChannel(1); err != nil { - LogDebug.Println("failed to send setup udp channel request") - LogDebug.Println(err) - } - } - - timeout := 0 - for range 30 { - if timeout >= 10 { - LogDebug.Println("feed recv timeout") - break - } - LogDebug.Println("RX Feed message") - men, err := Tun.ControlChannel.RecFeedMsg() - if err != nil { - timeout++ - LogDebug.Printf("failed to parse response: %s\n", err.Error()) - continue - } - - if men.NewClient != nil { - return men.NewClient, nil - } else if men.Response != nil { - cont := men.Response.Content - if cont.UdpChannelDetails != nil { - LogDebug.Print("Response SetUdpTunnel") - if err := Tun.UdpTunnel.SetUdpTunnel(*men.Response.Content.UdpChannelDetails); err != nil { - timeout++ - LogDebug.Print(err) - } - } else if cont.Pong != nil { - Tun.LastPong = uint64(time.Now().UnixMilli()) - if cont.Pong.ClientAddr.Compare(Tun.ControlChannel.Conn.Pong.ClientAddr.AddrPort) != 0 { - LogDebug.Printf("Client IP changed: %q -> %q\n", cont.Pong.ClientAddr, Tun.ControlChannel.Conn.Pong.ClientAddr.AddrPort) - } - } else if cont.Unauthorized { - LogDebug.Panicln("unauthorized, check token or reload agent") - Tun.ControlChannel.ForceEpired = true - return nil, fmt.Errorf("unauthorized, check token or reload agent") - } else { - LogDebug.Printf("got response") - d , _ := json.MarshalIndent(men, "", " ") - LogDebug.Printf(string(d)) - } - } - } - } - - if Tun.LastPong != 0 && uint64(time.Now().UnixMilli())-Tun.LastPong > 6_000 { - LogDebug.Println("timeout waiting for pong") - Tun.LastPong = 0 - Tun.ControlChannel.ForceEpired = true - } - - return nil, nil -} diff --git a/tunnel/tcp_tunnel.go b/tunnel/tcp_tunnel.go index 167f67b..a12341b 100644 --- a/tunnel/tcp_tunnel.go +++ b/tunnel/tcp_tunnel.go @@ -1,25 +1,38 @@ package tunnel -import "net" +import ( + "fmt" + "net" + + "sirherobrine23.org/playit-cloud/go-playit/proto" +) type TcpTunnel struct { - ClaimInstructions ClaimInstructions + ClaimInstruction proto.ClaimInstructions } -func (tcp *TcpTunnel) Connect() (*net.TCPConn, error) { - stream, err := net.DialTCP("tcp", nil, net.TCPAddrFromAddrPort(tcp.ClaimInstructions.Address.AddrPort)) + +func (tcpTunnel *TcpTunnel) Connect() (*net.TCPConn, error) { + debug.Printf("Conecting to %q\n", tcpTunnel.ClaimInstruction.Address.String()) + conn, err := net.DialTCP("tcp", nil, net.TCPAddrFromAddrPort(tcpTunnel.ClaimInstruction.Address)) if err != nil { - LogDebug.Printf("%q: Failed to establish connection to tunnel server\n", tcp.ClaimInstructions.Address.AddrPort.String()) + if conn != nil { + conn.Close() + } return nil, err } - if _, err := stream.Write(tcp.ClaimInstructions.Token); err != nil { - stream.Close() + _, err = conn.Write(tcpTunnel.ClaimInstruction.Token) + if err != nil { + conn.Close() return nil, err } - res := make([]byte, 8) - if _, err := stream.Read(res); err != nil { - stream.Close() + buff := make([]byte, 8) + size, err := conn.Read(buff) + if err != nil { + conn.Close() return nil, err + } else if size != 8 { + conn.Close() + return nil, fmt.Errorf("invalid response reader size") } - LogDebug.Printf("%+v\n", res) - return stream, nil -} \ No newline at end of file + return conn, nil +} diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 41e1b9a..600896c 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -2,7 +2,8 @@ package tunnel import ( "log" - "os" + + "sirherobrine23.org/playit-cloud/go-playit/logfile" ) -var LogDebug = log.New(os.Stderr, "go-playit.gg: ", log.Ldate|log.Ltime) \ No newline at end of file +var debug = log.New(logfile.DebugFile, "tunnel.playit.gg: ", log.Ldate) \ No newline at end of file diff --git a/tunnel/udp_proto.go b/tunnel/udp_proto.go index 1f48d69..4a6118b 100644 --- a/tunnel/udp_proto.go +++ b/tunnel/udp_proto.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "net/netip" + + "sirherobrine23.org/playit-cloud/go-playit/enc" ) const ( @@ -18,63 +20,51 @@ const ( V6_LEN int = 48 ) -type UdpFlowBase struct { - Src, Dst netip.AddrPort -} - type UdpFlow struct { - V4 *UdpFlowBase - V6 *struct { - UdpFlowBase - Flow uint32 - } + IPSrc, IPDst netip.AddrPort + Flow uint32 } func (w *UdpFlow) Len() int { - if w.V4 == nil { - return V6_LEN + if w.IPSrc.Addr().Is4() { + return V4_LEN } - return V4_LEN + return V6_LEN } func (w *UdpFlow) Src() netip.AddrPort { - if w.V4 == nil { - return w.V6.UdpFlowBase.Src - } - return w.V4.Src + return w.IPSrc } func (w *UdpFlow) Dst() netip.AddrPort { - if w.V4 == nil { - return w.V6.UdpFlowBase.Dst + return w.IPDst +} + +func (w *UdpFlow) WithSrcPort(port uint16) UdpFlow { + return UdpFlow{ + IPSrc: netip.AddrPortFrom(w.IPSrc.Addr(), port), + IPDst: w.IPSrc, } - return w.V4.Dst } func (w *UdpFlow) WriteTo(writer io.Writer) error { - var conn UdpFlowBase - if w.V4 != nil { - conn = *w.V4 - } else { - conn = w.V6.UdpFlowBase - } - if err := WriteData(writer, conn.Src.Addr().AsSlice()); err != nil { + if err := enc.WriteBytes(writer, w.IPSrc.Addr().AsSlice()); err != nil { return err - } else if err := WriteData(writer, conn.Dst.Addr().AsSlice()); err != nil { + } else if err := enc.WriteBytes(writer, w.IPDst.Addr().AsSlice()); err != nil { return err - } else if err := WriteU16(writer, conn.Src.Port()); err != nil { + } else if err := enc.WriteU16(writer, w.IPSrc.Port()); err != nil { return err - } else if err := WriteU16(writer, conn.Dst.Port()); err != nil { + } else if err := enc.WriteU16(writer, w.IPDst.Port()); err != nil { return err } - if w.V4 != nil { - if err := WriteU64(writer, REDIRECT_FLOW_4_FOOTER_ID_OLD); err != nil { + if w.IPSrc.Addr().Is6() { + if err := enc.WriteU32(writer, w.Flow); err != nil { + return err + } else if err := enc.WriteU64(writer, REDIRECT_FLOW_6_FOOTER_ID); err != nil { return err } } else { - if err := WriteU32(writer, w.V6.Flow); err != nil { - return err - } else if err := WriteU64(writer, REDIRECT_FLOW_6_FOOTER_ID); err != nil { + if err := enc.WriteU64(writer, REDIRECT_FLOW_4_FOOTER_ID_OLD); err != nil { return err } } @@ -82,53 +72,59 @@ func (w *UdpFlow) WriteTo(writer io.Writer) error { return nil } -func FromTailUdpFlow(slice []byte) (*UdpFlow, uint64, error) { +func FromTailUdpFlow(slice []byte) (UdpFlow, uint64, error) { + debug.Printf("FromTailUdpFlow: Avaible bytes: %+v\n", slice) if len(slice) < 8 { - return nil, 0, fmt.Errorf("not space to footer") + return UdpFlow{}, 0, fmt.Errorf("not space to footer") } - footer := binary.BigEndian.Uint64(slice[len(slice)-8:]) - switch footer { - case REDIRECT_FLOW_4_FOOTER_ID | REDIRECT_FLOW_4_FOOTER_ID_OLD: + footer := binary.BigEndian.Uint64(slice[(len(slice)-8):]) + debug.Printf("FromTailUdpFlow: Footer %d, bytes: %+v\n", footer, slice[(len(slice)-8):]) + if footer == REDIRECT_FLOW_4_FOOTER_ID || footer == REDIRECT_FLOW_4_FOOTER_ID_OLD || footer == (REDIRECT_FLOW_4_FOOTER_ID | REDIRECT_FLOW_4_FOOTER_ID_OLD) { if len(slice) < V4_LEN { - return nil, 0, fmt.Errorf("v4 not have space") + return UdpFlow{}, 0, fmt.Errorf("v4 not have space") } - slice = slice[len(slice)-V4_LEN:] - src_ip, _ := ReadBuffN(bytes.NewReader(slice), 4) - srcIP, _ := netip.AddrFromSlice(src_ip) - dst_ip, _ := ReadBuffN(bytes.NewReader(slice), 4) - dstIP, _ := netip.AddrFromSlice(dst_ip) - src_port, dst_port := ReadU16(bytes.NewReader(slice)), ReadU16(bytes.NewReader(slice)) + debug.Printf("FromTailUdpFlow: bytes v4: %+v\n", slice[len(slice)-V4_LEN:]) + reader := bytes.NewReader(slice[len(slice)-V4_LEN:]) - return &UdpFlow{ - V4: &UdpFlowBase{ - Src: netip.AddrPortFrom(srcIP, src_port), - Dst: netip.AddrPortFrom(dstIP, dst_port), - }, - }, 0, nil - case REDIRECT_FLOW_6_FOOTER_ID: + var err error + var src_ip, dst_ip []byte + if src_ip, err = enc.ReadByteN(reader, 4); err != nil { + return UdpFlow{}, 0, err + } else if dst_ip, err = enc.ReadByteN(reader, 4); err != nil { + return UdpFlow{}, 0, err + } + src_port, dst_port := enc.ReadU16(reader), enc.ReadU16(reader) + srcIP := netip.AddrFrom4([4]byte(src_ip)) + dstIP := netip.AddrFrom4([4]byte(dst_ip)) + + var point UdpFlow + point.IPSrc = netip.AddrPortFrom(srcIP, src_port) + point.IPDst = netip.AddrPortFrom(dstIP, dst_port) + return point, 0, nil + } else if footer == REDIRECT_FLOW_6_FOOTER_ID { if len(slice) < V6_LEN { - return nil, footer, fmt.Errorf("v6 not have space") + return UdpFlow{}, footer, fmt.Errorf("v6 not have space") } - slice = slice[len(slice)-V6_LEN:] - src_ip, _ := ReadBuffN(bytes.NewReader(slice), 16) - srcIP, _ := netip.AddrFromSlice(src_ip) - dst_ip, _ := ReadBuffN(bytes.NewReader(slice), 16) - dstIP, _ := netip.AddrFromSlice(dst_ip) - src_port, dst_port := ReadU16(bytes.NewReader(slice)), ReadU16(bytes.NewReader(slice)) - flow := ReadU32(bytes.NewReader(slice)) + debug.Printf("FromTailUdpFlow: bytes v4: %+v\n", slice[len(slice)-V6_LEN:]) + reader := bytes.NewReader(slice[len(slice)-V6_LEN:]) - return &UdpFlow{ - V6: &struct { - UdpFlowBase - Flow uint32 - }{ - UdpFlowBase{ - Src: netip.AddrPortFrom(srcIP, src_port), - Dst: netip.AddrPortFrom(dstIP, dst_port), - }, - flow, - }, - }, 0, nil + var err error + var src_ip, dst_ip []byte + if src_ip, err = enc.ReadByteN(reader, 16); err != nil { + return UdpFlow{}, 0, err + } else if dst_ip, err = enc.ReadByteN(reader, 16); err != nil { + return UdpFlow{}, 0, err + } + src_port, dst_port, flow := enc.ReadU16(reader), enc.ReadU16(reader), enc.ReadU32(reader) + srcIP := netip.AddrFrom16([16]byte(src_ip)) + dstIP := netip.AddrFrom16([16]byte(dst_ip)) + + var point UdpFlow + point.IPSrc = netip.AddrPortFrom(srcIP, src_port) + point.IPDst = netip.AddrPortFrom(dstIP, dst_port) + point.Flow = flow + return point, 0, nil } - return nil, footer, nil + debug.Printf("Cannot reader tail udp flow, bytes: %+v\n", slice) + return UdpFlow{}, footer, fmt.Errorf("read fotter") } diff --git a/tunnel/udp_tunnel.go b/tunnel/udp_tunnel.go index 95fc657..7d4ae2d 100644 --- a/tunnel/udp_tunnel.go +++ b/tunnel/udp_tunnel.go @@ -7,61 +7,57 @@ import ( "net" "net/netip" "slices" - "sync/atomic" "time" - "sirherobrine23.org/playit-cloud/go-playit/tunnel/rwlock" + "sirherobrine23.org/playit-cloud/go-playit/logfile" + "sirherobrine23.org/playit-cloud/go-playit/proto" ) type UdpTunnel struct { Udp4 *net.UDPConn Udp6 *net.UDPConn - Details rwlock.Rwlock[ChannelDetails] - LastConfirm atomic.Uint32 - LastSend atomic.Uint32 + Details ChannelDetails + LastConfirm uint32 + LastSend uint32 } type ChannelDetails struct { - Udp *UdpChannelDetails + Udp *proto.UdpChannelDetails AddrHistory []netip.AddrPort } func AssignUdpTunnel(tunUdp *UdpTunnel) error { - LogDebug.Println("Assign UDP Tunnel IPv4") + // LogDebug.Println("Assign UDP Tunnel IPv4") udp4, err := net.ListenUDP("udp4", nil) if err != nil { return err } tunUdp.Udp4 = udp4 // IPv6 opcional - LogDebug.Println("Assign UDP Tunnel IPv6") + // LogDebug.Println("Assign UDP Tunnel IPv6") if tunUdp.Udp6, err = net.ListenUDP("udp6", nil); err != nil { - LogDebug.Println("Cannot listen IPv6 Udp Tunnel") + // LogDebug.Println("Cannot listen IPv6 Udp Tunnel") tunUdp.Udp6 = nil err = nil } - tunUdp.Details = rwlock.Rwlock[ChannelDetails]{Value: ChannelDetails{ + tunUdp.Details = ChannelDetails{ AddrHistory: []netip.AddrPort{}, - Udp: nil, - }} + Udp: nil, + } - tunUdp.LastConfirm = atomic.Uint32{} - tunUdp.LastSend = atomic.Uint32{} - tunUdp.LastConfirm.Store(0) - tunUdp.LastSend.Store(0) + tunUdp.LastConfirm = 0 + tunUdp.LastSend = 0 return nil } func (udp *UdpTunnel) IsSetup() bool { - data, unlock := udp.Details.Read() - defer unlock() - return data.Udp != nil + return udp.Details.Udp != nil } func (udp *UdpTunnel) InvalidateSession() { - udp.LastConfirm.Store(0) - udp.LastSend.Store(0) + udp.LastConfirm = 0 + udp.LastSend = 0 } func now_sec() uint32 { @@ -69,44 +65,43 @@ func now_sec() uint32 { } func (udp *UdpTunnel) RequireResend() bool { - last_confirm := udp.LastConfirm.Load() + last_confirm := udp.LastConfirm /* send token every 10 seconds */ return 10 < now_sec()-last_confirm } func (udp *UdpTunnel) RequiresAuth() bool { - lastConf, lastSend := udp.LastConfirm.Load(), udp.LastSend.Load() + lastConf, lastSend := udp.LastConfirm, udp.LastSend if lastSend < lastConf { return false } return 5 < now_sec()-lastSend } -func (udp *UdpTunnel) SetUdpTunnel(details UdpChannelDetails) error { - LogDebug.Println("Updating Udp Tunnel") - lock, unlock := udp.Details.Write() - - if lock.Udp != nil { - current := lock.Udp - if bytes.Equal(current.Token, details.Token) && current.TunnelAddr.Compare(details.TunnelAddr.AddrPort) == 0 { - unlock() +func (udp *UdpTunnel) SetUdpTunnel(details *proto.UdpChannelDetails) error { + // LogDebug.Println("Updating Udp Tunnel") + // udp.locker.Lock() + if current := udp.Details.Udp; current != nil { + if bytes.Equal(current.Token, details.Token) && current.TunnelAddr.Compare(details.TunnelAddr) == 0 { + // udp.locker.Unlock() return nil } - if current.TunnelAddr.Compare(details.TunnelAddr.AddrPort) != 0 { - LogDebug.Println("changed udp tunner addr") + if current.TunnelAddr.Compare(details.TunnelAddr) != 0 { + // LogDebug.Println("changed udp tunner addr") oldAddr := current.TunnelAddr - lock.AddrHistory = append(lock.AddrHistory, oldAddr.AddrPort) + udp.Details.AddrHistory = append(udp.Details.AddrHistory, oldAddr) } - lock.Udp = &details } + udp.Details.Udp = new(proto.UdpChannelDetails) + udp.Details.Udp.Token = details.Token + udp.Details.Udp.TunnelAddr = details.TunnelAddr + // udp.locker.Unlock() - unlock() - return udp.SendToken(&details) + return udp.SendToken(details) } func (udp *UdpTunnel) ResendToken() (bool, error) { - lock, unlock := udp.Details.Read() - defer unlock() + lock := udp.Details if lock.Udp == nil { return false, nil } else if err := udp.SendToken(lock.Udp); err != nil { @@ -115,33 +110,41 @@ func (udp *UdpTunnel) ResendToken() (bool, error) { return true, nil } -func (udp *UdpTunnel) SendToken(details *UdpChannelDetails) error { +func (udp *UdpTunnel) SendToken(details *proto.UdpChannelDetails) error { + // udp.locker.RLock() + // defer udp.locker.RUnlock() if details.TunnelAddr.Addr().Is4() { - udp.Udp4.WriteToUDPAddrPort(details.Token, details.TunnelAddr.AddrPort) + if _, err := udp.Udp4.WriteToUDPAddrPort(details.Token, details.TunnelAddr); err != nil { + return err + } } else { if udp.Udp6 == nil { return fmt.Errorf("ipv6 not supported") } - udp.Udp6.WriteToUDPAddrPort(details.Token, details.TunnelAddr.AddrPort) + if _, err := udp.Udp6.WriteToUDPAddrPort(details.Token, details.TunnelAddr); err != nil { + return err + } } - LogDebug.Printf("send udp session token (len=%d) to %s\n", len(details.Token), details.TunnelAddr.AddrPort.String()) - udp.LastSend.Store(now_sec()) + // LogDebug.Printf("send udp session token (len=%d) to %s\n", len(details.Token), details.TunnelAddr.AddrPort.String()) + udp.LastSend = now_sec() return nil } func (udp *UdpTunnel) GetSock() (*net.UDPConn, *netip.AddrPort, error) { - lock, unlock := udp.Details.Read() - defer unlock() + // udp.locker.RLock() + // defer udp.locker.RUnlock() + + lock := udp.Details if lock.Udp == nil { - LogDebug.Println("udp tunnel not connected") + // LogDebug.Println("udp tunnel not connected") return nil, nil, fmt.Errorf("udp tunnel not connected") } else if lock.Udp.TunnelAddr.Addr().Is4() { - return udp.Udp4, &lock.Udp.TunnelAddr.AddrPort, nil + return udp.Udp4, &lock.Udp.TunnelAddr, nil } else if udp.Udp6 == nil { - LogDebug.Println("ipv6 not setup") + // LogDebug.Println("ipv6 not setup") return nil, nil, fmt.Errorf("ipv6 not setup") } - return udp.Udp6, &lock.Udp.TunnelAddr.AddrPort, nil + return udp.Udp6, &lock.Udp.TunnelAddr, nil } func (Udp *UdpTunnel) Send(data []byte, Flow UdpFlow) (int, error) { @@ -159,67 +162,68 @@ func (Udp *UdpTunnel) Send(data []byte, Flow UdpFlow) (int, error) { } func (Udp *UdpTunnel) GetToken() ([]byte, error) { - lock, unlock := Udp.Details.Read() - defer unlock() + // Udp.locker.RLock() + // defer Udp.locker.RUnlock() + lock := Udp.Details if lock.Udp == nil { return nil, fmt.Errorf("udp tunnel not connected") } return lock.Udp.Token[:], nil } +type UdpTunnelRxPacket struct { + Bytes uint64 + Flow UdpFlow +} type UdpTunnelRx struct { ConfirmerdConnection bool - ReceivedPacket *struct { - Bytes uint64 - Flow UdpFlow - } + ReceivedPacket UdpTunnelRxPacket } -func (Udp *UdpTunnel) ReceiveFrom(buff []byte) (*UdpTunnelRx, error) { +func (Udp *UdpTunnel) ReceiveFrom() ([]byte, *UdpTunnelRx, error) { + buff := make([]byte, 2048) udp, tunnelAddr, err := Udp.GetSock() if err != nil { - return nil, err + return nil, nil, err } + + udp.SetReadDeadline(time.Now().Add(time.Microsecond * 5)) byteSize, remote, err := udp.ReadFromUDPAddrPort(buff) if err != nil { - return nil, err - } - if tunnelAddr.Compare(remote) != 0 { - lock, unlock := Udp.Details.Read() - defer unlock() - if !slices.ContainsFunc(lock.AddrHistory, func(a netip.AddrPort) bool { + return nil, nil, err + } else if tunnelAddr.Compare(remote) != 0 { + if !slices.ContainsFunc(Udp.Details.AddrHistory, func(a netip.AddrPort) bool { return a.Compare(remote) == 0 }) { - return nil, fmt.Errorf("got data from other source") + return nil, nil, fmt.Errorf("got data from other source") } } + buff = buff[:byteSize] token, err := Udp.GetToken() if err != nil { - return nil, err + return nil, nil, err } - LogDebug.Println("Check token") - LogDebug.Println(buff) - LogDebug.Println(token) - LogDebug.Println("end check token") - if bytes.Equal(buff[:byteSize], token) { - LogDebug.Println("udp session confirmed") - Udp.LastConfirm.Store(now_sec()) - return &UdpTunnelRx{ConfirmerdConnection: true}, nil + point := new(UdpTunnelRx) + if bytes.Equal(buff[:], token) { + debug.Println("udp session confirmed") + Udp.LastConfirm = now_sec() + point.ConfirmerdConnection = true + return nil, point, nil } - if len(buff) + V6_LEN < byteSize { - return nil, fmt.Errorf("receive buffer too small") - } - - footer, footerInt, err := FromTailUdpFlow(buff[byteSize:]) + footer, footerInt, err := FromTailUdpFlow(buff[:]) if err != nil { + debug.Printf("UdpTunnel recive error: %s\n", err.Error()) if footerInt == UDP_CHANNEL_ESTABLISH_ID { - actual := hex.EncodeToString(buff[byteSize:]); - expected := hex.EncodeToString(token); - return nil, fmt.Errorf("unexpected UDP establish packet, actual: %s, expected: %s", actual, expected) + actual := hex.EncodeToString(buff) + expected := hex.EncodeToString(token) + return nil, nil, fmt.Errorf("unexpected UDP establish packet, actual: %s, expected: %s", actual, expected) } - return nil, fmt.Errorf("failed to extract udp footer: %s, err: %s", hex.EncodeToString(buff[byteSize:]), err.Error()) + return nil, nil, fmt.Errorf("failed to extract udp footer: %s, err: %s", hex.EncodeToString(buff), err.Error()) } - return &UdpTunnelRx{ReceivedPacket: &struct{Bytes uint64; Flow UdpFlow}{uint64(byteSize) - uint64(footer.Len()), *footer}}, nil + + point.ReceivedPacket = UdpTunnelRxPacket{uint64(byteSize - footer.Len()), footer} + debug.Printf("UdpTunnel packet: %s\n", logfile.JSONString(point)) + return buff[:point.ReceivedPacket.Bytes], point, nil }