device: tie encryption queue lifetime to the peers that write to it
Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
parent
4846070322
commit
8a374a35a0
|
@ -397,6 +397,10 @@ func (device *Device) Close() {
|
||||||
|
|
||||||
device.isUp.Set(false)
|
device.isUp.Set(false)
|
||||||
|
|
||||||
|
// Remove peers before closing queues,
|
||||||
|
// because peers assume that queues are active.
|
||||||
|
device.RemoveAllPeers()
|
||||||
|
|
||||||
// We kept a reference to the encryption and decryption queues,
|
// We kept a reference to the encryption and decryption queues,
|
||||||
// in case we started any new peers that might write to them.
|
// in case we started any new peers that might write to them.
|
||||||
// No new peers are coming; we are done with these queues.
|
// No new peers are coming; we are done with these queues.
|
||||||
|
@ -405,8 +409,6 @@ func (device *Device) Close() {
|
||||||
device.queue.handshake.wg.Done()
|
device.queue.handshake.wg.Done()
|
||||||
device.state.stopping.Wait()
|
device.state.stopping.Wait()
|
||||||
|
|
||||||
device.RemoveAllPeers()
|
|
||||||
|
|
||||||
device.rate.limiter.Close()
|
device.rate.limiter.Close()
|
||||||
|
|
||||||
device.state.changing.Set(false)
|
device.state.changing.Set(false)
|
||||||
|
|
|
@ -177,6 +177,7 @@ func (peer *Peer) Start() {
|
||||||
if peer.queue.staged == nil {
|
if peer.queue.staged == nil {
|
||||||
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
|
peer.queue.staged = make(chan *QueueOutboundElement, QueueStagedSize)
|
||||||
}
|
}
|
||||||
|
peer.device.queue.encryption.wg.Add(1) // keep encryption queue open for our writes
|
||||||
|
|
||||||
peer.timersInit()
|
peer.timersInit()
|
||||||
peer.handshake.lastSentHandshake = time.Now().Add(-(RekeyTimeout + time.Second))
|
peer.handshake.lastSentHandshake = time.Now().Add(-(RekeyTimeout + time.Second))
|
||||||
|
@ -248,6 +249,7 @@ func (peer *Peer) Stop() {
|
||||||
close(peer.queue.inbound)
|
close(peer.queue.inbound)
|
||||||
close(peer.queue.outbound)
|
close(peer.queue.outbound)
|
||||||
peer.stopping.Wait()
|
peer.stopping.Wait()
|
||||||
|
peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us
|
||||||
|
|
||||||
peer.ZeroAndFlushAll()
|
peer.ZeroAndFlushAll()
|
||||||
}
|
}
|
||||||
|
|
|
@ -291,8 +291,6 @@ func (peer *Peer) StagePacket(elem *QueueOutboundElement) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (peer *Peer) SendStagedPackets() {
|
func (peer *Peer) SendStagedPackets() {
|
||||||
peer.device.queue.encryption.wg.Add(1)
|
|
||||||
defer peer.device.queue.encryption.wg.Done()
|
|
||||||
top:
|
top:
|
||||||
if len(peer.queue.staged) == 0 || !peer.device.isUp.Get() {
|
if len(peer.queue.staged) == 0 || !peer.device.isUp.Get() {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue