Fix shutdown races

This commit is contained in:
Jason A. Donenfeld 2018-09-24 01:52:02 +02:00
parent 833597b585
commit ebc7541953
3 changed files with 42 additions and 12 deletions

View file

@ -377,10 +377,11 @@ func (device *Device) Close() {
close(device.signals.stop) close(device.signals.stop)
device.RemoveAllPeers()
device.state.stopping.Wait() device.state.stopping.Wait()
device.FlushPacketQueues() device.FlushPacketQueues()
device.RemoveAllPeers()
device.rate.limiter.Close() device.rate.limiter.Close()
device.state.changing.Set(false) device.state.changing.Set(false)

View file

@ -247,7 +247,6 @@ func (device *Device) RoutineDecryption() {
// check if dropped // check if dropped
if elem.IsDropped() { if elem.IsDropped() {
device.PutInboundElement(elem)
continue continue
} }
@ -281,7 +280,6 @@ func (device *Device) RoutineDecryption() {
if err != nil { if err != nil {
elem.Drop() elem.Drop()
device.PutMessageBuffer(elem.buffer) device.PutMessageBuffer(elem.buffer)
elem.buffer = nil
} }
elem.mutex.Unlock() elem.mutex.Unlock()
} }
@ -313,6 +311,7 @@ func (device *Device) RoutineHandshake() {
for { for {
if elem.buffer != nil { if elem.buffer != nil {
device.PutMessageBuffer(elem.buffer) device.PutMessageBuffer(elem.buffer)
elem.buffer = nil
} }
select { select {
@ -494,7 +493,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
logDebug.Println(peer, "- Routine: sequential receiver - stopped") logDebug.Println(peer, "- Routine: sequential receiver - stopped")
peer.routines.stopping.Done() peer.routines.stopping.Done()
if elem != nil { if elem != nil {
if elem.buffer != nil { if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer) device.PutMessageBuffer(elem.buffer)
} }
device.PutInboundElement(elem) device.PutInboundElement(elem)
@ -507,10 +506,11 @@ func (peer *Peer) RoutineSequentialReceiver() {
for { for {
if elem != nil { if elem != nil {
if elem.buffer != nil { if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer) device.PutMessageBuffer(elem.buffer)
} }
device.PutInboundElement(elem) device.PutInboundElement(elem)
elem = nil
} }
select { select {

43
send.go
View file

@ -341,12 +341,6 @@ func (peer *Peer) RoutineNonce() {
device := peer.device device := peer.device
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() {
logDebug.Println(peer, "- Routine: nonce worker - stopped")
peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
peer.routines.stopping.Done()
}()
flush := func() { flush := func() {
for { for {
select { select {
@ -359,6 +353,13 @@ func (peer *Peer) RoutineNonce() {
} }
} }
defer func() {
flush()
logDebug.Println(peer, "- Routine: nonce worker - stopped")
peer.queue.packetInNonceQueueIsAwaitingKey.Set(false)
peer.routines.stopping.Done()
}()
peer.routines.starting.Done() peer.routines.starting.Done()
logDebug.Println(peer, "- Routine: nonce worker - started") logDebug.Println(peer, "- Routine: nonce worker - started")
@ -461,6 +462,19 @@ func (device *Device) RoutineEncryption() {
logDebug := device.log.Debug logDebug := device.log.Debug
defer func() { defer func() {
for {
select {
case elem, ok := <-device.queue.encryption:
if ok && !elem.IsDropped() {
elem.Drop()
device.PutMessageBuffer(elem.buffer)
elem.mutex.Unlock()
}
default:
goto out
}
}
out:
logDebug.Println("Routine: encryption worker - stopped") logDebug.Println("Routine: encryption worker - stopped")
device.state.stopping.Done() device.state.stopping.Done()
}() }()
@ -485,7 +499,6 @@ func (device *Device) RoutineEncryption() {
// check if dropped // check if dropped
if elem.IsDropped() { if elem.IsDropped() {
device.PutOutboundElement(elem)
continue continue
} }
@ -540,6 +553,22 @@ func (peer *Peer) RoutineSequentialSender() {
logError := device.log.Error logError := device.log.Error
defer func() { defer func() {
for {
select {
case elem, ok := <-peer.queue.outbound:
if ok {
if !elem.IsDropped() {
device.PutMessageBuffer(elem.buffer)
elem.Drop()
}
device.PutOutboundElement(elem)
elem.mutex.Unlock()
}
default:
goto out
}
}
out:
logDebug.Println(peer, "- Routine: sequential sender - stopped") logDebug.Println(peer, "- Routine: sequential sender - stopped")
peer.routines.stopping.Done() peer.routines.stopping.Done()
}() }()