device: remove QueueOutboundElement.dropped
If we block when enqueuing encryption elements to the queue, then we never drop them. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
parent
291dbcf1f0
commit
7ee95e053c
|
@ -43,7 +43,6 @@ import (
|
||||||
*/
|
*/
|
||||||
|
|
||||||
type QueueOutboundElement struct {
|
type QueueOutboundElement struct {
|
||||||
dropped int32
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
buffer *[MaxMessageSize]byte // slice holding the packet data
|
buffer *[MaxMessageSize]byte // slice holding the packet data
|
||||||
packet []byte // slice of "buffer" (always!)
|
packet []byte // slice of "buffer" (always!)
|
||||||
|
@ -54,7 +53,6 @@ type QueueOutboundElement struct {
|
||||||
|
|
||||||
func (device *Device) NewOutboundElement() *QueueOutboundElement {
|
func (device *Device) NewOutboundElement() *QueueOutboundElement {
|
||||||
elem := device.GetOutboundElement()
|
elem := device.GetOutboundElement()
|
||||||
elem.dropped = AtomicFalse
|
|
||||||
elem.buffer = device.GetMessageBuffer()
|
elem.buffer = device.GetMessageBuffer()
|
||||||
elem.Mutex = sync.Mutex{}
|
elem.Mutex = sync.Mutex{}
|
||||||
elem.nonce = 0
|
elem.nonce = 0
|
||||||
|
@ -73,14 +71,6 @@ func (elem *QueueOutboundElement) clearPointers() {
|
||||||
elem.peer = nil
|
elem.peer = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (elem *QueueOutboundElement) Drop() {
|
|
||||||
atomic.StoreInt32(&elem.dropped, AtomicTrue)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (elem *QueueOutboundElement) IsDropped() bool {
|
|
||||||
return atomic.LoadInt32(&elem.dropped) == AtomicTrue
|
|
||||||
}
|
|
||||||
|
|
||||||
func addToNonceQueue(queue chan *QueueOutboundElement, elem *QueueOutboundElement, device *Device) {
|
func addToNonceQueue(queue chan *QueueOutboundElement, elem *QueueOutboundElement, device *Device) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -436,7 +426,6 @@ NextPacket:
|
||||||
}
|
}
|
||||||
|
|
||||||
elem.keypair = keypair
|
elem.keypair = keypair
|
||||||
elem.dropped = AtomicFalse
|
|
||||||
elem.Lock()
|
elem.Lock()
|
||||||
|
|
||||||
// add to parallel and sequential queue
|
// add to parallel and sequential queue
|
||||||
|
@ -476,15 +465,7 @@ func (device *Device) RoutineEncryption() {
|
||||||
logDebug.Println("Routine: encryption worker - started")
|
logDebug.Println("Routine: encryption worker - started")
|
||||||
|
|
||||||
for elem := range device.queue.encryption.c {
|
for elem := range device.queue.encryption.c {
|
||||||
|
|
||||||
// check if dropped
|
|
||||||
|
|
||||||
if elem.IsDropped() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// populate header fields
|
// populate header fields
|
||||||
|
|
||||||
header := elem.buffer[:MessageTransportHeaderSize]
|
header := elem.buffer[:MessageTransportHeaderSize]
|
||||||
|
|
||||||
fieldType := header[0:4]
|
fieldType := header[0:4]
|
||||||
|
@ -532,10 +513,6 @@ func (peer *Peer) RoutineSequentialSender() {
|
||||||
|
|
||||||
for elem := range peer.queue.outbound {
|
for elem := range peer.queue.outbound {
|
||||||
elem.Lock()
|
elem.Lock()
|
||||||
if elem.IsDropped() {
|
|
||||||
device.PutOutboundElement(elem)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !peer.isRunning.Get() {
|
if !peer.isRunning.Get() {
|
||||||
// peer has been stopped; return re-usable elems to the shared pool.
|
// peer has been stopped; return re-usable elems to the shared pool.
|
||||||
// This is an optimization only. It is possible for the peer to be stopped
|
// This is an optimization only. It is possible for the peer to be stopped
|
||||||
|
|
Loading…
Reference in a new issue