More robust solution to close deadlock
This commit is contained in:
parent
09a9bc2899
commit
a46401bbb1
31
device.go
31
device.go
|
@ -13,6 +13,10 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
DeviceRoutineNumberPerCPU = 3
|
||||
)
|
||||
|
||||
type Device struct {
|
||||
isUp AtomicBool // device is (going) up
|
||||
isClosed AtomicBool // device is closed? (acting as guard)
|
||||
|
@ -21,6 +25,7 @@ type Device struct {
|
|||
// synchronized resources (locks acquired in order)
|
||||
|
||||
state struct {
|
||||
stopping sync.WaitGroup
|
||||
mutex sync.Mutex
|
||||
changing AtomicBool
|
||||
current bool
|
||||
|
@ -306,7 +311,9 @@ func NewDevice(tun TUNDevice, logger *Logger) *Device {
|
|||
|
||||
// start workers
|
||||
|
||||
for i := 0; i < runtime.NumCPU(); i += 1 {
|
||||
cpus := runtime.NumCPU()
|
||||
device.state.stopping.Add(DeviceRoutineNumberPerCPU * cpus)
|
||||
for i := 0; i < cpus; i += 1 {
|
||||
go device.RoutineEncryption()
|
||||
go device.RoutineDecryption()
|
||||
go device.RoutineHandshake()
|
||||
|
@ -360,6 +367,25 @@ func (device *Device) RemoveAllPeers() {
|
|||
device.peers.keyMap = make(map[NoisePublicKey]*Peer)
|
||||
}
|
||||
|
||||
func (device *Device) FlushPacketQueues() {
|
||||
for {
|
||||
select {
|
||||
case elem, ok := <-device.queue.decryption:
|
||||
if ok {
|
||||
elem.Drop()
|
||||
}
|
||||
case elem, ok := <-device.queue.encryption:
|
||||
if ok {
|
||||
elem.Drop()
|
||||
}
|
||||
case <-device.queue.handshake:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (device *Device) Close() {
|
||||
if device.isClosed.Swap(true) {
|
||||
return
|
||||
|
@ -376,6 +402,9 @@ func (device *Device) Close() {
|
|||
|
||||
device.signal.stop.Broadcast()
|
||||
|
||||
device.state.stopping.Wait()
|
||||
device.FlushPacketQueues()
|
||||
|
||||
device.RemoveAllPeers()
|
||||
device.rate.limiter.Close()
|
||||
|
||||
|
|
21
receive.go
21
receive.go
|
@ -238,17 +238,7 @@ func (device *Device) RoutineDecryption() {
|
|||
|
||||
logDebug := device.log.Debug
|
||||
defer func() {
|
||||
for {
|
||||
select {
|
||||
case elem, ok := <-device.queue.decryption:
|
||||
if ok {
|
||||
elem.Drop()
|
||||
}
|
||||
default:
|
||||
goto out
|
||||
}
|
||||
}
|
||||
out:
|
||||
device.state.stopping.Done()
|
||||
logDebug.Println("Routine: decryption worker - stopped")
|
||||
}()
|
||||
logDebug.Println("Routine: decryption worker - started")
|
||||
|
@ -314,14 +304,7 @@ func (device *Device) RoutineHandshake() {
|
|||
logDebug := device.log.Debug
|
||||
|
||||
defer func() {
|
||||
for {
|
||||
select {
|
||||
case <-device.queue.handshake:
|
||||
default:
|
||||
goto out
|
||||
}
|
||||
}
|
||||
out:
|
||||
device.state.stopping.Done()
|
||||
logDebug.Println("Routine: handshake worker - stopped")
|
||||
}()
|
||||
|
||||
|
|
12
send.go
12
send.go
|
@ -274,17 +274,7 @@ func (device *Device) RoutineEncryption() {
|
|||
logDebug := device.log.Debug
|
||||
|
||||
defer func() {
|
||||
for {
|
||||
select {
|
||||
case elem, ok := <-device.queue.encryption:
|
||||
if ok {
|
||||
elem.Drop()
|
||||
}
|
||||
default:
|
||||
goto out
|
||||
}
|
||||
}
|
||||
out:
|
||||
device.state.stopping.Done()
|
||||
logDebug.Println("Routine: encryption worker - stopped")
|
||||
}()
|
||||
|
||||
|
|
Loading…
Reference in a new issue