d8dd1f254f
The immediate motivation for this change is an observed deadlock. 1. A goroutine calls peer.Stop. That calls peer.queue.Lock(). 2. Another goroutine is in RoutineSequentialReceiver. It receives an elem from peer.queue.inbound. 3. The peer.Stop goroutine calls close(peer.queue.inbound), close(peer.queue.outbound), and peer.stopping.Wait(). It blocks waiting for RoutineSequentialReceiver and RoutineSequentialSender to exit. 4. The RoutineSequentialReceiver goroutine calls peer.SendStagedPackets(). SendStagedPackets attempts peer.queue.RLock(). That blocks forever because the peer.Stop goroutine holds a write lock on that mutex. A background motivation for this change is that it can be expensive to have a mutex in the hot code path of RoutineSequential*. The mutex was necessary to avoid attempting to send elems on a closed channel. This commit removes that danger by never closing the channel. Instead, we send a sentinel nil value on the channel to indicate to the receiver that it should exit. The only problem with this is that if the receiver exits, we could write an elem into the channel which would never get received. If it never gets received, it cannot get returned to the device pools. To work around this, we use a finalizer. When the channel can be GC'd, the finalizer drains any remaining elements from the channel and restores them to the device pool. After that change, peer.queue.RWMutex no longer makes sense where it is. It is only used to prevent concurrent calls to Start and Stop. Move it to a more sensible location and make it a plain sync.Mutex. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
272 lines
6.5 KiB
Go
272 lines
6.5 KiB
Go
/* SPDX-License-Identifier: MIT
|
|
*
|
|
* Copyright (C) 2017-2021 WireGuard LLC. All Rights Reserved.
|
|
*/
|
|
|
|
package device
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"golang.zx2c4.com/wireguard/conn"
|
|
)
|
|
|
|
type Peer struct {
|
|
isRunning AtomicBool
|
|
sync.RWMutex // Mostly protects endpoint, but is generally taken whenever we modify peer
|
|
keypairs Keypairs
|
|
handshake Handshake
|
|
device *Device
|
|
endpoint conn.Endpoint
|
|
persistentKeepaliveInterval uint32 // accessed atomically
|
|
firstTrieEntry *trieEntry
|
|
stopping sync.WaitGroup // routines pending stop
|
|
|
|
// These fields are accessed with atomic operations, which must be
|
|
// 64-bit aligned even on 32-bit platforms. Go guarantees that an
|
|
// allocated struct will be 64-bit aligned. So we place
|
|
// atomically-accessed fields up front, so that they can share in
|
|
// this alignment before smaller fields throw it off.
|
|
stats struct {
|
|
txBytes uint64 // bytes send to peer (endpoint)
|
|
rxBytes uint64 // bytes received from peer
|
|
lastHandshakeNano int64 // nano seconds since epoch
|
|
}
|
|
|
|
disableRoaming bool
|
|
|
|
timers struct {
|
|
retransmitHandshake *Timer
|
|
sendKeepalive *Timer
|
|
newHandshake *Timer
|
|
zeroKeyMaterial *Timer
|
|
persistentKeepalive *Timer
|
|
handshakeAttempts uint32
|
|
needAnotherKeepalive AtomicBool
|
|
sentLastMinuteHandshake AtomicBool
|
|
}
|
|
|
|
state struct {
|
|
mu sync.Mutex // protects against concurrent Start/Stop
|
|
}
|
|
|
|
queue struct {
|
|
staged chan *QueueOutboundElement // staged packets before a handshake is available
|
|
outbound chan *QueueOutboundElement // sequential ordering of udp transmission
|
|
inbound chan *QueueInboundElement // sequential ordering of tun writing
|
|
}
|
|
|
|
cookieGenerator CookieGenerator
|
|
}
|
|
|
|
func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
|
|
if device.isClosed() {
|
|
return nil, errors.New("device closed")
|
|
}
|
|
|
|
// lock resources
|
|
device.staticIdentity.RLock()
|
|
defer device.staticIdentity.RUnlock()
|
|
|
|
device.peers.Lock()
|
|
defer device.peers.Unlock()
|
|
|
|
// check if over limit
|
|
if len(device.peers.keyMap) >= MaxPeers {
|
|
return nil, errors.New("too many peers")
|
|
}
|
|
|
|
// create peer
|
|
peer := new(Peer)
|
|
peer.Lock()
|
|
defer peer.Unlock()
|
|
|
|
peer.cookieGenerator.Init(pk)
|
|
peer.device = device
|
|
|
|
// map public key
|
|
_, ok := device.peers.keyMap[pk]
|
|
if ok {
|
|
return nil, errors.New("adding existing peer")
|
|
}
|
|
|
|
// pre-compute DH
|
|
handshake := &peer.handshake
|
|
handshake.mutex.Lock()
|
|
handshake.precomputedStaticStatic = device.staticIdentity.privateKey.sharedSecret(pk)
|
|
handshake.remoteStatic = pk
|
|
handshake.mutex.Unlock()
|
|
|
|
// reset endpoint
|
|
peer.endpoint = nil
|
|
|
|
// add
|
|
device.peers.keyMap[pk] = peer
|
|
device.peers.empty.Set(false)
|
|
|
|
// start peer
|
|
peer.timersInit()
|
|
if peer.device.isUp() {
|
|
peer.Start()
|
|
}
|
|
|
|
return peer, nil
|
|
}
|
|
|
|
func (peer *Peer) SendBuffer(buffer []byte) error {
|
|
peer.device.net.RLock()
|
|
defer peer.device.net.RUnlock()
|
|
|
|
if peer.device.net.bind == nil {
|
|
// Packets can leak through to SendBuffer while the device is closing.
|
|
// When that happens, drop them silently to avoid spurious errors.
|
|
if peer.device.isClosed() {
|
|
return nil
|
|
}
|
|
return errors.New("no bind")
|
|
}
|
|
|
|
peer.RLock()
|
|
defer peer.RUnlock()
|
|
|
|
if peer.endpoint == nil {
|
|
return errors.New("no known endpoint for peer")
|
|
}
|
|
|
|
err := peer.device.net.bind.Send(buffer, peer.endpoint)
|
|
if err == nil {
|
|
atomic.AddUint64(&peer.stats.txBytes, uint64(len(buffer)))
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (peer *Peer) String() string {
|
|
base64Key := base64.StdEncoding.EncodeToString(peer.handshake.remoteStatic[:])
|
|
abbreviatedKey := "invalid"
|
|
if len(base64Key) == 44 {
|
|
abbreviatedKey = base64Key[0:4] + "…" + base64Key[39:43]
|
|
}
|
|
return fmt.Sprintf("peer(%s)", abbreviatedKey)
|
|
}
|
|
|
|
func (peer *Peer) Start() {
|
|
// should never start a peer on a closed device
|
|
if peer.device.isClosed() {
|
|
return
|
|
}
|
|
|
|
// prevent simultaneous start/stop operations
|
|
peer.state.mu.Lock()
|
|
defer peer.state.mu.Unlock()
|
|
|
|
if peer.isRunning.Get() {
|
|
return
|
|
}
|
|
|
|
device := peer.device
|
|
device.log.Verbosef("%v - Starting...", peer)
|
|
|
|
// reset routine state
|
|
peer.stopping.Wait()
|
|
peer.stopping.Add(2)
|
|
|
|
peer.handshake.mutex.Lock()
|
|
peer.handshake.lastSentHandshake = time.Now().Add(-(RekeyTimeout + time.Second))
|
|
peer.handshake.mutex.Unlock()
|
|
|
|
// prepare queues
|
|
peer.queue.outbound = newAutodrainingOutboundQueue(device)
|
|
peer.queue.inbound = newAutodrainingInboundQueue(device)
|
|
if peer.queue.staged == nil {
|
|
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
|
|
}
|
|
peer.device.queue.encryption.wg.Add(1) // keep encryption queue open for our writes
|
|
|
|
peer.timersStart()
|
|
|
|
go peer.RoutineSequentialSender()
|
|
go peer.RoutineSequentialReceiver()
|
|
|
|
peer.isRunning.Set(true)
|
|
}
|
|
|
|
func (peer *Peer) ZeroAndFlushAll() {
|
|
device := peer.device
|
|
|
|
// clear key pairs
|
|
|
|
keypairs := &peer.keypairs
|
|
keypairs.Lock()
|
|
device.DeleteKeypair(keypairs.previous)
|
|
device.DeleteKeypair(keypairs.current)
|
|
device.DeleteKeypair(keypairs.loadNext())
|
|
keypairs.previous = nil
|
|
keypairs.current = nil
|
|
keypairs.storeNext(nil)
|
|
keypairs.Unlock()
|
|
|
|
// clear handshake state
|
|
|
|
handshake := &peer.handshake
|
|
handshake.mutex.Lock()
|
|
device.indexTable.Delete(handshake.localIndex)
|
|
handshake.Clear()
|
|
handshake.mutex.Unlock()
|
|
|
|
peer.FlushStagedPackets()
|
|
}
|
|
|
|
func (peer *Peer) ExpireCurrentKeypairs() {
|
|
handshake := &peer.handshake
|
|
handshake.mutex.Lock()
|
|
peer.device.indexTable.Delete(handshake.localIndex)
|
|
handshake.Clear()
|
|
peer.handshake.lastSentHandshake = time.Now().Add(-(RekeyTimeout + time.Second))
|
|
handshake.mutex.Unlock()
|
|
|
|
keypairs := &peer.keypairs
|
|
keypairs.Lock()
|
|
if keypairs.current != nil {
|
|
atomic.StoreUint64(&keypairs.current.sendNonce, RejectAfterMessages)
|
|
}
|
|
if keypairs.next != nil {
|
|
next := keypairs.loadNext()
|
|
atomic.StoreUint64(&next.sendNonce, RejectAfterMessages)
|
|
}
|
|
keypairs.Unlock()
|
|
}
|
|
|
|
func (peer *Peer) Stop() {
|
|
peer.state.mu.Lock()
|
|
defer peer.state.mu.Unlock()
|
|
|
|
if !peer.isRunning.Swap(false) {
|
|
return
|
|
}
|
|
|
|
peer.device.log.Verbosef("%v - Stopping...", peer)
|
|
|
|
peer.timersStop()
|
|
// Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit.
|
|
peer.queue.inbound <- nil
|
|
peer.queue.outbound <- nil
|
|
peer.stopping.Wait()
|
|
peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us
|
|
|
|
peer.ZeroAndFlushAll()
|
|
}
|
|
|
|
func (peer *Peer) SetEndpointFromPacket(endpoint conn.Endpoint) {
|
|
if peer.disableRoaming {
|
|
return
|
|
}
|
|
peer.Lock()
|
|
peer.endpoint = endpoint
|
|
peer.Unlock()
|
|
}
|