playit.gg go implementation #1

Open
Sirherobrine23 wants to merge 6 commits from breaked into main
8 changed files with 176 additions and 126 deletions
Showing only changes of commit 18f85faf89 - Show all commits

View File

@@ -9,7 +9,7 @@ import (
func ReadU8(r io.Reader) uint8 {
var d uint8
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -17,7 +17,7 @@ func ReadU8(r io.Reader) uint8 {
}
func ReadU16(r io.Reader) uint16 {
var d uint16
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -25,7 +25,7 @@ func ReadU16(r io.Reader) uint16 {
}
func ReadU32(r io.Reader) uint32 {
var d uint32
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -33,7 +33,7 @@ func ReadU32(r io.Reader) uint32 {
}
func ReadU64(r io.Reader) uint64 {
var d uint64
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -55,7 +55,7 @@ func WriteU64(w io.Writer, d uint64) error {
func Read8(r io.Reader) int8 {
var d int8
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -63,7 +63,7 @@ func Read8(r io.Reader) int8 {
}
func Read16(r io.Reader) int16 {
var d int16
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -71,7 +71,7 @@ func Read16(r io.Reader) int16 {
}
func Read32(r io.Reader) int32 {
var d int32
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
@@ -79,23 +79,23 @@ func Read32(r io.Reader) int32 {
}
func Read64(r io.Reader) int64 {
var d int64
err := binary.Read(r, binary.BigEndian, d)
err := binary.Read(r, binary.BigEndian, &d)
if err != nil {
panic(err)
}
return d
}
func Write8(w io.Writer, d int8) error {
return binary.Write(w, binary.BigEndian, d)
return binary.Write(w, binary.BigEndian, &d)
}
func Write16(w io.Writer, d int16) error {
return binary.Write(w, binary.BigEndian, d)
return binary.Write(w, binary.BigEndian, &d)
}
func Write32(w io.Writer, d int32) error {
return binary.Write(w, binary.BigEndian, d)
return binary.Write(w, binary.BigEndian, &d)
}
func Write64(w io.Writer, d int64) error {
return binary.Write(w, binary.BigEndian, d)
return binary.Write(w, binary.BigEndian, &d)
}
func ReadByteN(r io.Reader, size int) (buff []byte, err error) {
@@ -109,7 +109,7 @@ func ReadByteN(r io.Reader, size int) (buff []byte, err error) {
return
}
func WriteBytes(w io.Writer, buff []byte) error {
return binary.Write(w, binary.BigEndian, buff)
return binary.Write(w, binary.BigEndian, &buff)
}
func AddrWrite(w io.Writer, addr netip.Addr) error {
@@ -184,14 +184,6 @@ func AddrPortWrite(w io.Writer, addr netip.AddrPort) error {
return nil
}
func WriteOption(w io.Writer, d any, callback func(w io.Writer) (err error)) error {
if d == nil {
return WriteU8(w, 0)
} else if err := WriteU8(w, 1); err != nil {
return err
}
return callback(w)
}
func ReadOption(r io.Reader, callback func(r io.Reader) (err error)) error {
switch ReadU8(r) {
case 0:

View File

@@ -88,7 +88,7 @@ func (Agent *AgentCheckPortMapping) ReadFrom(r io.Reader) error {
type Ping struct {
Now time.Time
CurrentPing *uint64
CurrentPing *uint32
SessionId *AgentSessionId
}
@@ -96,20 +96,36 @@ func (ping *Ping) WriteTo(w io.Writer) error {
if err := enc.WriteU64(w, uint64(ping.Now.UnixMilli())); err != nil {
return err
}
if err := enc.WriteOption(w, ping.CurrentPing, func(w io.Writer) error {
return enc.WriteU64(w, *ping.CurrentPing)
}); err != nil {
return err
if ping.CurrentPing == nil {
if err := enc.WriteU8(w, 0); err != nil {
return err
}
} else {
if err := enc.WriteU8(w, 1); err != nil {
return err
} else if err := enc.WriteU32(w, *ping.CurrentPing); err != nil {
return err
}
}
if err := enc.WriteOption(w, ping.SessionId, ping.SessionId.WriteTo); err != nil {
return err
if ping.SessionId == nil {
if err := enc.WriteU8(w, 0); err != nil {
return err
}
} else {
if err := enc.WriteU8(w, 1); err != nil {
return err
} else if err := ping.SessionId.WriteTo(w); err != nil {
return err
}
}
return nil
}
func (ping *Ping) ReadFrom(r io.Reader) error {
ping.Now = time.UnixMilli(int64(enc.ReadU64(r)))
if err := enc.ReadOption(r, func(r io.Reader) error {
*ping.CurrentPing = enc.ReadU64(r)
*ping.CurrentPing = enc.ReadU32(r)
return nil
}); err != nil {
return err
@@ -342,10 +358,18 @@ func (pong *Pong) WriteTo(w io.Writer) error {
return err
} else if err := enc.AddrPortWrite(w, pong.TunnelAddr); err != nil {
return err
} else if err := enc.WriteOption(w, pong.SessionExpireAt, func(w io.Writer) (err error) {
return enc.Write64(w, pong.SessionExpireAt.UnixMilli())
}); err != nil {
return err
}
if pong.SessionExpireAt == nil {
if err := enc.Write8(w, 0); err != nil {
return err
}
} else {
if err := enc.Write8(w, 1); err != nil {
return err
} else if err := enc.Write64(w, pong.SessionExpireAt.UnixMilli()); err != nil {
return err
}
}
return nil
}
@@ -361,11 +385,11 @@ func (pong *Pong) ReadFrom(r io.Reader) error {
return err
} else if err := enc.ReadOption(r, func(r io.Reader) (err error) {
pong.SessionExpireAt = new(time.Time)
d, err := time.UnixMilli(enc.Read64(r)).MarshalBinary()
if err != nil {
return err
}
return pong.SessionExpireAt.UnmarshalBinary(d)
*pong.SessionExpireAt = time.UnixMilli(enc.Read64(r)) // Fix set SessionExpireAt
// expAt := time.UnixMilli(enc.Read64(r)) // Fix set SessionExpireAt
// fmt.Printf("pong.SessionExpireAt: %s\n", expAt.String())
// pong.SessionExpireAt = &expAt
return nil
}); err != nil {
return err
}

View File

@@ -1,8 +1,9 @@
package proto
import (
"encoding/binary"
"io"
"sirherobrine23.org/playit-cloud/go-playit/enc"
)
type ControlRpcMessage[T MessageEncoding] struct {
@@ -11,7 +12,7 @@ type ControlRpcMessage[T MessageEncoding] struct {
}
func (rpc *ControlRpcMessage[T]) WriteTo(w io.Writer) error {
if err := binary.Write(w, binary.BigEndian, rpc.RequestID); err != nil {
if err := enc.WriteU64(w, rpc.RequestID); err != nil {
return err
} else if err = rpc.Content.WriteTo(w); err != nil {
return err
@@ -19,9 +20,8 @@ func (rpc *ControlRpcMessage[T]) WriteTo(w io.Writer) error {
return nil
}
func (rpc *ControlRpcMessage[T]) ReadFrom(r io.Reader) error {
if err := binary.Read(r, binary.BigEndian, &rpc.RequestID); err != nil {
return err
} else if err = rpc.Content.ReadFrom(r); err != nil {
rpc.RequestID = enc.ReadU64(r)
if err := rpc.Content.ReadFrom(r); err != nil {
return err
}
return nil

View File

@@ -1,6 +1,7 @@
package runner
import (
"fmt"
"io"
"net/netip"
"sync/atomic"
@@ -44,7 +45,6 @@ func (self *TunnelRunner) SetSpecialLan(setUse bool) {
func (self *TunnelRunner) Run() {
end := make(chan error)
tunnel := self.Tunnel
udp := tunnel.UdpTunnel()
go func() {
lastControlUpdate := time.Now().UnixMilli()
for self.KeepRunning.Load() {
@@ -54,7 +54,9 @@ func (self *TunnelRunner) Run() {
if _, err := tunnel.ReloadControlAddr(); err != nil {
}
}
fmt.Println("Reload")
if new_client := tunnel.Update(); new_client != nil {
fmt.Println("New TCP Client")
clients := self.TcpClients
found := self.Lookup.Lookup(new_client.ConnectAddr.Addr(), new_client.ConnectAddr.Port(), api.PortProto("tcp"))
if found == nil {
@@ -90,24 +92,25 @@ func (self *TunnelRunner) Run() {
}
}()
udp_clients := self.UdpClients
go func(){
buffer := make([]byte, 2048)
// had_success := false
for self.KeepRunning.Load() {
rx, err := udp.ReceiveFrom(buffer)
if err != nil {
time.Sleep(time.Second)
continue
}
if rx.ConfirmerdConnection {
continue
}
bytes, flow := rx.ReceivedPacket.Bytes, rx.ReceivedPacket.Flow
if err := udp_clients.ForwardPacket(flow, buffer[:bytes]); err != nil {
panic(err)
}
}
}()
// udp_clients := self.UdpClients
// go func(){
// buffer := make([]byte, 2048)
// // had_success := false
// for self.KeepRunning.Load() {
// rx, err := udp.ReceiveFrom(buffer)
// fmt.Println(rx)
// if err != nil {
// time.Sleep(time.Second)
// continue
// }
// if rx.ConfirmerdConnection {
// continue
// }
// bytes, flow := rx.ReceivedPacket.Bytes, rx.ReceivedPacket.Flow
// if err := udp_clients.ForwardPacket(flow, buffer[:bytes]); err != nil {
// panic(err)
// }
// }
// }()
<-end
}

View File

@@ -17,7 +17,7 @@ type AuthenticatedControl struct {
Registered proto.AgentRegistered
buffer *bytes.Buffer
ForceExpire bool
CurrentPing *uint64
CurrentPing *uint32
}
func (self *AuthenticatedControl) SendKeepAlive(requestId uint64) error {
@@ -81,37 +81,39 @@ func (self *AuthenticatedControl) IntoRequireAuth() ConnectedControl {
}
}
func (self *AuthenticatedControl) Authenticate() error {
func (self *AuthenticatedControl) Authenticate() (AuthenticatedControl, error) {
conn := self.IntoRequireAuth()
var err error
if *self, err = conn.Authenticate(self.Api); err != nil {
return err
}
return nil
return conn.Authenticate(self.Api)
}
func (self *AuthenticatedControl) RecvFeedMsg() (proto.ControlFeed, error) {
buff := make([]byte, 1024)
fmt.Println("RecvFeedMsg")
self.Conn.Udp.SetReadDeadline(*new(time.Time))
// self.Conn.Udp.SetReadDeadline(time.Now().Add(23_000_000_000))
size, remote, err := self.Conn.Udp.ReadFromUDPAddrPort(buff)
self.buffer.Reset()
self.buffer.Write(buff[:size])
fmt.Println("End RecvFeedMsg")
if err != nil {
return proto.ControlFeed{}, err
} else if remote.Compare(self.Conn.ControlAddr) != 0 {
return proto.ControlFeed{}, fmt.Errorf("invalid remote, expected %q got %q", remote.String(), self.Conn.ControlAddr.String())
}
self.buffer.Reset()
self.buffer.Write(buff[:size])
feed := proto.ControlFeed{}
if err := feed.ReadFrom(self.buffer); err != nil {
return proto.ControlFeed{}, err
}
if feed.Response != nil {
res := feed.Response
if res := feed.Response; res != nil {
if registered := res.Content.AgentRegistered; registered != nil {
self.Registered = *registered
}
if pong := res.Content.Pong; pong != nil {
*self.CurrentPing = uint64(time.Now().UnixMilli() - pong.RequestNow.UnixMilli())
} else if pong := res.Content.Pong; pong != nil {
self.CurrentPing = new(uint32)
*self.CurrentPing = uint32(time.Now().UnixMilli() - pong.RequestNow.UnixMilli())
self.LastPong = *pong
if expires_at := pong.SessionExpireAt; expires_at != nil {
self.Registered.ExpiresAt = *expires_at
}
}
}
return feed, nil

View File

@@ -3,6 +3,7 @@ package tunnel
import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"net/netip"
@@ -38,7 +39,7 @@ func (self *SetupFindSuitableChannel) Setup() (ConnectedControl, error) {
}
for range attempts {
buffer := new(bytes.Buffer)
if err = (&proto.ControlRpcMessage[*proto.ControlRequest]{
if err := (&proto.ControlRpcMessage[*proto.ControlRequest]{
RequestID: 1,
Content: &proto.ControlRequest{
Ping: &proto.Ping{
@@ -48,10 +49,12 @@ func (self *SetupFindSuitableChannel) Setup() (ConnectedControl, error) {
},
},
}).WriteTo(buffer); err != nil {
return ConnectedControl{}, err
} else if _, err = socket.WriteTo(buffer.Bytes(), net.UDPAddrFromAddrPort(addr)); err != nil {
return ConnectedControl{}, fmt.Errorf("failed to send initial ping: %s", err.Error())
continue
}
if _, err := socket.WriteTo(buffer.Bytes(), net.UDPAddrFromAddrPort(addr)); err != nil {
break
}
buffer.Reset()
var waits int
if waits = 5; isIPv6 {
@@ -59,29 +62,33 @@ func (self *SetupFindSuitableChannel) Setup() (ConnectedControl, error) {
}
for range waits {
buff := make([]byte, 1024)
socket.SetReadDeadline(time.Now().Add(time.Millisecond * 5))
size, peer, err := socket.ReadFromUDPAddrPort(buff)
socket.SetReadDeadline(time.Now().Add(time.Millisecond * 500))
size, peer, err := socket.ReadFrom(buff)
if err != nil {
continue
} else if peer.Compare(addr) != 0 {
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
}
break
} else if peer.String() != addr.String() {
continue
}
buffer = bytes.NewBuffer(buff[:size])
feed := proto.ControlFeed{}
if err := feed.ReadFrom(buffer); err != nil {
continue
} else if feed.Response != nil {
continue
break
} else if feed.Response == nil {
break
} else if feed.Response.RequestID != 1 {
continue
} else if feed.Response.Content.Pong != nil {
continue
break
} else if feed.Response.Content.Pong == nil {
break
}
return ConnectedControl{addr, *socket, *feed.Response.Content.Pong}, nil
}
}
socket.Close()
}
return ConnectedControl{}, fmt.Errorf("failed to connect")
return ConnectedControl{}, fmt.Errorf("failed to connectans setup initial connection")
}
type ConnectedControl struct {
@@ -111,8 +118,12 @@ func (self *ConnectedControl) Authenticate(Api api.Api) (AuthenticatedControl, e
}
for range 5 {
buff := make([]byte, 1024)
self.Udp.SetReadDeadline(time.Now().Add(time.Millisecond * 5))
size, remote, err := self.Udp.ReadFromUDPAddrPort(buff)
if err != nil {
if at, ok := err.(net.Error); ok && at.Timeout() {
continue
}
break
} else if self.ControlAddr.Compare(remote) != 0 {
continue
@@ -122,8 +133,12 @@ func (self *ConnectedControl) Authenticate(Api api.Api) (AuthenticatedControl, e
var feed proto.ControlFeed
if err := feed.ReadFrom(buffer); err != nil {
continue
} else if response := feed.Response; response != nil {
}
if response := feed.Response; response != nil {
if response.RequestID != 10 {
d,_:=json.MarshalIndent(feed, "", " ")
fmt.Printf("Setup: %s\n", string(d))
continue
}
if content := response.Content; content.RequestQueued {
@@ -134,14 +149,6 @@ func (self *ConnectedControl) Authenticate(Api api.Api) (AuthenticatedControl, e
} else if content.Unauthorized {
return AuthenticatedControl{}, fmt.Errorf("unauthorized")
} else if registered := content.AgentRegistered; registered != nil {
// secret_key,
// api_client: api,
// conn: self,
// last_pong: pong,
// registered,
// buffer,
// current_ping: None,
// force_expired: false,
return AuthenticatedControl{
Api: Api,
Conn: *self,
@@ -155,5 +162,5 @@ func (self *ConnectedControl) Authenticate(Api api.Api) (AuthenticatedControl, e
}
}
}
return AuthenticatedControl{}, fmt.Errorf("failed to connect")
return AuthenticatedControl{}, fmt.Errorf("failed to connect and authenticate")
}

View File

@@ -1,6 +1,8 @@
package tunnel
import (
"encoding/json"
"fmt"
"net/netip"
"slices"
"time"
@@ -15,8 +17,8 @@ func getControlAddresses(api api.Api) ([]netip.AddrPort, error) {
return nil, err
}
addresses := []netip.AddrPort{}
for _, ip6 := range append(routing.Targets6, routing.Targets4...) {
addresses = append(addresses, netip.AddrPortFrom(ip6, 5525))
for _, ipd := range append(routing.Targets6, routing.Targets4...) {
addresses = append(addresses, netip.AddrPortFrom(ipd, 5525))
}
return addresses, nil
}
@@ -108,10 +110,12 @@ func (self *SimpleTunnel) UdpTunnel() UdpTunnel {
func (self *SimpleTunnel) Update() *proto.NewClient {
if self.controlChannel.IsExpired() {
if err := self.controlChannel.Authenticate(); err != nil {
auth, err := self.controlChannel.Authenticate()
if err != nil {
time.Sleep(time.Second * 2)
return nil
}
self.controlChannel = auth
}
now := time.Now()
@@ -137,12 +141,16 @@ func (self *SimpleTunnel) Update() *proto.NewClient {
if err := self.controlChannel.SendKeepAlive(100); err != nil {}
if err := self.controlChannel.SendSetupUdpChannel(1); err != nil {}
}
timeout := 0
for range 30 {
feed, err := self.controlChannel.RecvFeedMsg()
if err != nil {
fmt.Println(err)
continue
} else if newClient := feed.NewClient; newClient != nil {
}
d,_:=json.MarshalIndent(feed, "", " ")
fmt.Printf("SimTunne: %s\n", string(d))
if newClient := feed.NewClient; newClient != nil {
return newClient
} else if msg := feed.Response; msg != nil {
if content := msg.Content; content != nil {
@@ -154,13 +162,12 @@ func (self *SimpleTunnel) Update() *proto.NewClient {
self.controlChannel.SetExpired()
} else if pong := content.Pong; pong != nil {
self.lastPong = time.Now()
if pong.ClientAddr.Compare(self.controlChannel.Conn.Pong.ClientAddr) != 0 {}
if pong.ClientAddr.Compare(self.controlChannel.Conn.Pong.ClientAddr) != 0 {
fmt.Println("client ip changed", pong.ClientAddr.String(), self.controlChannel.Conn.Pong.ClientAddr.String())
}
}
}
}
if timeout++; timeout >= 10 {
break
}
}
if self.lastPong.UnixMilli() != 0 && time.Now().UnixMilli() - self.lastPong.UnixMilli() > 6_000 {
self.lastPong = time.UnixMilli(0)

View File

@@ -17,7 +17,7 @@ import (
type UdpTunnel struct {
Udp4 *net.UDPConn
Udp6 *net.UDPConn
locker sync.Mutex
locker sync.RWMutex
Details ChannelDetails
LastConfirm atomic.Uint32
LastSend atomic.Uint32
@@ -84,20 +84,21 @@ func (udp *UdpTunnel) RequiresAuth() bool {
func (udp *UdpTunnel) SetUdpTunnel(details proto.UdpChannelDetails) error {
// LogDebug.Println("Updating Udp Tunnel")
udp.locker.Lock()
defer udp.locker.Unlock()
if udp.Details.Udp != nil {
current := udp.Details.Udp
if bytes.Equal(current.Token, details.Token) && current.TunnelAddr.Compare(details.TunnelAddr) == 0 {
udp.locker.Unlock()
return nil
}
if current.TunnelAddr.Compare(details.TunnelAddr) != 0 {
// LogDebug.Println("changed udp tunner addr")
oldAddr := current.TunnelAddr
udp.Details.AddrHistory = append(udp.Details.AddrHistory, oldAddr)
{
udp.locker.Lock()
if current := udp.Details.Udp; current != nil {
if bytes.Equal(current.Token, details.Token) && current.TunnelAddr.Compare(details.TunnelAddr) == 0 {
udp.locker.Unlock()
return nil
}
if current.TunnelAddr.Compare(details.TunnelAddr) != 0 {
// LogDebug.Println("changed udp tunner addr")
oldAddr := current.TunnelAddr
udp.Details.AddrHistory = append(udp.Details.AddrHistory, oldAddr)
}
}
udp.Details.Udp = &details
udp.locker.Unlock()
}
return udp.SendToken(&details)
@@ -114,13 +115,19 @@ func (udp *UdpTunnel) ResendToken() (bool, error) {
}
func (udp *UdpTunnel) SendToken(details *proto.UdpChannelDetails) error {
udp.locker.RLock()
defer udp.locker.RUnlock()
if details.TunnelAddr.Addr().Is4() {
udp.Udp4.WriteToUDPAddrPort(details.Token, details.TunnelAddr)
if _, err := udp.Udp4.WriteToUDPAddrPort(details.Token, details.TunnelAddr); err != nil {
return err
}
} else {
if udp.Udp6 == nil {
return fmt.Errorf("ipv6 not supported")
}
udp.Udp6.WriteToUDPAddrPort(details.Token, details.TunnelAddr)
if _, err := udp.Udp6.WriteToUDPAddrPort(details.Token, details.TunnelAddr); err != nil {
return err
}
}
// LogDebug.Printf("send udp session token (len=%d) to %s\n", len(details.Token), details.TunnelAddr.AddrPort.String())
udp.LastSend.Store(now_sec())
@@ -128,6 +135,9 @@ func (udp *UdpTunnel) SendToken(details *proto.UdpChannelDetails) error {
}
func (udp *UdpTunnel) GetSock() (*net.UDPConn, *netip.AddrPort, error) {
udp.locker.RLock()
defer udp.locker.RUnlock()
lock := udp.Details
if lock.Udp == nil {
// LogDebug.Println("udp tunnel not connected")
@@ -156,6 +166,8 @@ func (Udp *UdpTunnel) Send(data []byte, Flow UdpFlow) (int, error) {
}
func (Udp *UdpTunnel) GetToken() ([]byte, error) {
Udp.locker.RLock()
defer Udp.locker.RUnlock()
lock := Udp.Details
if lock.Udp == nil {
return nil, fmt.Errorf("udp tunnel not connected")
@@ -172,6 +184,9 @@ type UdpTunnelRx struct {
}
func (Udp *UdpTunnel) ReceiveFrom(buff []byte) (*UdpTunnelRx, error) {
Udp.locker.RLock()
defer Udp.locker.RUnlock()
udp, tunnelAddr, err := Udp.GetSock()
if err != nil {
return nil, err