Files
go-playit/core/maintened.go

204 lines
5.7 KiB
Go

package core
import (
"encoding/hex"
"errors"
"fmt"
"net"
"slices"
"time"
"sirherobrine23.com.br/go-bds/go-playit/proto"
"sirherobrine23.com.br/go-bds/go-playit/xcode"
)
// recvFeed reads one datagram from the control socket and decodes it into a
// proto.ControlFeed.
//
// The read is bounded by a 500ms deadline. Socket errors are wrapped with %w,
// so callers must use errors.As (not a direct type assertion) to detect a
// net.Error timeout. A datagram whose sender differs from agent.ControlAddr
// is rejected with an error and is not decoded.
func (agent *PlayitAgent) recvFeed() (*proto.ControlFeed, error) {
	// Without a deadline the read below could block indefinitely, so a
	// failure to arm it is reported instead of being ignored.
	if err := agent.PacketIo.SetReadDeadline(time.Now().Add(500 * time.Millisecond)); err != nil {
		return nil, fmt.Errorf("cannot set read deadline: %w", err)
	}

	buffer := make([]byte, 2048)
	n, peer, err := agent.PacketIo.ReadFromUDPAddrPort(buffer)
	if err != nil {
		return nil, fmt.Errorf("cannot read from udp: %w", err)
	}

	// Addresses are compared in their 16-byte form.
	// NOTE(review): presumably both values are netip.AddrPort, in which case
	// this makes an IPv4 address equal to its IPv4-mapped IPv6 form — confirm.
	if peer.Port() != agent.ControlAddr.Port() || peer.Addr().As16() != agent.ControlAddr.Addr().As16() {
		return nil, fmt.Errorf("mismatch controll address, got %s expected %s", peer, agent.ControlAddr)
	}

	feed := new(proto.ControlFeed)
	if err := xcode.Unmarshal(buffer[:n], feed); err != nil {
		return nil, fmt.Errorf("cannot decode control feed: %w", err)
	}
	return feed, nil
}
// setupConnection authenticates the agent with the control server.
//
// It obtains an agent key through authenticateAgent, hex-decodes it and sends
// it to agent.ControlAddr as a raw RPC payload. The payload is sent up to five
// times; after each send up to five feeds are read while waiting for the
// response that carries the matching request ID. Read timeouts and unrelated
// feeds consume one of those reads and are otherwise ignored.
//
// On an AgentRegistered response the registration is stored on the agent,
// ForceExpired is cleared, PongAtAuth is set to PongLatest and nil is
// returned. A RequestQueued response waits one second and resends the
// payload. Every other outcome returns a non-nil error.
func (agent *PlayitAgent) setupConnection() (err error) {
	res, err := authenticateAgent(agent.Config.Api, agent.PongLatest)
	if err != nil {
		return err
	}
	bytesAuth, err := hex.DecodeString(res.Key)
	if err != nil {
		return fmt.Errorf("cannot decode agent key: %w", err)
	}

	requestID := uint64(time.Now().UnixMilli())

	// The request is identical for every attempt, so it is encoded once.
	authSlice, err := xcode.Marshal(proto.ControlRpcMessage[proto.RawSlice]{Request: requestID, Content: bytesAuth})
	if err != nil {
		return err
	}

	for range 5 {
		if _, err = agent.PacketIo.WriteToUDPAddrPort(authSlice, agent.ControlAddr); err != nil {
			return err
		}

	recv:
		for range 5 {
			feed, err := agent.recvFeed()
			if err != nil {
				// recvFeed wraps the socket error, so a plain type assertion
				// to net.Error never matches; errors.As walks the chain.
				var netErr net.Error
				if errors.As(err, &netErr) && netErr.Timeout() {
					continue
				}
				return err
			}
			// Skip feeds that are not the answer to this request.
			if feed.Response == nil || feed.Response.Request != requestID {
				continue
			}

			content := feed.Response.Content
			switch {
			case content.AgentRegistered != nil:
				agent.ForceExpired = false
				agent.Registered = content.AgentRegistered
				agent.PongAtAuth = agent.PongLatest
				return nil
			case content.Pong != nil:
				pong, pongClient, pongTunnel := content.Pong, content.Pong.ClientAddr, content.Pong.TunnelAddr
				// NOTE(review): both sides of each comparison come from the
				// same pong, so this condition can never be true. It looks
				// like it was meant to compare against a previously stored
				// pong — confirm the intended reference before changing it.
				if !(pong.ClientAddr.Port() == pongClient.Port() && pong.ClientAddr.Addr().As16() == pongClient.Addr().As16()) ||
					!(pong.TunnelAddr.Port() == pongTunnel.Port() && pong.TunnelAddr.Addr().As16() == pongTunnel.Addr().As16()) {
					return fmt.Errorf("attempting to auth in old flow")
				}
				agent.PongLatest = *pong
				continue
			case content.InvalidSignature != nil:
				return fmt.Errorf("invalid signature")
			case content.Unauthorized != nil:
				// Best effort: send a ping before reporting the failure; a
				// marshal or write error here does not change the outcome.
				msg, err := xcode.Marshal(proto.ControlRpcMessage[proto.ControlRequest]{
					Request: requestID,
					Content: proto.ControlRequest{
						Ping: &proto.Ping{
							Now: time.Now(),
						},
					},
				})
				if err == nil {
					agent.PacketIo.WriteToUDPAddrPort(msg, agent.ControlAddr)
				}
				return fmt.Errorf("unauthorized")
			case content.RequestQueued != nil:
				// The request is queued: wait, then resend it from the outer loop.
				time.Sleep(time.Second)
				break recv
			case content.TryAgainLater != nil:
				return fmt.Errorf("try again later")
			default:
				return fmt.Errorf("unknown response")
			}
		}
	}
	return fmt.Errorf("fail to authenticate agent")
}
// reloadControlAddrs fetches the current control addresses and, when they
// differ from agent.LastControlTargets, reconnects through connectToFirst and
// authenticates again with setupConnection.
//
// It returns nil without reconnecting when the address list is unchanged, and
// a wrapped error when the lookup, the connection or the authentication fails.
func (agent *PlayitAgent) reloadControlAddrs() error {
	addrs, err := getControlAddresses(agent.Config.Api)
	if err != nil {
		return fmt.Errorf("cannot get control addresses: %w", err)
	}
	if slices.Equal(agent.LastControlTargets, addrs) {
		return nil
	}

	// NOTE(review): agent.LastControlTargets is not updated in this function;
	// presumably connectToFirst refreshes it — verify.
	if err := connectToFirst(agent); err != nil {
		return fmt.Errorf("cannot connect to first address: %w", err)
	}

	// Authenticate exactly once. Running setupConnection a second time after
	// a success would only repeat the whole authentication round trip.
	if err := agent.setupConnection(); err != nil {
		return fmt.Errorf("cannot auth to controller: %w", err)
	}
	return nil
}
// TunnelControlEvent is the event returned by PlayitAgent.update when a
// control feed carries something for the caller to act on. update sets one
// of the two fields per returned event.
type TunnelControlEvent struct {
// NewClientTcp is set from a NewClient feed, or from a NewClientOld feed
// converted with ToNew.
NewClientTcp *proto.NewClient
// UdpRxPacket is set from a response whose content carries
// UdpChannelDetails.
UdpRxPacket *proto.UdpChannelDetails
}
// update runs one maintenance pass over the control connection.
//
// It re-authenticates when isExpired reports a non-nil value, sends a ping
// when more than one second has passed since LastPing, sends a keep-alive
// when the keep-alive interval has elapsed, and then reads control feeds.
//
// Up to 50 feeds are read. The first feed that announces a new client or
// carries UDP channel details is returned as a TunnelControlEvent. An
// Unauthorized response sets ForceExpired and returns an error. After ten
// read timeouts, or once all reads are used up, it returns (nil, nil).
func (agent *PlayitAgent) update() (*TunnelControlEvent, error) {
	if agent.isExpired() != nil {
		if err := agent.setupConnection(); err != nil {
			return nil, err
		}
	}

	now := time.Now()

	// Ping at most once per second; whatever sendPing returns is discarded.
	if agent.LastPing.Add(time.Second).Before(now) {
		agent.LastPing = now
		agent.sendPing(200, now)
	}

	// Milliseconds until the session expires, clamped so it is never negative.
	nowMs := now.UnixMilli()
	remainingMs := max(agent.PongAtAuth.SessionExpireAt.Get().UnixMilli(), nowMs) - nowMs

	// Keep-alives go out every 60s, or every 10s once expiry is under 30s away.
	keepAliveEvery := 60 * time.Second
	if remainingMs < 30_000 {
		keepAliveEvery = 10 * time.Second
	}
	if agent.LastKeepAlive.Add(keepAliveEvery).Before(now) {
		agent.LastKeepAlive = now
		agent.sendKeepAlive(100)
	}

	timeoutCount := 0
	for attempt := 0; attempt < 50; attempt++ {
		feed, err := agent.recvFeed()
		if err != nil {
			var netErr net.Error
			if !errors.As(err, &netErr) || !netErr.Timeout() {
				return nil, err
			}
			// Timeouts are tolerated until ten of them have accumulated.
			timeoutCount++
			if timeoutCount >= 10 {
				return nil, nil
			}
			continue
		}

		if feed.NewClient != nil {
			return &TunnelControlEvent{NewClientTcp: feed.NewClient}, nil
		}
		if feed.NewClientOld != nil {
			return &TunnelControlEvent{NewClientTcp: feed.NewClientOld.ToNew()}, nil
		}
		if feed.Response == nil {
			continue
		}

		if feed.Response.Content.UdpChannelDetails != nil {
			return &TunnelControlEvent{UdpRxPacket: feed.Response.Content.UdpChannelDetails}, nil
		}
		if feed.Response.Content.Unauthorized != nil {
			agent.ForceExpired = true
			return nil, fmt.Errorf("unauthorized")
		}
		if feed.Response.Content.Pong != nil {
			// Only the arrival time is recorded; the pong payload is not inspected.
			agent.LastPong = time.Now()
		}
	}
	return nil, nil
}