playit.gg go implementation #1

Open
Sirherobrine23 wants to merge 6 commits from breaked into main
41 changed files with 2377 additions and 1803 deletions
Showing only changes of commit 37207e9678 - Show all commits

1
.gitignore vendored
View File

@ -22,3 +22,4 @@
go.work
main.go
*.log

View File

@ -1,9 +1,14 @@
package api
import (
"log"
"os"
"runtime"
)
var NullFile, _ = os.Open(os.DevNull)
var debug = log.New(NullFile, "api.playit.gg: ", log.Ldate)
const (
GoPlayitVersion string = "0.17.1"
PlayitAPI string = "https://api.playit.gg" // Playit API
@ -58,11 +63,13 @@ type Api struct {
}
type PortProto string
func (proto PortProto) IsValid() bool {
switch proto {
case PortProto(PortTypeBoth):
case PortProto(PortTypeTcp):
case PortProto(PortTypeUdp): return true
case PortProto(PortTypeUdp):
return true
}
return false
}
@ -77,15 +84,23 @@ func (proto PortProto) SetUdp() {
}
type Platform string
func (Platform Platform) Host() {
switch runtime.GOOS {
case "linux": Platform = "linux";
case "freebsd": Platform = "freebsd";
case "windows": Platform = "windows";
case "darwin": Platform = "macos";
case "android": Platform = "android";
case "ios": Platform = "ios";
default: Platform = "unknown";
case "linux":
Platform = "linux"
case "freebsd":
Platform = "freebsd"
case "windows":
Platform = "windows"
case "darwin":
Platform = "macos"
case "android":
Platform = "android"
case "ios":
Platform = "ios"
default:
Platform = "unknown"
}
}
func (Platform Platform) Linux() {
@ -111,4 +126,4 @@ func (Platform Platform) MinecraftPlugin() {
}
func (Platform Platform) Unknown() {
Platform = "unknown"
}
}

View File

@ -17,6 +17,18 @@ func recodeJson(from, to any) error {
return nil
}
func prettyJSON(from string) string {
var data any
if err := json.Unmarshal([]byte(from), &data); err != nil {
return from
}
marData, err := json.MarshalIndent(data, "", " ")
if err != nil {
return from
}
return string(marData)
}
func (w *Api) requestToApi(Path string, Body io.Reader, Response any, Headers map[string]string) (*http.Response, error) {
req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", PlayitAPI, Path), Body)
if err != nil {
@ -38,16 +50,24 @@ func (w *Api) requestToApi(Path string, Body io.Reader, Response any, Headers ma
return nil, err
}
defer res.Body.Close()
data, err := io.ReadAll(res.Body)
if err != nil {
return res, err
}
debug.Printf("(%q %d): %s\n", Path, res.StatusCode, prettyJSON(string(data)))
var ResBody struct {
Error string `json:"error"`
Status string `json:"status"`
Data any `json:"data"`
}
if err = json.NewDecoder(res.Body).Decode(&ResBody); err != nil {
if err = json.Unmarshal(data, &ResBody); err != nil {
return res, err
}
if res.StatusCode >= 300 {
defer res.Body.Close()
if ResBody.Error != "" {
return res, fmt.Errorf("api.playit.gg: %s", ResBody.Error)
}
var errStatus struct {
Type string `json:"type"`
Message string `json:"message"`

19
logfile/log.go Normal file
View File

@ -0,0 +1,19 @@
package logfile
import (
"encoding/json"
"os"
)
var DebugFile = func() *os.File {
file, err := os.Create("./debug.log")
if err != nil {
panic(err)
}
return file
}()
func JSONString(data any) string {
d, _ := json.Marshal(data)
return string(d)
}

View File

@ -73,5 +73,5 @@ func UdpSocket(SpecialLan bool, Peer, Host netip.AddrPort) (*net.UDPConn, error)
}
return stream, err
}
return net.DialUDP("udp", nil, net.UDPAddrFromAddrPort(Host))
return net.ListenUDP("udp", nil)
}

9
network/network.go Normal file
View File

@ -0,0 +1,9 @@
package network
import (
"log"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
var debug = log.New(logfile.DebugFile, "network.playit.gg: ", log.Ldate)

View File

@ -89,10 +89,10 @@ func (tcp *TcpClients) Connect(newClient proto.NewClient) (*TcpClient, error) {
return nil, err
}
return &TcpClient{*stream, *droppe}, nil
return &TcpClient{stream, droppe}, nil
}
type TcpClient struct {
Stream net.TCPConn
Dropper Dropper
Stream *net.TCPConn
Dropper *Dropper
}

View File

@ -2,11 +2,9 @@ package network
import (
"fmt"
"log"
"net"
"net/netip"
"reflect"
"sync"
"sync/atomic"
"time"
@ -18,11 +16,10 @@ type UdpClient struct {
clientKey ClientKey
sendFlow tunnel.UdpFlow
udpTunnel tunnel.UdpTunnel
localUdp net.UDPConn
localUdp *net.UDPConn
localStartAddr netip.AddrPort
tunnelFromPort uint16
tunnelToPort uint16
udpClientsLock sync.Mutex
udpClients map[ClientKey]UdpClient
lastActivity atomic.Uint32
}
@ -47,7 +44,7 @@ func (self *HostToTunnelForwarder) Run() {
self.localUdp.SetReadDeadline(time.Now().Add(time.Second * 30))
size, source, err := self.localUdp.ReadFromUDPAddrPort(buffer)
if err != nil {
log.Println(err)
debug.Println(err)
break
} else if source.Addr().Compare(self.localStartAddr.Addr()) != 0 {
// "dropping packet from different unexpected source"
@ -69,12 +66,10 @@ func (self *HostToTunnelForwarder) Run() {
}
}
self.UdpClient.udpClientsLock.Lock()
if _, is := self.UdpClient.udpClients[self.clientKey]; is {
// if !reflect.DeepEqual(v, self) {} else {}
delete(self.UdpClient.udpClients, self.clientKey)
}
self.UdpClient.udpClientsLock.Unlock()
}
type ClientKey struct {
@ -84,7 +79,6 @@ type ClientKey struct {
type UdpClients struct {
udpTunnel tunnel.UdpTunnel
lookup AddressLookup[netip.AddrPort]
udpClientsLocker sync.Mutex
udpClients map[ClientKey]UdpClient
UseSpecialLan bool
}
@ -93,7 +87,6 @@ func NewUdpClients(Tunnel tunnel.UdpTunnel, Lookup AddressLookup[netip.AddrPort]
return UdpClients{
udpTunnel: Tunnel,
lookup: Lookup,
udpClientsLocker: sync.Mutex{},
udpClients: make(map[ClientKey]UdpClient),
UseSpecialLan: true,
}
@ -117,9 +110,6 @@ func (self *UdpClients) ForwardPacket(Flow tunnel.UdpFlow, data []byte) error {
}
}
defer self.udpClientsLocker.Unlock()
self.udpClientsLocker.Lock()
client, err := func() (*UdpClient, error) {
for kkey, client := range self.udpClients {
if reflect.DeepEqual(kkey, key) {
@ -151,7 +141,7 @@ func (self *UdpClients) ForwardPacket(Flow tunnel.UdpFlow, data []byte) error {
client := UdpClient{
clientKey: key,
sendFlow: sendFlow,
localUdp: *usock,
localUdp: usock,
udpTunnel: self.udpTunnel,
localStartAddr: localAddr,
tunnelFromPort: found.FromPort,

View File

@ -6,6 +6,7 @@ import (
"net/netip"
"sirherobrine23.org/playit-cloud/go-playit/enc"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
var (
@ -17,19 +18,24 @@ type ControlFeed struct {
NewClient *NewClient
}
func (Feed *ControlFeed) ReadFrom(r io.Reader) error {
func (Feed *ControlFeed) ReadFrom(r io.Reader) (err error) {
id := enc.ReadU32(r)
if id == 1 {
Feed.Response = new(ControlRpcMessage[*ControlResponse])
Feed.Response.Content = new(ControlResponse)
return Feed.Response.ReadFrom(r)
err = Feed.Response.ReadFrom(r)
debug.Printf("Read Feed (id %d): %s\n", id, logfile.JSONString(Feed))
} else if id == 2 {
Feed.NewClient = &NewClient{}
return Feed.NewClient.ReadFrom(r)
err = Feed.NewClient.ReadFrom(r)
debug.Printf("Read Feed (id %d): %s\n", id, logfile.JSONString(Feed))
} else {
err = ErrFeedRead
}
return ErrFeedRead
return
}
func (Feed *ControlFeed) WriteTo(w io.Writer) error {
defer debug.Printf("Write Feed: %s\n", logfile.JSONString(Feed))
if Feed.Response != nil {
if err := enc.WriteU32(w, 1); err != nil {
return err

View File

@ -8,6 +8,7 @@ import (
"time"
"sirherobrine23.org/playit-cloud/go-playit/enc"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
type ControlRequest struct {
@ -19,53 +20,65 @@ type ControlRequest struct {
}
func (Control *ControlRequest) WriteTo(w io.Writer) error {
defer debug.Printf("Write ControlRequest: %s\n", logfile.JSONString(Control))
if Control.Ping != nil {
if err := enc.WriteU32(w, 6); err != nil {
debug.Printf("Write ControlRequest error: %s\n", err.Error())
return err
}
return Control.Ping.WriteTo(w)
} else if Control.AgentRegister != nil {
if err := enc.WriteU32(w, 2); err != nil {
debug.Printf("Write ControlRequest error: %s\n", err.Error())
return err
}
return Control.AgentRegister.WriteTo(w)
} else if Control.AgentKeepAlive != nil {
if err := enc.WriteU32(w, 3); err != nil {
debug.Printf("Write ControlRequest error: %s\n", err.Error())
return err
}
return Control.AgentKeepAlive.WriteTo(w)
} else if Control.SetupUdpChannel != nil {
if err := enc.WriteU32(w, 4); err != nil {
debug.Printf("Write ControlRequest error: %s\n", err.Error())
return err
}
return Control.SetupUdpChannel.WriteTo(w)
} else if Control.AgentCheckPortMapping != nil {
if err := enc.WriteU32(w, 5); err != nil {
debug.Printf("Write ControlRequest error: %s\n", err.Error())
return err
}
return Control.AgentCheckPortMapping.WriteTo(w)
}
return fmt.Errorf("set ControlRequest")
}
func (Control *ControlRequest) ReadFrom(r io.Reader) error {
func (Control *ControlRequest) ReadFrom(r io.Reader) (err error) {
switch enc.ReadU32(r) {
case 1:
Control.Ping = new(Ping)
return Control.Ping.ReadFrom(r)
err = Control.Ping.ReadFrom(r)
case 2:
Control.AgentRegister = new(AgentRegister)
return Control.AgentRegister.ReadFrom(r)
err = Control.AgentRegister.ReadFrom(r)
case 3:
Control.AgentKeepAlive = new(AgentSessionId)
return Control.AgentKeepAlive.ReadFrom(r)
err = Control.AgentKeepAlive.ReadFrom(r)
case 4:
Control.SetupUdpChannel = new(AgentSessionId)
return Control.SetupUdpChannel.ReadFrom(r)
err = Control.SetupUdpChannel.ReadFrom(r)
case 5:
Control.AgentCheckPortMapping = new(AgentCheckPortMapping)
return Control.AgentCheckPortMapping.ReadFrom(r)
err = Control.AgentCheckPortMapping.ReadFrom(r)
default:
err = fmt.Errorf("invalid ControlRequest id")
}
return fmt.Errorf("invalid ControlRequest id")
debug.Printf("Read ControlRequest: %s\n", logfile.JSONString(Control))
if err != nil {
debug.Printf("Read ControlRequest error: %s\n", err.Error())
}
return
}
type AgentCheckPortMapping struct {
@ -206,6 +219,7 @@ type ControlResponse struct {
}
func (Control *ControlResponse) WriteTo(w io.Writer) error {
defer debug.Printf("Write Feed: %s\n", logfile.JSONString(&Control))
if Control.Pong != nil {
if err := enc.WriteU32(w, 1); err != nil {
return err
@ -237,34 +251,41 @@ func (Control *ControlResponse) WriteTo(w io.Writer) error {
}
return fmt.Errorf("insert any options to write")
}
func (Control *ControlResponse) ReadFrom(r io.Reader) error {
switch enc.ReadU32(r) {
func (Control *ControlResponse) ReadFrom(r io.Reader) (err error) {
code := enc.ReadU32(r)
switch code {
case 1:
Control.Pong = new(Pong)
return Control.Pong.ReadFrom(r)
err = Control.Pong.ReadFrom(r)
case 2:
Control.InvalidSignature = true
return nil
err = nil
case 3:
Control.Unauthorized = true
return nil
err = nil
case 4:
Control.RequestQueued = true
return nil
err = nil
case 5:
Control.TryAgainLater = true
return nil
err = nil
case 6:
Control.AgentRegistered = new(AgentRegistered)
return Control.AgentRegistered.ReadFrom(r)
err = Control.AgentRegistered.ReadFrom(r)
case 7:
Control.AgentPortMapping = new(AgentPortMapping)
return Control.AgentPortMapping.ReadFrom(r)
err = Control.AgentPortMapping.ReadFrom(r)
case 8:
Control.UdpChannelDetails = new(UdpChannelDetails)
return Control.UdpChannelDetails.ReadFrom(r)
err = Control.UdpChannelDetails.ReadFrom(r)
default:
err = fmt.Errorf("invalid ControlResponse id")
}
return fmt.Errorf("invalid ControlResponse id")
debug.Printf("Read ControlResponse (Code %d): %s\n", code, logfile.JSONString(Control))
if err != nil {
debug.Printf("Read ControlResponse (Code %d) error: %s\n", code, err.Error())
}
return
}
type AgentPortMapping struct {

9
proto/proto.go Normal file
View File

@ -0,0 +1,9 @@
package proto
import (
"log"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
var debug = log.New(logfile.DebugFile, "proto.playit.gg: ", log.Ldate)

View File

@ -2,8 +2,10 @@ package proto
import (
"io"
"reflect"
"sirherobrine23.org/playit-cloud/go-playit/enc"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
type ControlRpcMessage[T MessageEncoding] struct {
@ -12,6 +14,8 @@ type ControlRpcMessage[T MessageEncoding] struct {
}
func (rpc *ControlRpcMessage[T]) WriteTo(w io.Writer) error {
defer debug.Printf("Write ControlRpcMessage[%s]: %s\n", reflect.TypeOf(rpc.Content).String(), logfile.JSONString(rpc))
if err := enc.WriteU64(w, rpc.RequestID); err != nil {
return err
} else if err = rpc.Content.WriteTo(w); err != nil {
@ -22,7 +26,9 @@ func (rpc *ControlRpcMessage[T]) WriteTo(w io.Writer) error {
func (rpc *ControlRpcMessage[T]) ReadFrom(r io.Reader) error {
rpc.RequestID = enc.ReadU64(r)
if err := rpc.Content.ReadFrom(r); err != nil {
debug.Printf("Read ControlRpcMessage[%s] error: %s\n", reflect.TypeOf(rpc.Content).String(), err.Error())
return err
}
debug.Printf("Read ControlRpcMessage[%s]: %s\n", reflect.TypeOf(rpc.Content).String(), logfile.JSONString(rpc))
return nil
}

View File

@ -1,9 +0,0 @@
package proto
import (
"log"
"os"
)
// Write log and show in terminal to debug
var logDebug *log.Logger = log.New(os.Stderr, "plait.gg", log.Ltime|log.Ldate)

9
runner/runner.go Normal file
View File

@ -0,0 +1,9 @@
package runner
import (
"log"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
var debug = log.New(logfile.DebugFile, "runner.playit.gg: ", log.Ldate)

View File

@ -1,7 +1,6 @@
package runner
import (
"fmt"
"io"
"net"
"net/netip"
@ -9,6 +8,7 @@ import (
"time"
"sirherobrine23.org/playit-cloud/go-playit/api"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
"sirherobrine23.org/playit-cloud/go-playit/network"
"sirherobrine23.org/playit-cloud/go-playit/tunnel"
)
@ -57,45 +57,53 @@ func (self *TunnelRunner) Run() chan error {
continue
}
}
if new_client := tunnel.Update(); new_client != nil {
fmt.Println("New TCP Client")
var found *network.AddressValue[netip.AddrPort]
if found = self.Lookup.Lookup(new_client.ConnectAddr.Addr(), new_client.ConnectAddr.Port(), api.PortProto("tcp")); found == nil {
fmt.Println("could not find local address for connection")
continue
}
go func() {
var (
tunnel_conn *network.TcpClient
local_conn *net.TCPConn
err error
)
if tunnel_conn, err = self.TcpClients.Connect(*new_client); err != nil {
return
}
defer tunnel_conn.Stream.Close()
defer tunnel_conn.Dropper.Drop()
if local_conn, err = network.TcpSocket(self.TcpClients.UseSpecialLAN, new_client.PeerAddr, netip.AddrPortFrom(found.Value.Addr(), (new_client.ConnectAddr.Port()-found.FromPort)+found.Value.Port())); err != nil {
fmt.Println(err)
return
}
defer local_conn.Close()
done := make(chan struct{})
defer close(done)
go func() {
io.Copy(&tunnel_conn.Stream, local_conn)
done <- struct{}{}
}()
go func() {
io.Copy(local_conn, &tunnel_conn.Stream)
done <- struct{}{}
}()
<-done
<-done
}()
new_client, err := tunnel.Update()
if err != nil {
debug.Printf("Error recived: %s\n", err.Error())
<-time.After(time.Second)
continue
} else if new_client == nil {
<-time.After(time.Second)
continue
}
debug.Println("New TCP Client")
var found *network.AddressValue[netip.AddrPort]
if found = self.Lookup.Lookup(new_client.ConnectAddr.Addr(), new_client.ConnectAddr.Port(), api.PortProto("tcp")); found == nil {
debug.Println("could not find local address for connection")
continue
}
go func() {
var (
tunnel_conn *network.TcpClient
local_conn *net.TCPConn
err error
)
if tunnel_conn, err = self.TcpClients.Connect(*new_client); err != nil {
return
}
if tunnel_conn.Stream != nil {
defer tunnel_conn.Stream.Close()
}
if local_conn, err = network.TcpSocket(self.TcpClients.UseSpecialLAN, new_client.PeerAddr, netip.AddrPortFrom(found.Value.Addr(), (new_client.ConnectAddr.Port()-found.FromPort)+found.Value.Port())); err != nil {
debug.Println(err)
return
}
defer local_conn.Close()
done := make(chan struct{})
defer close(done)
go func() {
io.Copy(tunnel_conn.Stream, local_conn)
done <- struct{}{}
}()
go func() {
io.Copy(local_conn, tunnel_conn.Stream)
done <- struct{}{}
}()
<-done
<-done
}()
}
end <- nil
}()
@ -103,19 +111,19 @@ func (self *TunnelRunner) Run() chan error {
go func() {
udp := tunnel.UdpTunnel()
for self.KeepRunning.Load() {
buffer := make([]byte, 2048)
fmt.Println("udp rec")
rx, err := udp.ReceiveFrom(buffer)
buffer, rx, err := udp.ReceiveFrom()
if err != nil {
fmt.Println(err)
// if et, is := err.(net.Error); is && !et.Timeout() {
debug.Printf("UdpTunnel Error: %s\n", err.Error())
// }
time.Sleep(time.Second)
continue
}
debug.Printf("UdpTunnel: %s\n", logfile.JSONString(rx))
if rx.ConfirmerdConnection {
continue
}
bytes, flow := rx.ReceivedPacket.Bytes, rx.ReceivedPacket.Flow
if err := self.UdpClients.ForwardPacket(flow, buffer[:bytes]); err != nil {
} else if err := self.UdpClients.ForwardPacket(rx.ReceivedPacket.Flow, buffer); err != nil {
debug.Println(err)
panic(err)
}
}

View File

@ -20,29 +20,38 @@ type AuthenticatedControl struct {
CurrentPing *uint32
}
func (self *AuthenticatedControl) SendKeepAlive(requestId uint64) error {
return self.Send(proto.ControlRpcMessage[*proto.ControlRequest]{
requestId,
&proto.ControlRequest{
AgentKeepAlive: &self.Registered.Id,
func (control *AuthenticatedControl) SendKeepAlive(requestID uint64) error {
return control.Send(proto.ControlRpcMessage[*proto.ControlRequest]{
RequestID: requestID,
Content: &proto.ControlRequest{
AgentKeepAlive: &control.Registered.Id,
},
})
}
func (self *AuthenticatedControl) SendSetupUdpChannel(requestId uint64) error {
return self.Send(proto.ControlRpcMessage[*proto.ControlRequest]{
requestId,
&proto.ControlRequest{
RequestID: requestId,
Content: &proto.ControlRequest{
SetupUdpChannel: &self.Registered.Id,
},
})
}
func (self *AuthenticatedControl) SendPing(requestId uint64, Now time.Time) error {
return self.Send(proto.ControlRpcMessage[*proto.ControlRequest]{
requestId,
&proto.ControlRequest{
Ping: &proto.Ping{Now, self.CurrentPing, &self.Registered.Id},
func (control *AuthenticatedControl) SetupUdpChannel(requestID uint64) error {
return control.Send(proto.ControlRpcMessage[*proto.ControlRequest]{
RequestID: requestID,
Content: &proto.ControlRequest{
SetupUdpChannel: &control.Registered.Id,
},
})
}
func (control *AuthenticatedControl) Ping(requestID uint64, Now time.Time) error {
return control.Send(proto.ControlRpcMessage[*proto.ControlRequest]{
RequestID: requestID,
Content: &proto.ControlRequest{
Ping: &proto.Ping{Now: Now, CurrentPing: control.CurrentPing, SessionId: &control.Registered.Id},
},
})
}
@ -76,8 +85,8 @@ func (self *AuthenticatedControl) Send(req proto.ControlRpcMessage[*proto.Contro
func (self *AuthenticatedControl) IntoRequireAuth() ConnectedControl {
return ConnectedControl{
ControlAddr: self.Conn.ControlAddr,
Udp: self.Conn.Udp,
Pong: self.LastPong,
Udp: self.Conn.Udp,
Pong: self.LastPong,
}
}
@ -88,10 +97,13 @@ func (self *AuthenticatedControl) Authenticate() (AuthenticatedControl, error) {
func (self *AuthenticatedControl) RecvFeedMsg() (proto.ControlFeed, error) {
buff := make([]byte, 1024)
// self.Conn.Udp.SetReadDeadline(*new(time.Time))
self.Conn.Udp.SetReadDeadline(*new(time.Time)) // Remove deadline
// self.Conn.Udp.SetReadDeadline(*new(time.Time)) // Remove deadline
self.Conn.Udp.SetReadDeadline(time.Now().Add(time.Microsecond * 5))
size, remote, err := self.Conn.Udp.ReadFromUDPAddrPort(buff)
if err != nil {
if et, is := err.(net.Error); is && !et.Timeout() {
debug.Printf("control reader UDP control: %s", err.Error())
}
return proto.ControlFeed{}, err
} else if remote.Compare(self.Conn.ControlAddr) != 0 {
return proto.ControlFeed{}, fmt.Errorf("invalid remote, expected %q got %q", remote.String(), self.Conn.ControlAddr.String())
@ -100,6 +112,7 @@ func (self *AuthenticatedControl) RecvFeedMsg() (proto.ControlFeed, error) {
self.buffer.Write(buff[:size])
feed := proto.ControlFeed{}
if err := feed.ReadFrom(self.buffer); err != nil {
debug.Printf("control feed reader: %s", err.Error())
return proto.ControlFeed{}, err
}
if res := feed.Response; res != nil {
@ -115,4 +128,4 @@ func (self *AuthenticatedControl) RecvFeedMsg() (proto.ControlFeed, error) {
}
}
return feed, nil
}
}

View File

@ -1,8 +1,7 @@
package tunnel
import (
"encoding/json"
"fmt"
"net"
"net/netip"
"slices"
"time"
@ -26,7 +25,7 @@ func getControlAddresses(api api.Api) ([]netip.AddrPort, error) {
type SimpleTunnel struct {
api api.Api
controlAddr netip.AddrPort
controlChannel AuthenticatedControl
ControlChannel AuthenticatedControl
udpTunnel *UdpTunnel
lastKeepAlive, lastPing, lastPong, lastUdpAuth time.Time
lastControlTargets []netip.AddrPort
@ -60,7 +59,7 @@ func (self *SimpleTunnel) Setup() error {
self.lastControlTargets = addresses
self.controlAddr = setup.ControlAddr
self.controlChannel = controlChannel
self.ControlChannel = controlChannel
self.udpTunnel = udpTunnel
self.lastKeepAlive = time.UnixMilli(0)
self.lastPing = time.UnixMilli(0)
@ -97,7 +96,7 @@ func (self *SimpleTunnel) UpdateControlAddr(connected ConnectedControl) (bool, e
if err != nil {
return false, err
}
self.controlChannel = controlChannel
self.ControlChannel = controlChannel
self.controlAddr = newControlAddr
self.lastPing, self.lastKeepAlive, self.lastUdpAuth = time.UnixMilli(0), time.UnixMilli(0), time.UnixMilli(0)
self.udpTunnel.InvalidateSession()
@ -108,76 +107,85 @@ func (self *SimpleTunnel) UdpTunnel() *UdpTunnel {
return self.udpTunnel
}
func (self *SimpleTunnel) Update() *proto.NewClient {
if self.controlChannel.IsExpired() {
auth, err := self.controlChannel.Authenticate()
func (self *SimpleTunnel) Update() (*proto.NewClient, error) {
if self.ControlChannel.IsExpired() {
auth, err := self.ControlChannel.Authenticate()
if err != nil {
time.Sleep(time.Second * 2)
return nil
return nil, err
}
self.controlChannel = auth
self.ControlChannel = auth
}
now := time.Now()
if now.UnixMilli()-self.lastPing.UnixMilli() > 1_000 {
self.lastPing = now
if err := self.controlChannel.SendPing(200, now); err != nil {
if err := self.ControlChannel.Ping(200, now); err != nil {
debug.Printf("Update: %s\n", err.Error())
return nil, err
}
}
if self.udpTunnel.RequiresAuth() {
if 5_000 < now.UnixMilli()-self.lastUdpAuth.UnixMilli() {
self.lastUdpAuth = now
if err := self.controlChannel.SendSetupUdpChannel(9_000); err != nil {
if err := self.ControlChannel.SetupUdpChannel(9_000); err != nil {
debug.Printf("Update: %s\n", err.Error())
return nil, err
}
}
} else if self.udpTunnel.RequireResend() {
if 1_000 < now.UnixMilli()-self.lastUdpAuth.UnixMilli() {
self.lastUdpAuth = now
if _, err := self.udpTunnel.ResendToken(); err != nil {
return nil, err
}
}
}
timeTillExpire := max(self.controlChannel.GetExpireAt().UnixMilli(), now.UnixMilli()) - now.UnixMilli()
timeTillExpire := max(self.ControlChannel.GetExpireAt().UnixMilli(), now.UnixMilli()) - now.UnixMilli()
if 10_000 < now.UnixMilli()-self.lastKeepAlive.UnixMilli() && timeTillExpire < 30_000 {
self.lastKeepAlive = now
if err := self.controlChannel.SendKeepAlive(100); err != nil {
}
if err := self.controlChannel.SendSetupUdpChannel(1); err != nil {
if err := self.ControlChannel.SendKeepAlive(100); err != nil {
return nil, err
} else if err := self.ControlChannel.SendSetupUdpChannel(1); err != nil {
return nil, err
}
}
for range 30 {
feed, err := self.controlChannel.RecvFeedMsg()
for range 80 {
feed, err := self.ControlChannel.RecvFeedMsg()
if err != nil {
fmt.Printf("Update: %s", err.Error())
if es, is := err.(net.Error); is && !es.Timeout() {
debug.Printf("RecvFeedMsg error: %s\n", err.Error())
return nil, err
}
continue
}
d, _ := json.MarshalIndent(feed, "", " ")
fmt.Println(string(d))
if newClient := feed.NewClient; newClient != nil {
return newClient
return newClient, nil
} else if msg := feed.Response; msg != nil {
if content := msg.Content; content != nil {
if details := content.UdpChannelDetails; details != nil {
if err := self.udpTunnel.SetUdpTunnel(details); err != nil {
panic(err)
debug.Printf("Control Recive Message error: %s\n", err.Error())
return nil, err
}
return self.Update()
} else if content.Unauthorized {
self.controlChannel.SetExpired()
self.ControlChannel.SetExpired()
} else if pong := content.Pong; pong != nil {
self.lastPong = time.Now()
// if pong.ClientAddr.Compare(self.controlChannel.Conn.Pong.ClientAddr) != 0 {
// fmt.Println("client ip changed", pong.ClientAddr.String(), self.controlChannel.Conn.Pong.ClientAddr.String())
// }
if pong.ClientAddr.Compare(self.ControlChannel.Conn.Pong.ClientAddr) != 0 {
debug.Println("client ip changed", pong.ClientAddr.String(), self.ControlChannel.Conn.Pong.ClientAddr.String())
}
}
}
}
}
if self.lastPong.UnixMilli() != 0 && time.Now().UnixMilli()-self.lastPong.UnixMilli() > 6_000 {
self.lastPong = time.UnixMilli(0)
self.controlChannel.SetExpired()
self.lastPong = *new(time.Time)
self.ControlChannel.SetExpired()
}
return nil
return nil, nil
}

View File

@ -12,6 +12,7 @@ type TcpTunnel struct {
}
func (tcpTunnel *TcpTunnel) Connect() (*net.TCPConn, error) {
debug.Printf("Conecting to %q\n", tcpTunnel.ClaimInstruction.Address.String())
conn, err := net.DialTCP("tcp", nil, net.TCPAddrFromAddrPort(tcpTunnel.ClaimInstruction.Address))
if err != nil {
if conn != nil {
@ -34,4 +35,4 @@ func (tcpTunnel *TcpTunnel) Connect() (*net.TCPConn, error) {
return nil, fmt.Errorf("invalid response reader size")
}
return conn, nil
}
}

9
tunnel/tunnel.go Normal file
View File

@ -0,0 +1,9 @@
package tunnel
import (
"log"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
)
var debug = log.New(logfile.DebugFile, "tunnel.playit.gg: ", log.Ldate)

View File

@ -73,15 +73,17 @@ func (w *UdpFlow) WriteTo(writer io.Writer) error {
}
func FromTailUdpFlow(slice []byte) (UdpFlow, uint64, error) {
debug.Printf("FromTailUdpFlow: Avaible bytes: %+v\n", slice)
if len(slice) < 8 {
return UdpFlow{}, 0, fmt.Errorf("not space to footer")
}
footer := binary.BigEndian.Uint64(slice[(len(slice)-8):])
debug.Printf("FromTailUdpFlow: Footer %d, bytes: %+v\n", footer, slice[(len(slice)-8):])
if footer == REDIRECT_FLOW_4_FOOTER_ID || footer == REDIRECT_FLOW_4_FOOTER_ID_OLD || footer == (REDIRECT_FLOW_4_FOOTER_ID | REDIRECT_FLOW_4_FOOTER_ID_OLD) {
if len(slice) < V4_LEN {
return UdpFlow{}, 0, fmt.Errorf("v4 not have space")
}
debug.Printf("FromTailUdpFlow: bytes v4: %+v\n", slice[len(slice)-V4_LEN:])
reader := bytes.NewReader(slice[len(slice)-V4_LEN:])
var err error
@ -103,7 +105,8 @@ func FromTailUdpFlow(slice []byte) (UdpFlow, uint64, error) {
if len(slice) < V6_LEN {
return UdpFlow{}, footer, fmt.Errorf("v6 not have space")
}
reader := bytes.NewReader(slice[len(slice)-V4_LEN:])
debug.Printf("FromTailUdpFlow: bytes v4: %+v\n", slice[len(slice)-V6_LEN:])
reader := bytes.NewReader(slice[len(slice)-V6_LEN:])
var err error
var src_ip, dst_ip []byte
@ -122,5 +125,6 @@ func FromTailUdpFlow(slice []byte) (UdpFlow, uint64, error) {
point.Flow = flow
return point, 0, nil
}
debug.Printf("Cannot reader tail udp flow, bytes: %+v\n", slice)
return UdpFlow{}, footer, fmt.Errorf("read fotter")
}

View File

@ -7,20 +7,18 @@ import (
"net"
"net/netip"
"slices"
"sync"
"sync/atomic"
"time"
"sirherobrine23.org/playit-cloud/go-playit/logfile"
"sirherobrine23.org/playit-cloud/go-playit/proto"
)
type UdpTunnel struct {
Udp4 *net.UDPConn
Udp6 *net.UDPConn
locker sync.RWMutex
Details ChannelDetails
LastConfirm atomic.Uint32
LastSend atomic.Uint32
LastConfirm uint32
LastSend uint32
}
type ChannelDetails struct {
@ -48,10 +46,8 @@ func AssignUdpTunnel(tunUdp *UdpTunnel) error {
Udp: nil,
}
tunUdp.LastConfirm = atomic.Uint32{}
tunUdp.LastSend = atomic.Uint32{}
tunUdp.LastConfirm.Store(0)
tunUdp.LastSend.Store(0)
tunUdp.LastConfirm = 0
tunUdp.LastSend = 0
return nil
}
@ -60,8 +56,8 @@ func (udp *UdpTunnel) IsSetup() bool {
}
func (udp *UdpTunnel) InvalidateSession() {
udp.LastConfirm.Store(0)
udp.LastSend.Store(0)
udp.LastConfirm = 0
udp.LastSend = 0
}
func now_sec() uint32 {
@ -69,13 +65,13 @@ func now_sec() uint32 {
}
func (udp *UdpTunnel) RequireResend() bool {
last_confirm := udp.LastConfirm.Load()
last_confirm := udp.LastConfirm
/* send token every 10 seconds */
return 10 < now_sec()-last_confirm
}
func (udp *UdpTunnel) RequiresAuth() bool {
lastConf, lastSend := udp.LastConfirm.Load(), udp.LastSend.Load()
lastConf, lastSend := udp.LastConfirm, udp.LastSend
if lastSend < lastConf {
return false
}
@ -130,7 +126,7 @@ func (udp *UdpTunnel) SendToken(details *proto.UdpChannelDetails) error {
}
}
// LogDebug.Printf("send udp session token (len=%d) to %s\n", len(details.Token), details.TunnelAddr.AddrPort.String())
udp.LastSend.Store(now_sec())
udp.LastSend = now_sec()
return nil
}
@ -184,56 +180,50 @@ type UdpTunnelRx struct {
ReceivedPacket UdpTunnelRxPacket
}
func (Udp *UdpTunnel) ReceiveFrom(buff []byte) (*UdpTunnelRx, error) {
// Udp.locker.RLock()
// defer Udp.locker.RUnlock()
func (Udp *UdpTunnel) ReceiveFrom() ([]byte, *UdpTunnelRx, error) {
buff := make([]byte, 2048)
udp, tunnelAddr, err := Udp.GetSock()
if err != nil {
return nil, err
return nil, nil, err
}
// udp.SetReadDeadline(time.Now().Add(time.Second * 2))
udp.SetReadDeadline(time.Now().Add(time.Microsecond * 5))
byteSize, remote, err := udp.ReadFromUDPAddrPort(buff)
if err != nil {
return nil, err
}
if tunnelAddr.Compare(remote) != 0 {
lock := Udp.Details
if !slices.ContainsFunc(lock.AddrHistory, func(a netip.AddrPort) bool {
return nil, nil, err
} else if tunnelAddr.Compare(remote) != 0 {
if !slices.ContainsFunc(Udp.Details.AddrHistory, func(a netip.AddrPort) bool {
return a.Compare(remote) == 0
}) {
return nil, fmt.Errorf("got data from other source")
return nil, nil, fmt.Errorf("got data from other source")
}
}
buff = buff[:byteSize]
token, err := Udp.GetToken()
if err != nil {
return nil, err
return nil, nil, err
}
var point UdpTunnelRx
if bytes.Equal(buff, token) {
// LogDebug.Println("udp session confirmed")
Udp.LastConfirm.Store(now_sec())
point := new(UdpTunnelRx)
if bytes.Equal(buff[:], token) {
debug.Println("udp session confirmed")
Udp.LastConfirm = now_sec()
point.ConfirmerdConnection = true
return &point, nil
} else if len(buff)+V6_LEN < byteSize {
return nil, fmt.Errorf("receive buffer too small")
return nil, point, nil
}
footer, footerInt, err := FromTailUdpFlow(buff)
footer, footerInt, err := FromTailUdpFlow(buff[:])
if err != nil {
debug.Printf("UdpTunnel recive error: %s\n", err.Error())
if footerInt == UDP_CHANNEL_ESTABLISH_ID {
actual := hex.EncodeToString(buff)
expected := hex.EncodeToString(token)
return nil, fmt.Errorf("unexpected UDP establish packet, actual: %s, expected: %s", actual, expected)
return nil, nil, fmt.Errorf("unexpected UDP establish packet, actual: %s, expected: %s", actual, expected)
}
return nil, fmt.Errorf("failed to extract udp footer: %s, err: %s", hex.EncodeToString(buff), err.Error())
return nil, nil, fmt.Errorf("failed to extract udp footer: %s, err: %s", hex.EncodeToString(buff), err.Error())
}
point.ReceivedPacket = UdpTunnelRxPacket{
uint64(byteSize - footer.Len()),
footer,
}
return &point, nil
point.ReceivedPacket = UdpTunnelRxPacket{uint64(byteSize - footer.Len()), footer}
debug.Printf("UdpTunnel packet: %s\n", logfile.JSONString(point))
return buff[:point.ReceivedPacket.Bytes], point, nil
}