Merge branch 'master' of ssh://git.zx2c4.com/wireguard-go

This commit is contained in:
Mathias Hall-Andersen 2018-05-05 22:10:22 +02:00
commit 375dcbd4ae
3 changed files with 36 additions and 31 deletions

View file

@ -13,6 +13,10 @@ import (
"time" "time"
) )
const (
DeviceRoutineNumberPerCPU = 3
)
type Device struct { type Device struct {
isUp AtomicBool // device is (going) up isUp AtomicBool // device is (going) up
isClosed AtomicBool // device is closed? (acting as guard) isClosed AtomicBool // device is closed? (acting as guard)
@ -21,6 +25,7 @@ type Device struct {
// synchronized resources (locks acquired in order) // synchronized resources (locks acquired in order)
state struct { state struct {
stopping sync.WaitGroup
mutex sync.Mutex mutex sync.Mutex
changing AtomicBool changing AtomicBool
current bool current bool
@ -306,7 +311,9 @@ func NewDevice(tun TUNDevice, logger *Logger) *Device {
// start workers // start workers
for i := 0; i < runtime.NumCPU(); i += 1 { cpus := runtime.NumCPU()
device.state.stopping.Add(DeviceRoutineNumberPerCPU * cpus)
for i := 0; i < cpus; i += 1 {
go device.RoutineEncryption() go device.RoutineEncryption()
go device.RoutineDecryption() go device.RoutineDecryption()
go device.RoutineHandshake() go device.RoutineHandshake()
@ -360,6 +367,25 @@ func (device *Device) RemoveAllPeers() {
device.peers.keyMap = make(map[NoisePublicKey]*Peer) device.peers.keyMap = make(map[NoisePublicKey]*Peer)
} }
func (device *Device) FlushPacketQueues() {
for {
select {
case elem, ok := <-device.queue.decryption:
if ok {
elem.Drop()
}
case elem, ok := <-device.queue.encryption:
if ok {
elem.Drop()
}
case <-device.queue.handshake:
default:
return
}
}
}
func (device *Device) Close() { func (device *Device) Close() {
if device.isClosed.Swap(true) { if device.isClosed.Swap(true) {
return return
@ -376,6 +402,9 @@ func (device *Device) Close() {
device.signal.stop.Broadcast() device.signal.stop.Broadcast()
device.state.stopping.Wait()
device.FlushPacketQueues()
device.RemoveAllPeers() device.RemoveAllPeers()
device.rate.limiter.Close() device.rate.limiter.Close()

View file

@ -238,17 +238,8 @@ func (device *Device) RoutineDecryption() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
for {
select {
case elem, ok := <-device.queue.decryption:
if ok {
elem.Drop()
}
default:
break
}
}
logDebug.Println("Routine: decryption worker - stopped") logDebug.Println("Routine: decryption worker - stopped")
device.state.stopping.Done()
}() }()
logDebug.Println("Routine: decryption worker - started") logDebug.Println("Routine: decryption worker - started")
@ -313,14 +304,8 @@ func (device *Device) RoutineHandshake() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
for {
select {
case <-device.queue.handshake:
default:
return
}
}
logDebug.Println("Routine: handshake worker - stopped") logDebug.Println("Routine: handshake worker - stopped")
device.state.stopping.Done()
}() }()
logDebug.Println("Routine: handshake worker - started") logDebug.Println("Routine: handshake worker - started")
@ -549,8 +534,8 @@ func (peer *Peer) RoutineSequentialReceiver() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
peer.routines.stopping.Done()
logDebug.Println(peer, ": Routine: sequential receiver - stopped") logDebug.Println(peer, ": Routine: sequential receiver - stopped")
peer.routines.stopping.Done()
}() }()
logDebug.Println(peer, ": Routine: sequential receiver - started") logDebug.Println(peer, ": Routine: sequential receiver - started")

15
send.go
View file

@ -200,8 +200,8 @@ func (peer *Peer) RoutineNonce() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
peer.routines.stopping.Done()
logDebug.Println(peer, ": Routine: nonce worker - stopped") logDebug.Println(peer, ": Routine: nonce worker - stopped")
peer.routines.stopping.Done()
}() }()
peer.routines.starting.Done() peer.routines.starting.Done()
@ -277,17 +277,8 @@ func (device *Device) RoutineEncryption() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
for {
select {
case elem, ok := <-device.queue.encryption:
if ok {
elem.Drop()
}
default:
break
}
}
logDebug.Println("Routine: encryption worker - stopped") logDebug.Println("Routine: encryption worker - stopped")
device.state.stopping.Done()
}() }()
logDebug.Println("Routine: encryption worker - started") logDebug.Println("Routine: encryption worker - started")
@ -360,8 +351,8 @@ func (peer *Peer) RoutineSequentialSender() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
peer.routines.stopping.Done()
logDebug.Println(peer, ": Routine: sequential sender - stopped") logDebug.Println(peer, ": Routine: sequential sender - stopped")
peer.routines.stopping.Done()
}() }()
logDebug.Println(peer, ": Routine: sequential sender - started") logDebug.Println(peer, ": Routine: sequential sender - started")