mirror of
https://git.zx2c4.com/wireguard-go
synced 2024-11-10 16:59:17 +00:00
542e565baa
Only bother updating the rxBytes counter once we've processed a whole vector, since additions are atomic. Signed-off-by: Jason A. Donenfeld <Jason@zx2c4.com>
541 lines
13 KiB
Go
541 lines
13 KiB
Go
/* SPDX-License-Identifier: MIT
|
|
*
|
|
* Copyright (C) 2017-2023 WireGuard LLC. All Rights Reserved.
|
|
*/
|
|
|
|
package device
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"errors"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/crypto/chacha20poly1305"
|
|
"golang.org/x/net/ipv4"
|
|
"golang.org/x/net/ipv6"
|
|
"golang.zx2c4.com/wireguard/conn"
|
|
)
|
|
|
|
type QueueHandshakeElement struct {
|
|
msgType uint32
|
|
packet []byte
|
|
endpoint conn.Endpoint
|
|
buffer *[MaxMessageSize]byte
|
|
}
|
|
|
|
type QueueInboundElement struct {
|
|
buffer *[MaxMessageSize]byte
|
|
packet []byte
|
|
counter uint64
|
|
keypair *Keypair
|
|
endpoint conn.Endpoint
|
|
}
|
|
|
|
type QueueInboundElementsContainer struct {
|
|
sync.Mutex
|
|
elems []*QueueInboundElement
|
|
}
|
|
|
|
// clearPointers clears elem fields that contain pointers.
|
|
// This makes the garbage collector's life easier and
|
|
// avoids accidentally keeping other objects around unnecessarily.
|
|
// It also reduces the possible collateral damage from use-after-free bugs.
|
|
func (elem *QueueInboundElement) clearPointers() {
|
|
elem.buffer = nil
|
|
elem.packet = nil
|
|
elem.keypair = nil
|
|
elem.endpoint = nil
|
|
}
|
|
|
|
/* Called when a new authenticated message has been received
|
|
*
|
|
* NOTE: Not thread safe, but called by sequential receiver!
|
|
*/
|
|
func (peer *Peer) keepKeyFreshReceiving() {
|
|
if peer.timers.sentLastMinuteHandshake.Load() {
|
|
return
|
|
}
|
|
keypair := peer.keypairs.Current()
|
|
if keypair != nil && keypair.isInitiator && time.Since(keypair.created) > (RejectAfterTime-KeepaliveTimeout-RekeyTimeout) {
|
|
peer.timers.sentLastMinuteHandshake.Store(true)
|
|
peer.SendHandshakeInitiation(false)
|
|
}
|
|
}
|
|
|
|
/* Receives incoming datagrams for the device
|
|
*
|
|
* Every time the bind is updated a new routine is started for
|
|
* IPv4 and IPv6 (separately)
|
|
*/
|
|
func (device *Device) RoutineReceiveIncoming(maxBatchSize int, recv conn.ReceiveFunc) {
|
|
recvName := recv.PrettyName()
|
|
defer func() {
|
|
device.log.Verbosef("Routine: receive incoming %s - stopped", recvName)
|
|
device.queue.decryption.wg.Done()
|
|
device.queue.handshake.wg.Done()
|
|
device.net.stopping.Done()
|
|
}()
|
|
|
|
device.log.Verbosef("Routine: receive incoming %s - started", recvName)
|
|
|
|
// receive datagrams until conn is closed
|
|
|
|
var (
|
|
bufsArrs = make([]*[MaxMessageSize]byte, maxBatchSize)
|
|
bufs = make([][]byte, maxBatchSize)
|
|
err error
|
|
sizes = make([]int, maxBatchSize)
|
|
count int
|
|
endpoints = make([]conn.Endpoint, maxBatchSize)
|
|
deathSpiral int
|
|
elemsByPeer = make(map[*Peer]*QueueInboundElementsContainer, maxBatchSize)
|
|
)
|
|
|
|
for i := range bufsArrs {
|
|
bufsArrs[i] = device.GetMessageBuffer()
|
|
bufs[i] = bufsArrs[i][:]
|
|
}
|
|
|
|
defer func() {
|
|
for i := 0; i < maxBatchSize; i++ {
|
|
if bufsArrs[i] != nil {
|
|
device.PutMessageBuffer(bufsArrs[i])
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
count, err = recv(bufs, sizes, endpoints)
|
|
if err != nil {
|
|
if errors.Is(err, net.ErrClosed) {
|
|
return
|
|
}
|
|
device.log.Verbosef("Failed to receive %s packet: %v", recvName, err)
|
|
if neterr, ok := err.(net.Error); ok && !neterr.Temporary() {
|
|
return
|
|
}
|
|
if deathSpiral < 10 {
|
|
deathSpiral++
|
|
time.Sleep(time.Second / 3)
|
|
continue
|
|
}
|
|
return
|
|
}
|
|
deathSpiral = 0
|
|
|
|
// handle each packet in the batch
|
|
for i, size := range sizes[:count] {
|
|
if size < MinMessageSize {
|
|
continue
|
|
}
|
|
|
|
// check size of packet
|
|
|
|
packet := bufsArrs[i][:size]
|
|
msgType := binary.LittleEndian.Uint32(packet[:4])
|
|
|
|
switch msgType {
|
|
|
|
// check if transport
|
|
|
|
case MessageTransportType:
|
|
|
|
// check size
|
|
|
|
if len(packet) < MessageTransportSize {
|
|
continue
|
|
}
|
|
|
|
// lookup key pair
|
|
|
|
receiver := binary.LittleEndian.Uint32(
|
|
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
|
|
)
|
|
value := device.indexTable.Lookup(receiver)
|
|
keypair := value.keypair
|
|
if keypair == nil {
|
|
continue
|
|
}
|
|
|
|
// check keypair expiry
|
|
|
|
if keypair.created.Add(RejectAfterTime).Before(time.Now()) {
|
|
continue
|
|
}
|
|
|
|
// create work element
|
|
peer := value.peer
|
|
elem := device.GetInboundElement()
|
|
elem.packet = packet
|
|
elem.buffer = bufsArrs[i]
|
|
elem.keypair = keypair
|
|
elem.endpoint = endpoints[i]
|
|
elem.counter = 0
|
|
|
|
elemsForPeer, ok := elemsByPeer[peer]
|
|
if !ok {
|
|
elemsForPeer = device.GetInboundElementsContainer()
|
|
elemsForPeer.Lock()
|
|
elemsByPeer[peer] = elemsForPeer
|
|
}
|
|
elemsForPeer.elems = append(elemsForPeer.elems, elem)
|
|
bufsArrs[i] = device.GetMessageBuffer()
|
|
bufs[i] = bufsArrs[i][:]
|
|
continue
|
|
|
|
// otherwise it is a fixed size & handshake related packet
|
|
|
|
case MessageInitiationType:
|
|
if len(packet) != MessageInitiationSize {
|
|
continue
|
|
}
|
|
|
|
case MessageResponseType:
|
|
if len(packet) != MessageResponseSize {
|
|
continue
|
|
}
|
|
|
|
case MessageCookieReplyType:
|
|
if len(packet) != MessageCookieReplySize {
|
|
continue
|
|
}
|
|
|
|
default:
|
|
device.log.Verbosef("Received message with unknown type")
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case device.queue.handshake.c <- QueueHandshakeElement{
|
|
msgType: msgType,
|
|
buffer: bufsArrs[i],
|
|
packet: packet,
|
|
endpoint: endpoints[i],
|
|
}:
|
|
bufsArrs[i] = device.GetMessageBuffer()
|
|
bufs[i] = bufsArrs[i][:]
|
|
default:
|
|
}
|
|
}
|
|
for peer, elemsContainer := range elemsByPeer {
|
|
if peer.isRunning.Load() {
|
|
peer.queue.inbound.c <- elemsContainer
|
|
device.queue.decryption.c <- elemsContainer
|
|
} else {
|
|
for _, elem := range elemsContainer.elems {
|
|
device.PutMessageBuffer(elem.buffer)
|
|
device.PutInboundElement(elem)
|
|
}
|
|
device.PutInboundElementsContainer(elemsContainer)
|
|
}
|
|
delete(elemsByPeer, peer)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (device *Device) RoutineDecryption(id int) {
|
|
var nonce [chacha20poly1305.NonceSize]byte
|
|
|
|
defer device.log.Verbosef("Routine: decryption worker %d - stopped", id)
|
|
device.log.Verbosef("Routine: decryption worker %d - started", id)
|
|
|
|
for elemsContainer := range device.queue.decryption.c {
|
|
for _, elem := range elemsContainer.elems {
|
|
// split message into fields
|
|
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
|
|
content := elem.packet[MessageTransportOffsetContent:]
|
|
|
|
// decrypt and release to consumer
|
|
var err error
|
|
elem.counter = binary.LittleEndian.Uint64(counter)
|
|
// copy counter to nonce
|
|
binary.LittleEndian.PutUint64(nonce[0x4:0xc], elem.counter)
|
|
elem.packet, err = elem.keypair.receive.Open(
|
|
content[:0],
|
|
nonce[:],
|
|
content,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
elem.packet = nil
|
|
}
|
|
}
|
|
elemsContainer.Unlock()
|
|
}
|
|
}
|
|
|
|
/* Handles incoming packets related to handshake
|
|
*/
|
|
func (device *Device) RoutineHandshake(id int) {
|
|
defer func() {
|
|
device.log.Verbosef("Routine: handshake worker %d - stopped", id)
|
|
device.queue.encryption.wg.Done()
|
|
}()
|
|
device.log.Verbosef("Routine: handshake worker %d - started", id)
|
|
|
|
for elem := range device.queue.handshake.c {
|
|
|
|
// handle cookie fields and ratelimiting
|
|
|
|
switch elem.msgType {
|
|
|
|
case MessageCookieReplyType:
|
|
|
|
// unmarshal packet
|
|
|
|
var reply MessageCookieReply
|
|
reader := bytes.NewReader(elem.packet)
|
|
err := binary.Read(reader, binary.LittleEndian, &reply)
|
|
if err != nil {
|
|
device.log.Verbosef("Failed to decode cookie reply")
|
|
goto skip
|
|
}
|
|
|
|
// lookup peer from index
|
|
|
|
entry := device.indexTable.Lookup(reply.Receiver)
|
|
|
|
if entry.peer == nil {
|
|
goto skip
|
|
}
|
|
|
|
// consume reply
|
|
|
|
if peer := entry.peer; peer.isRunning.Load() {
|
|
device.log.Verbosef("Receiving cookie response from %s", elem.endpoint.DstToString())
|
|
if !peer.cookieGenerator.ConsumeReply(&reply) {
|
|
device.log.Verbosef("Could not decrypt invalid cookie response")
|
|
}
|
|
}
|
|
|
|
goto skip
|
|
|
|
case MessageInitiationType, MessageResponseType:
|
|
|
|
// check mac fields and maybe ratelimit
|
|
|
|
if !device.cookieChecker.CheckMAC1(elem.packet) {
|
|
device.log.Verbosef("Received packet with invalid mac1")
|
|
goto skip
|
|
}
|
|
|
|
// endpoints destination address is the source of the datagram
|
|
|
|
if device.IsUnderLoad() {
|
|
|
|
// verify MAC2 field
|
|
|
|
if !device.cookieChecker.CheckMAC2(elem.packet, elem.endpoint.DstToBytes()) {
|
|
device.SendHandshakeCookie(&elem)
|
|
goto skip
|
|
}
|
|
|
|
// check ratelimiter
|
|
|
|
if !device.rate.limiter.Allow(elem.endpoint.DstIP()) {
|
|
goto skip
|
|
}
|
|
}
|
|
|
|
default:
|
|
device.log.Errorf("Invalid packet ended up in the handshake queue")
|
|
goto skip
|
|
}
|
|
|
|
// handle handshake initiation/response content
|
|
|
|
switch elem.msgType {
|
|
case MessageInitiationType:
|
|
|
|
// unmarshal
|
|
|
|
var msg MessageInitiation
|
|
reader := bytes.NewReader(elem.packet)
|
|
err := binary.Read(reader, binary.LittleEndian, &msg)
|
|
if err != nil {
|
|
device.log.Errorf("Failed to decode initiation message")
|
|
goto skip
|
|
}
|
|
|
|
// consume initiation
|
|
|
|
peer := device.ConsumeMessageInitiation(&msg)
|
|
if peer == nil {
|
|
device.log.Verbosef("Received invalid initiation message from %s", elem.endpoint.DstToString())
|
|
goto skip
|
|
}
|
|
|
|
// update timers
|
|
|
|
peer.timersAnyAuthenticatedPacketTraversal()
|
|
peer.timersAnyAuthenticatedPacketReceived()
|
|
|
|
// update endpoint
|
|
peer.SetEndpointFromPacket(elem.endpoint)
|
|
|
|
device.log.Verbosef("%v - Received handshake initiation", peer)
|
|
peer.rxBytes.Add(uint64(len(elem.packet)))
|
|
|
|
peer.SendHandshakeResponse()
|
|
|
|
case MessageResponseType:
|
|
|
|
// unmarshal
|
|
|
|
var msg MessageResponse
|
|
reader := bytes.NewReader(elem.packet)
|
|
err := binary.Read(reader, binary.LittleEndian, &msg)
|
|
if err != nil {
|
|
device.log.Errorf("Failed to decode response message")
|
|
goto skip
|
|
}
|
|
|
|
// consume response
|
|
|
|
peer := device.ConsumeMessageResponse(&msg)
|
|
if peer == nil {
|
|
device.log.Verbosef("Received invalid response message from %s", elem.endpoint.DstToString())
|
|
goto skip
|
|
}
|
|
|
|
// update endpoint
|
|
peer.SetEndpointFromPacket(elem.endpoint)
|
|
|
|
device.log.Verbosef("%v - Received handshake response", peer)
|
|
peer.rxBytes.Add(uint64(len(elem.packet)))
|
|
|
|
// update timers
|
|
|
|
peer.timersAnyAuthenticatedPacketTraversal()
|
|
peer.timersAnyAuthenticatedPacketReceived()
|
|
|
|
// derive keypair
|
|
|
|
err = peer.BeginSymmetricSession()
|
|
|
|
if err != nil {
|
|
device.log.Errorf("%v - Failed to derive keypair: %v", peer, err)
|
|
goto skip
|
|
}
|
|
|
|
peer.timersSessionDerived()
|
|
peer.timersHandshakeComplete()
|
|
peer.SendKeepalive()
|
|
}
|
|
skip:
|
|
device.PutMessageBuffer(elem.buffer)
|
|
}
|
|
}
|
|
|
|
func (peer *Peer) RoutineSequentialReceiver(maxBatchSize int) {
|
|
device := peer.device
|
|
defer func() {
|
|
device.log.Verbosef("%v - Routine: sequential receiver - stopped", peer)
|
|
peer.stopping.Done()
|
|
}()
|
|
device.log.Verbosef("%v - Routine: sequential receiver - started", peer)
|
|
|
|
bufs := make([][]byte, 0, maxBatchSize)
|
|
|
|
for elemsContainer := range peer.queue.inbound.c {
|
|
if elemsContainer == nil {
|
|
return
|
|
}
|
|
elemsContainer.Lock()
|
|
validTailPacket := -1
|
|
dataPacketReceived := false
|
|
rxBytesLen := uint64(0)
|
|
for i, elem := range elemsContainer.elems {
|
|
if elem.packet == nil {
|
|
// decryption failed
|
|
continue
|
|
}
|
|
|
|
if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) {
|
|
continue
|
|
}
|
|
|
|
validTailPacket = i
|
|
if peer.ReceivedWithKeypair(elem.keypair) {
|
|
peer.SetEndpointFromPacket(elem.endpoint)
|
|
peer.timersHandshakeComplete()
|
|
peer.SendStagedPackets()
|
|
}
|
|
rxBytesLen += uint64(len(elem.packet) + MinMessageSize)
|
|
|
|
if len(elem.packet) == 0 {
|
|
device.log.Verbosef("%v - Receiving keepalive packet", peer)
|
|
continue
|
|
}
|
|
dataPacketReceived = true
|
|
|
|
switch elem.packet[0] >> 4 {
|
|
case 4:
|
|
if len(elem.packet) < ipv4.HeaderLen {
|
|
continue
|
|
}
|
|
field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2]
|
|
length := binary.BigEndian.Uint16(field)
|
|
if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen {
|
|
continue
|
|
}
|
|
elem.packet = elem.packet[:length]
|
|
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
|
|
if device.allowedips.Lookup(src) != peer {
|
|
device.log.Verbosef("IPv4 packet with disallowed source address from %v", peer)
|
|
continue
|
|
}
|
|
|
|
case 6:
|
|
if len(elem.packet) < ipv6.HeaderLen {
|
|
continue
|
|
}
|
|
field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2]
|
|
length := binary.BigEndian.Uint16(field)
|
|
length += ipv6.HeaderLen
|
|
if int(length) > len(elem.packet) {
|
|
continue
|
|
}
|
|
elem.packet = elem.packet[:length]
|
|
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
|
|
if device.allowedips.Lookup(src) != peer {
|
|
device.log.Verbosef("IPv6 packet with disallowed source address from %v", peer)
|
|
continue
|
|
}
|
|
|
|
default:
|
|
device.log.Verbosef("Packet with invalid IP version from %v", peer)
|
|
continue
|
|
}
|
|
|
|
bufs = append(bufs, elem.buffer[:MessageTransportOffsetContent+len(elem.packet)])
|
|
}
|
|
|
|
peer.rxBytes.Add(rxBytesLen)
|
|
if validTailPacket >= 0 {
|
|
peer.SetEndpointFromPacket(elemsContainer.elems[validTailPacket].endpoint)
|
|
peer.keepKeyFreshReceiving()
|
|
peer.timersAnyAuthenticatedPacketTraversal()
|
|
peer.timersAnyAuthenticatedPacketReceived()
|
|
}
|
|
if dataPacketReceived {
|
|
peer.timersDataReceived()
|
|
}
|
|
if len(bufs) > 0 {
|
|
_, err := device.tun.device.Write(bufs, MessageTransportOffsetContent)
|
|
if err != nil && !device.isClosed() {
|
|
device.log.Errorf("Failed to write packets to TUN device: %v", err)
|
|
}
|
|
}
|
|
for _, elem := range elemsContainer.elems {
|
|
device.PutMessageBuffer(elem.buffer)
|
|
device.PutInboundElement(elem)
|
|
}
|
|
bufs = bufs[:0]
|
|
device.PutInboundElementsContainer(elemsContainer)
|
|
}
|
|
}
|