wireguard-go/device/device.go

530 lines
13 KiB
Go
Raw Normal View History

2019-01-02 00:55:51 +00:00
/* SPDX-License-Identifier: MIT
*
* Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved.
*/
2019-03-03 03:04:41 +00:00
package device
import (
"runtime"
"sync"
"sync/atomic"
"time"
2019-05-14 07:09:52 +00:00
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"golang.zx2c4.com/wireguard/conn"
2019-05-14 07:09:52 +00:00
"golang.zx2c4.com/wireguard/ratelimiter"
"golang.zx2c4.com/wireguard/rwcancel"
2019-05-14 07:09:52 +00:00
"golang.zx2c4.com/wireguard/tun"
)
type Device struct {
log *Logger
// synchronized resources (locks acquired in order)
state struct {
// state holds the device's state. It is accessed atomically.
// Use the device.deviceState method to read it.
// If state.mu is (r)locked, state is the current state of the device.
// Without state.mu (r)locked, state is either the current state
// of the device or the intended future state of the device.
// For example, while executing a call to Up, state will be deviceStateUp.
// There is no guarantee that that intended future state of the device
// will become the actual state; Up can fail.
// The device can also change state multiple times between time of check and time of use.
// Unsynchronized uses of state must therefore be advisory/best-effort only.
state uint32 // actually a deviceState, but typed uint32 for conveniene
// stopping blocks until all inputs to Device have been closed.
2018-05-05 04:00:38 +00:00
stopping sync.WaitGroup
// mu protects state changes.
mu sync.Mutex
}
net struct {
stopping sync.WaitGroup
sync.RWMutex
bind conn.Bind // bind interface
netlinkCancel *rwcancel.RWCancel
port uint16 // listening port
fwmark uint32 // mark value (0 = disabled)
}
2018-05-13 21:14:43 +00:00
staticIdentity struct {
sync.RWMutex
privateKey NoisePrivateKey
publicKey NoisePublicKey
}
peers struct {
empty AtomicBool // empty reports whether len(keyMap) == 0
sync.RWMutex // protects keyMap
keyMap map[NoisePublicKey]*Peer
}
// unprotected / "self-synchronising resources"
2018-05-13 21:14:43 +00:00
allowedips AllowedIPs
indexTable IndexTable
cookieChecker CookieChecker
rate struct {
underLoadUntil int64
limiter ratelimiter.Ratelimiter
}
pool struct {
messageBuffers *WaitPool
inboundElements *WaitPool
outboundElements *WaitPool
}
queue struct {
encryption *outboundQueue
decryption *inboundQueue
handshake *handshakeQueue
}
tun struct {
device tun.Device
mtu int32
}
ipcMutex sync.RWMutex
closed chan struct{}
2017-06-01 19:31:30 +00:00
}
// deviceState represents the state of a Device.
// There are four states: new, down, up, closed.
// However, state new should never be observable.
// Transitions:
//
// new -> down -----+
// ↑↓ ↓
// up -> closed
//
type deviceState uint32
//go:generate stringer -type deviceState -trimprefix=deviceState
const (
deviceStateNew deviceState = iota
deviceStateDown
deviceStateUp
deviceStateClosed
)
// deviceState returns device.state.state as a deviceState
// See those docs for how to interpret this value.
func (device *Device) deviceState() deviceState {
return deviceState(atomic.LoadUint32(&device.state.state))
}
// isClosed reports whether the device is closed (or is closing).
// See device.state.state comments for how to interpret this value.
func (device *Device) isClosed() bool {
return device.deviceState() == deviceStateClosed
}
// isUp reports whether the device is up (or is attempting to come up).
// See device.state.state comments for how to interpret this value.
func (device *Device) isUp() bool {
return device.deviceState() == deviceStateUp
}
/* Converts the peer into a "zombie", which remains in the peer map,
* but processes no packets and does not exists in the routing table.
*
* Must hold device.peers.Mutex
*/
func unsafeRemovePeer(device *Device, peer *Peer, key NoisePublicKey) {
// stop routing and processing of packets
2018-05-13 21:14:43 +00:00
device.allowedips.RemoveByPeer(peer)
peer.Stop()
// remove from peer map
delete(device.peers.keyMap, key)
device.peers.empty.Set(len(device.peers.keyMap) == 0)
}
// changeState attempts to change the device state to match want.
func (device *Device) changeState(want deviceState) {
device.state.mu.Lock()
defer device.state.mu.Unlock()
old := device.deviceState()
if old == deviceStateClosed {
// once closed, always closed
device.log.Verbosef("Interface closed, ignored requested state %s", want)
return
}
switch want {
case old:
device.log.Verbosef("Interface already in state %s", want)
2018-02-04 15:46:24 +00:00
return
case deviceStateUp:
atomic.StoreUint32(&device.state.state, uint32(deviceStateUp))
if ok := device.upLocked(); ok {
2018-02-04 15:46:24 +00:00
break
}
fallthrough // up failed; bring the device all the way back down
case deviceStateDown:
atomic.StoreUint32(&device.state.state, uint32(deviceStateDown))
device.downLocked()
2018-02-04 15:46:24 +00:00
}
device.log.Verbosef("Interface state was %s, requested %s, now %s", old, want, device.deviceState())
}
// upLocked attempts to bring the device up and reports whether it succeeded.
// The caller must hold device.state.mu and is responsible for updating device.state.state.
func (device *Device) upLocked() bool {
if err := device.BindUpdate(); err != nil {
device.log.Errorf("Unable to update bind: %v", err)
return false
}
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.Start()
if atomic.LoadUint32(&peer.persistentKeepaliveInterval) > 0 {
peer.SendKeepalive()
}
}
device.peers.RUnlock()
return true
2017-12-29 16:42:09 +00:00
}
// downLocked attempts to bring the device down.
// The caller must hold device.state.mu and is responsible for updating device.state.state.
func (device *Device) downLocked() {
err := device.BindClose()
if err != nil {
device.log.Errorf("Bind close failed: %v", err)
}
2017-12-29 16:42:09 +00:00
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.Stop()
}
device.peers.RUnlock()
}
func (device *Device) Up() {
device.changeState(deviceStateUp)
}
func (device *Device) Down() {
device.changeState(deviceStateDown)
2017-12-29 16:42:09 +00:00
}
func (device *Device) IsUnderLoad() bool {
// check if currently under load
now := time.Now()
underLoad := len(device.queue.handshake.c) >= UnderLoadQueueSize
if underLoad {
atomic.StoreInt64(&device.rate.underLoadUntil, now.Add(UnderLoadAfterTime).UnixNano())
return true
}
// check if recently under load
return atomic.LoadInt64(&device.rate.underLoadUntil) > now.UnixNano()
}
2017-08-04 14:15:53 +00:00
func (device *Device) SetPrivateKey(sk NoisePrivateKey) error {
// lock required resources
device.staticIdentity.Lock()
defer device.staticIdentity.Unlock()
if sk.Equals(device.staticIdentity.privateKey) {
return nil
}
device.peers.Lock()
defer device.peers.Unlock()
2019-08-05 15:46:34 +00:00
lockedPeers := make([]*Peer, 0, len(device.peers.keyMap))
for _, peer := range device.peers.keyMap {
peer.handshake.mutex.RLock()
2019-08-05 15:46:34 +00:00
lockedPeers = append(lockedPeers, peer)
}
2017-06-24 13:34:17 +00:00
// remove peers with matching public keys
2017-08-04 14:15:53 +00:00
publicKey := sk.publicKey()
for key, peer := range device.peers.keyMap {
if peer.handshake.remoteStatic.Equals(publicKey) {
peer.handshake.mutex.RUnlock()
unsafeRemovePeer(device, peer, key)
peer.handshake.mutex.RLock()
2017-08-04 14:15:53 +00:00
}
}
2017-06-24 13:34:17 +00:00
// update key material
2018-05-13 21:14:43 +00:00
device.staticIdentity.privateKey = sk
device.staticIdentity.publicKey = publicKey
device.cookieChecker.Init(publicKey)
2017-06-24 13:34:17 +00:00
// do static-static DH pre-computations
2019-08-05 15:46:34 +00:00
expiredPeers := make([]*Peer, 0, len(device.peers.keyMap))
2020-02-04 17:08:51 +00:00
for _, peer := range device.peers.keyMap {
2018-05-14 10:27:29 +00:00
handshake := &peer.handshake
2020-02-04 17:08:51 +00:00
handshake.precomputedStaticStatic = device.staticIdentity.privateKey.sharedSecret(handshake.remoteStatic)
expiredPeers = append(expiredPeers, peer)
2017-06-23 11:41:59 +00:00
}
2017-08-04 14:15:53 +00:00
2019-08-05 15:46:34 +00:00
for _, peer := range lockedPeers {
peer.handshake.mutex.RUnlock()
}
for _, peer := range expiredPeers {
peer.ExpireCurrentKeypairs()
}
2017-08-04 14:15:53 +00:00
return nil
2017-06-23 11:41:59 +00:00
}
func NewDevice(tunDevice tun.Device, logger *Logger) *Device {
device := new(Device)
device.state.state = uint32(deviceStateDown)
device.closed = make(chan struct{})
2017-11-14 17:26:28 +00:00
device.log = logger
2018-05-23 00:10:54 +00:00
device.tun.device = tunDevice
2018-04-18 14:39:14 +00:00
mtu, err := device.tun.device.MTU()
if err != nil {
device.log.Errorf("Trouble determining MTU, assuming default: %v", err)
2018-04-19 13:52:59 +00:00
mtu = DefaultMTU
2018-04-18 14:39:14 +00:00
}
device.tun.mtu = int32(mtu)
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
device.rate.limiter.Init()
2018-05-13 16:23:40 +00:00
device.indexTable.Init()
2018-09-22 04:29:02 +00:00
device.PopulatePools()
// create queues
device.queue.handshake = newHandshakeQueue()
device.queue.encryption = newOutboundQueue()
device.queue.decryption = newInboundQueue()
2017-07-01 21:29:22 +00:00
2017-11-11 14:43:55 +00:00
// prepare net
device.net.port = 0
device.net.bind = nil
// start workers
2018-05-05 04:00:38 +00:00
cpus := runtime.NumCPU()
device.state.stopping.Wait()
for i := 0; i < cpus; i++ {
go device.RoutineEncryption()
2017-07-01 21:29:22 +00:00
go device.RoutineDecryption()
go device.RoutineHandshake()
}
2017-12-01 22:37:26 +00:00
device.state.stopping.Add(1) // read from TUN
go device.RoutineReadFromTUN()
go device.RoutineTUNEventReader()
2017-12-01 22:37:26 +00:00
return device
2017-06-24 13:34:17 +00:00
}
func (device *Device) LookupPeer(pk NoisePublicKey) *Peer {
device.peers.RLock()
defer device.peers.RUnlock()
return device.peers.keyMap[pk]
2017-06-24 13:34:17 +00:00
}
func (device *Device) RemovePeer(key NoisePublicKey) {
device.peers.Lock()
defer device.peers.Unlock()
// stop peer and remove from routing
peer, ok := device.peers.keyMap[key]
if ok {
unsafeRemovePeer(device, peer, key)
}
2017-06-01 19:31:30 +00:00
}
2017-06-24 13:34:17 +00:00
func (device *Device) RemoveAllPeers() {
device.peers.Lock()
defer device.peers.Unlock()
for key, peer := range device.peers.keyMap {
unsafeRemovePeer(device, peer, key)
2017-06-01 19:31:30 +00:00
}
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
}
func (device *Device) Close() {
device.state.mu.Lock()
defer device.state.mu.Unlock()
if device.isClosed() {
return
}
atomic.StoreUint32(&device.state.state, uint32(deviceStateClosed))
device.log.Verbosef("Device closing")
2017-11-11 22:26:44 +00:00
device.tun.device.Close()
device.downLocked()
// Remove peers before closing queues,
// because peers assume that queues are active.
device.RemoveAllPeers()
// We kept a reference to the encryption and decryption queues,
// in case we started any new peers that might write to them.
// No new peers are coming; we are done with these queues.
device: use channel close to shut down and drain encryption channel The new test introduced in this commit used to deadlock about 1% of the time. I believe that the deadlock occurs as follows: * The test completes, calling device.Close. * device.Close closes device.signals.stop. * RoutineEncryption stops. * The deferred function in RoutineEncryption drains device.queue.encryption. * RoutineEncryption exits. * A peer's RoutineNonce processes an element queued in peer.queue.nonce. * RoutineNonce puts that element into the outbound and encryption queues. * RoutineSequentialSender reads that elements from the outbound queue. * It waits for that element to get Unlocked by RoutineEncryption. * RoutineEncryption has already exited, so RoutineSequentialSender blocks forever. * device.RemoveAllPeers calls peer.Stop on all peers. * peer.Stop waits for peer.routines.stopping, which blocks forever. Rather than attempt to add even more ordering to the already complex centralized shutdown orchestration, this commit moves towards a data-flow-oriented shutdown. The device.queue.encryption gets closed when there will be no more writes to it. All device.queue.encryption readers always read until the channel is closed and then exit. We thus guarantee that any element that enters the encryption queue also exits it. This removes the need for central control of the lifetime of RoutineEncryption, removes the need to drain the encryption queue on shutdown, and simplifies RoutineEncryption. This commit also fixes a data race. When RoutineSequentialSender drains its queue on shutdown, it needs to lock the elem before operating on it, just as the main body does. The new test in this commit passed 50k iterations with the race detector enabled and 150k iterations with the race detector disabled, with no failures. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
2020-12-14 23:07:23 +00:00
device.queue.encryption.wg.Done()
device.queue.decryption.wg.Done()
device.queue.handshake.wg.Done()
device.state.stopping.Wait()
2018-02-11 21:53:39 +00:00
device.rate.limiter.Close()
device.log.Verbosef("Device closed")
close(device.closed)
}
2017-12-01 22:37:26 +00:00
func (device *Device) Wait() chan struct{} {
return device.closed
}
func (device *Device) SendKeepalivesToPeersWithCurrentKeypair() {
if !device.isUp() {
return
}
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.keypairs.RLock()
sendKeepalive := peer.keypairs.current != nil && !peer.keypairs.current.created.Add(RejectAfterTime).Before(time.Now())
peer.keypairs.RUnlock()
if sendKeepalive {
peer.SendKeepalive()
}
}
device.peers.RUnlock()
}
func unsafeCloseBind(device *Device) error {
var err error
netc := &device.net
if netc.netlinkCancel != nil {
netc.netlinkCancel.Cancel()
}
if netc.bind != nil {
err = netc.bind.Close()
netc.bind = nil
}
netc.stopping.Wait()
return err
}
func (device *Device) Bind() conn.Bind {
device.net.Lock()
defer device.net.Unlock()
return device.net.bind
}
func (device *Device) BindSetMark(mark uint32) error {
device.net.Lock()
defer device.net.Unlock()
// check if modified
if device.net.fwmark == mark {
return nil
}
// update fwmark on existing bind
device.net.fwmark = mark
if device.isUp() && device.net.bind != nil {
if err := device.net.bind.SetMark(mark); err != nil {
return err
}
}
// clear cached source addresses
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.Lock()
defer peer.Unlock()
if peer.endpoint != nil {
peer.endpoint.ClearSrc()
}
}
device.peers.RUnlock()
return nil
}
func (device *Device) BindUpdate() error {
device.net.Lock()
defer device.net.Unlock()
// close existing sockets
if err := unsafeCloseBind(device); err != nil {
return err
}
// open new sockets
if !device.isUp() {
return nil
}
// bind to new port
var err error
netc := &device.net
netc.bind, netc.port, err = conn.CreateBind(netc.port)
if err != nil {
netc.bind = nil
netc.port = 0
return err
}
netc.netlinkCancel, err = device.startRouteListener(netc.bind)
if err != nil {
netc.bind.Close()
netc.bind = nil
netc.port = 0
return err
}
// set fwmark
if netc.fwmark != 0 {
err = netc.bind.SetMark(netc.fwmark)
if err != nil {
return err
}
}
// clear cached source addresses
device.peers.RLock()
for _, peer := range device.peers.keyMap {
peer.Lock()
defer peer.Unlock()
if peer.endpoint != nil {
peer.endpoint.ClearSrc()
}
}
device.peers.RUnlock()
// start receiving routines
device.net.stopping.Add(2)
device.queue.decryption.wg.Add(2) // each RoutineReceiveIncoming goroutine writes to device.queue.decryption
device.queue.handshake.wg.Add(2) // each RoutineReceiveIncoming goroutine writes to device.queue.handshake
go device.RoutineReceiveIncoming(ipv4.Version, netc.bind)
go device.RoutineReceiveIncoming(ipv6.Version, netc.bind)
device.log.Verbosef("UDP bind has been updated")
return nil
}
func (device *Device) BindClose() error {
device.net.Lock()
err := unsafeCloseBind(device)
device.net.Unlock()
return err
}