wireguard-go/src/receive.go

609 lines
12 KiB
Go
Raw Normal View History

2017-07-01 21:29:22 +00:00
package main
import (
"bytes"
"encoding/binary"
"golang.org/x/crypto/chacha20poly1305"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
2017-07-01 21:29:22 +00:00
"net"
"sync"
"sync/atomic"
"time"
)
type QueueHandshakeElement struct {
msgType uint32
packet []byte
endpoint Endpoint
buffer *[MaxMessageSize]byte
2017-07-01 21:29:22 +00:00
}
type QueueInboundElement struct {
2017-11-14 15:27:53 +00:00
dropped int32
mutex sync.Mutex
buffer *[MaxMessageSize]byte
packet []byte
counter uint64
keyPair *KeyPair
endpoint Endpoint
2017-07-01 21:29:22 +00:00
}
func (elem *QueueInboundElement) Drop() {
2017-07-08 21:51:26 +00:00
atomic.StoreInt32(&elem.dropped, AtomicTrue)
}
func (elem *QueueInboundElement) IsDropped() bool {
2017-07-08 21:51:26 +00:00
return atomic.LoadInt32(&elem.dropped) == AtomicTrue
}
func (device *Device) addToInboundQueue(
queue chan *QueueInboundElement,
element *QueueInboundElement,
) {
for {
select {
case queue <- element:
return
default:
select {
case old := <-queue:
old.Drop()
default:
}
}
}
2017-07-01 21:29:22 +00:00
}
2017-08-25 12:53:23 +00:00
// addToDecryptionQueue enqueues element on queue, never blocking on a full
// queue. Whenever the send cannot proceed, the oldest queued element is
// taken out, marked as dropped and its mutex is unlocked so that whoever is
// waiting on that element is released; the send is then retried.
func (device *Device) addToDecryptionQueue(
	queue chan *QueueInboundElement,
	element *QueueInboundElement,
) {
	for {
		// fast path: there is room in the queue
		select {
		case queue <- element:
			return
		default:
		}

		// queue is full: evict the oldest entry, if it is still there
		select {
		case stale := <-queue:
			// drop & release to potential consumer
			stale.Drop()
			stale.mutex.Unlock()
		default:
		}
	}
}
func (device *Device) addToHandshakeQueue(
2017-07-07 11:47:09 +00:00
queue chan QueueHandshakeElement,
element QueueHandshakeElement,
) {
for {
select {
case queue <- element:
return
default:
select {
case elem := <-queue:
device.PutMessageBuffer(elem.buffer)
2017-07-07 11:47:09 +00:00
default:
}
}
}
}
func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
2017-07-01 21:29:22 +00:00
2017-07-08 07:23:10 +00:00
logDebug := device.log.Debug
logDebug.Println("Routine, receive incoming, IP version:", IP)
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// receive datagrams until conn is closed
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
buffer := device.GetMessageBuffer()
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
var (
err error
size int
endpoint Endpoint
)
2017-11-30 23:03:06 +00:00
for {
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// read next datagram
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
switch IP {
case ipv4.Version:
size, endpoint, err = bind.ReceiveIPv4(buffer[:])
case ipv6.Version:
size, endpoint, err = bind.ReceiveIPv6(buffer[:])
default:
return
}
2017-08-04 14:15:53 +00:00
2017-11-30 23:03:06 +00:00
if err != nil {
return
}
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
if size < MinMessageSize {
continue
}
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// check size of packet
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
packet := buffer[:size]
msgType := binary.LittleEndian.Uint32(packet[:4])
2017-08-04 14:15:53 +00:00
2017-11-30 23:03:06 +00:00
var okay bool
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
switch msgType {
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// check if transport
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
case MessageTransportType:
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// check size
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
if len(packet) < MessageTransportType {
continue
}
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// lookup key pair
2017-11-30 23:03:06 +00:00
receiver := binary.LittleEndian.Uint32(
packet[MessageTransportOffsetReceiver:MessageTransportOffsetCounter],
)
value := device.indices.Lookup(receiver)
keyPair := value.keyPair
if keyPair == nil {
continue
}
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// check key-pair expiry
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
if keyPair.created.Add(RejectAfterTime).Before(time.Now()) {
continue
}
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// create work element
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
peer := value.peer
elem := &QueueInboundElement{
packet: packet,
buffer: buffer,
keyPair: keyPair,
dropped: AtomicFalse,
endpoint: endpoint,
}
elem.mutex.Lock()
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// add to decryption queues
2017-11-30 23:03:06 +00:00
device.addToDecryptionQueue(device.queue.decryption, elem)
device.addToInboundQueue(peer.queue.inbound, elem)
buffer = device.GetMessageBuffer()
continue
2017-07-01 21:29:22 +00:00
2017-11-30 23:03:06 +00:00
// otherwise it is a fixed size & handshake related packet
2017-11-30 23:03:06 +00:00
case MessageInitiationType:
okay = len(packet) == MessageInitiationSize
2017-11-30 23:03:06 +00:00
case MessageResponseType:
okay = len(packet) == MessageResponseSize
2017-11-30 23:03:06 +00:00
case MessageCookieReplyType:
okay = len(packet) == MessageCookieReplySize
}
2017-11-30 23:03:06 +00:00
if okay {
device.addToHandshakeQueue(
device.queue.handshake,
QueueHandshakeElement{
msgType: msgType,
buffer: buffer,
packet: packet,
endpoint: endpoint,
},
)
buffer = device.GetMessageBuffer()
}
2017-07-01 21:29:22 +00:00
}
}
func (device *Device) RoutineDecryption() {
2017-07-01 21:29:22 +00:00
var nonce [chacha20poly1305.NonceSize]byte
logDebug := device.log.Debug
logDebug.Println("Routine, decryption, started for device")
2017-07-01 21:29:22 +00:00
for {
select {
case <-device.signal.stop:
logDebug.Println("Routine, decryption worker, stopped")
2017-07-01 21:29:22 +00:00
return
case elem := <-device.queue.decryption:
2017-07-01 21:29:22 +00:00
// check if dropped
2017-07-01 21:29:22 +00:00
if elem.IsDropped() {
continue
}
// split message into fields
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
content := elem.packet[MessageTransportOffsetContent:]
// decrypt and release to consumer
var err error
copy(nonce[4:], counter)
elem.counter = binary.LittleEndian.Uint64(counter)
elem.packet, err = elem.keyPair.receive.Open(
elem.buffer[:0],
nonce[:],
content,
nil,
)
if err != nil {
2017-09-01 12:21:53 +00:00
elem.Drop()
}
elem.mutex.Unlock()
2017-07-01 21:29:22 +00:00
}
}
}
/* Handles incomming packets related to handshake
*/
func (device *Device) RoutineHandshake() {
logInfo := device.log.Info
logError := device.log.Error
logDebug := device.log.Debug
logDebug.Println("Routine, handshake routine, started for device")
2017-07-01 21:29:22 +00:00
2017-08-25 12:53:23 +00:00
var temp [MessageHandshakeSize]byte
2017-07-01 21:29:22 +00:00
var elem QueueHandshakeElement
for {
select {
case elem = <-device.queue.handshake:
case <-device.signal.stop:
return
}
// handle cookie fields and ratelimiting
2017-07-01 21:29:22 +00:00
switch elem.msgType {
2017-07-08 07:23:10 +00:00
case MessageCookieReplyType:
2017-08-14 15:09:25 +00:00
// unmarshal packet
var reply MessageCookieReply
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &reply)
if err != nil {
logDebug.Println("Failed to decode cookie reply")
2017-07-08 07:23:10 +00:00
return
}
2017-08-14 15:09:25 +00:00
// lookup peer and consume response
entry := device.indices.Lookup(reply.Receiver)
if entry.peer == nil {
return
}
entry.peer.mac.ConsumeReply(&reply)
continue
2017-07-08 07:23:10 +00:00
case MessageInitiationType, MessageResponseType:
2017-07-08 07:23:10 +00:00
// check mac fields and ratelimit
2017-07-08 07:23:10 +00:00
if !device.mac.CheckMAC1(elem.packet) {
logDebug.Println("Received packet with invalid mac1")
2017-07-08 07:23:10 +00:00
return
}
// endpoints destination address is the source of the datagram
srcBytes := elem.endpoint.DstToBytes()
if device.IsUnderLoad() {
2017-10-08 20:03:32 +00:00
// verify MAC2 field
if !device.mac.CheckMAC2(elem.packet, srcBytes) {
// construct cookie reply
logDebug.Println(
"Sending cookie reply to:",
elem.endpoint.DstToString(),
)
sender := binary.LittleEndian.Uint32(elem.packet[4:8])
2017-10-08 20:03:32 +00:00
reply, err := device.mac.CreateReply(elem.packet, sender, srcBytes)
if err != nil {
logError.Println("Failed to create cookie reply:", err)
return
}
2017-08-14 15:09:25 +00:00
// marshal and send reply
writer := bytes.NewBuffer(temp[:0])
binary.Write(writer, binary.LittleEndian, reply)
device.net.bind.Send(writer.Bytes(), elem.endpoint)
if err != nil {
logDebug.Println("Failed to send cookie reply:", err)
}
continue
}
2017-10-08 20:03:32 +00:00
// check ratelimiter
if !device.ratelimiter.Allow(elem.endpoint.DstIP()) {
continue
}
}
default:
logError.Println("Invalid packet ended up in the handshake queue")
continue
}
2017-07-08 07:23:10 +00:00
// handle handshake initation/response content
2017-07-01 21:29:22 +00:00
switch elem.msgType {
case MessageInitiationType:
2017-07-01 21:29:22 +00:00
// unmarshal
2017-07-01 21:29:22 +00:00
var msg MessageInitiation
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode initiation message")
continue
}
2017-07-01 21:29:22 +00:00
// consume initiation
2017-07-01 21:29:22 +00:00
peer := device.ConsumeMessageInitiation(&msg)
if peer == nil {
logInfo.Println(
"Recieved invalid initiation message from",
elem.endpoint.DstToString(),
)
continue
}
2017-08-04 14:15:53 +00:00
// update timers
2017-08-04 14:15:53 +00:00
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
2017-07-07 11:47:09 +00:00
// update endpoint
peer.mutex.Lock()
peer.endpoint = elem.endpoint
peer.mutex.Unlock()
// create response
2017-07-07 11:47:09 +00:00
response, err := device.CreateMessageResponse(peer)
if err != nil {
logError.Println("Failed to create response message:", err)
continue
}
2017-07-08 07:23:10 +00:00
peer.TimerEphemeralKeyCreated()
peer.NewKeyPair()
2017-07-27 21:45:37 +00:00
logDebug.Println("Creating response message for", peer.String())
2017-07-10 15:20:43 +00:00
writer := bytes.NewBuffer(temp[:0])
binary.Write(writer, binary.LittleEndian, response)
packet := writer.Bytes()
peer.mac.AddMacs(packet)
2017-07-01 21:29:22 +00:00
// send response
err = peer.SendBuffer(packet)
if err == nil {
2017-08-04 14:15:53 +00:00
peer.TimerAnyAuthenticatedPacketTraversal()
2017-11-11 22:26:44 +00:00
} else {
logError.Println("Failed to send response to:", peer.String(), err)
}
case MessageResponseType:
2017-07-01 21:29:22 +00:00
// unmarshal
2017-07-01 21:29:22 +00:00
var msg MessageResponse
reader := bytes.NewReader(elem.packet)
err := binary.Read(reader, binary.LittleEndian, &msg)
if err != nil {
logError.Println("Failed to decode response message")
continue
}
2017-07-01 21:29:22 +00:00
// consume response
2017-07-01 21:29:22 +00:00
peer := device.ConsumeMessageResponse(&msg)
if peer == nil {
logInfo.Println(
"Recieved invalid response message from",
elem.endpoint.DstToString(),
)
continue
}
2017-07-27 21:45:37 +00:00
2017-11-14 15:27:53 +00:00
// update endpoint
peer.mutex.Lock()
peer.endpoint = elem.endpoint
2017-11-14 15:27:53 +00:00
peer.mutex.Unlock()
logDebug.Println("Received handshake initation from", peer)
peer.TimerEphemeralKeyCreated()
2017-08-04 14:15:53 +00:00
// update timers
2017-08-04 14:15:53 +00:00
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
peer.TimerHandshakeComplete()
2017-08-04 14:15:53 +00:00
// derive key-pair
2017-07-01 21:29:22 +00:00
peer.NewKeyPair()
peer.SendKeepAlive()
}
2017-07-01 21:29:22 +00:00
}
}
func (peer *Peer) RoutineSequentialReceiver() {
device := peer.device
logInfo := device.log.Info
logError := device.log.Error
2017-07-01 21:29:22 +00:00
logDebug := device.log.Debug
logDebug.Println("Routine, sequential receiver, started for peer", peer.id)
for {
select {
2017-11-30 22:22:40 +00:00
case <-peer.signal.stop.Wait():
logDebug.Println("Routine, sequential receiver, stopped for peer", peer.id)
2017-07-01 21:29:22 +00:00
return
case elem := <-peer.queue.inbound:
2017-07-07 11:47:09 +00:00
// wait for decryption
elem.mutex.Lock()
if elem.IsDropped() {
continue
}
2017-07-10 10:09:19 +00:00
// check for replay
2017-07-08 21:51:26 +00:00
if !elem.keyPair.replayFilter.ValidateCounter(elem.counter) {
continue
}
2017-07-08 21:51:26 +00:00
peer.TimerAnyAuthenticatedPacketTraversal()
peer.TimerAnyAuthenticatedPacketReceived()
peer.KeepKeyFreshReceiving()
2017-07-01 21:29:22 +00:00
// check if using new key-pair
kp := &peer.keyPairs
kp.mutex.Lock()
if kp.next == elem.keyPair {
peer.TimerHandshakeComplete()
if kp.previous != nil {
device.DeleteKeyPair(kp.previous)
}
kp.previous = kp.current
kp.current = kp.next
kp.next = nil
2017-09-01 12:21:53 +00:00
}
kp.mutex.Unlock()
2017-07-01 21:29:22 +00:00
2017-11-14 15:27:53 +00:00
// update endpoint
peer.mutex.Lock()
peer.endpoint = elem.endpoint
2017-11-14 15:27:53 +00:00
peer.mutex.Unlock()
// check for keep-alive
2017-07-01 21:29:22 +00:00
if len(elem.packet) == 0 {
logDebug.Println("Received keep-alive from", peer.String())
continue
}
peer.TimerDataReceived()
2017-07-08 07:23:10 +00:00
// verify source and strip padding
2017-07-08 07:23:10 +00:00
switch elem.packet[0] >> 4 {
case ipv4.Version:
2017-07-08 07:23:10 +00:00
// strip padding
2017-07-07 11:47:09 +00:00
if len(elem.packet) < ipv4.HeaderLen {
continue
}
2017-07-08 07:23:10 +00:00
field := elem.packet[IPv4offsetTotalLength : IPv4offsetTotalLength+2]
length := binary.BigEndian.Uint16(field)
if int(length) > len(elem.packet) || int(length) < ipv4.HeaderLen {
continue
}
2017-07-08 07:23:10 +00:00
elem.packet = elem.packet[:length]
2017-07-08 07:23:10 +00:00
// verify IPv4 source
2017-07-08 07:23:10 +00:00
src := elem.packet[IPv4offsetSrc : IPv4offsetSrc+net.IPv4len]
if device.routingTable.LookupIPv4(src) != peer {
logInfo.Println(
"IPv4 packet with unallowed source address from",
peer.String(),
)
continue
}
2017-07-08 07:23:10 +00:00
case ipv6.Version:
2017-07-07 11:47:09 +00:00
// strip padding
2017-07-08 07:23:10 +00:00
if len(elem.packet) < ipv6.HeaderLen {
continue
}
2017-07-08 07:23:10 +00:00
field := elem.packet[IPv6offsetPayloadLength : IPv6offsetPayloadLength+2]
length := binary.BigEndian.Uint16(field)
length += ipv6.HeaderLen
if int(length) > len(elem.packet) {
continue
}
2017-07-08 07:23:10 +00:00
elem.packet = elem.packet[:length]
2017-08-04 14:15:53 +00:00
// verify IPv6 source
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
if device.routingTable.LookupIPv6(src) != peer {
logInfo.Println(
"IPv6 packet with unallowed source address from",
peer.String(),
)
continue
}
2017-07-01 21:29:22 +00:00
default:
logInfo.Println("Packet with invalid IP version from", peer.String())
continue
}
// write to tun device
atomic.AddUint64(&peer.stats.rxBytes, uint64(len(elem.packet)))
_, err := device.tun.device.Write(elem.packet)
device.PutMessageBuffer(elem.buffer)
if err != nil {
logError.Println("Failed to write packet to TUN device:", err)
}
2017-07-01 21:29:22 +00:00
}
}
}