device: remove QueueInboundElement.dropped
Now that we block when enqueueing to the decryption queue, there is only one case in which we "drop" a inbound element, when decryption fails. We can use a simple, obvious, sync-free sentinel for that, elem.packet == nil. Also, we can return the message buffer to the pool slightly later, which further simplifies the code. Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
This commit is contained in:
parent
7ee95e053c
commit
a86492a567
|
@ -30,7 +30,6 @@ type QueueHandshakeElement struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueueInboundElement struct {
|
type QueueInboundElement struct {
|
||||||
dropped int32
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
buffer *[MaxMessageSize]byte
|
buffer *[MaxMessageSize]byte
|
||||||
packet []byte
|
packet []byte
|
||||||
|
@ -50,14 +49,6 @@ func (elem *QueueInboundElement) clearPointers() {
|
||||||
elem.endpoint = nil
|
elem.endpoint = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (elem *QueueInboundElement) Drop() {
|
|
||||||
atomic.StoreInt32(&elem.dropped, AtomicTrue)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (elem *QueueInboundElement) IsDropped() bool {
|
|
||||||
return atomic.LoadInt32(&elem.dropped) == AtomicTrue
|
|
||||||
}
|
|
||||||
|
|
||||||
func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, elem QueueHandshakeElement) bool {
|
func (device *Device) addToHandshakeQueue(queue chan QueueHandshakeElement, elem QueueHandshakeElement) bool {
|
||||||
select {
|
select {
|
||||||
case queue <- elem:
|
case queue <- elem:
|
||||||
|
@ -180,7 +171,6 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) {
|
||||||
elem.packet = packet
|
elem.packet = packet
|
||||||
elem.buffer = buffer
|
elem.buffer = buffer
|
||||||
elem.keypair = keypair
|
elem.keypair = keypair
|
||||||
elem.dropped = AtomicFalse
|
|
||||||
elem.endpoint = endpoint
|
elem.endpoint = endpoint
|
||||||
elem.counter = 0
|
elem.counter = 0
|
||||||
elem.Mutex = sync.Mutex{}
|
elem.Mutex = sync.Mutex{}
|
||||||
|
@ -243,19 +233,11 @@ func (device *Device) RoutineDecryption() {
|
||||||
logDebug.Println("Routine: decryption worker - started")
|
logDebug.Println("Routine: decryption worker - started")
|
||||||
|
|
||||||
for elem := range device.queue.decryption.c {
|
for elem := range device.queue.decryption.c {
|
||||||
// check if dropped
|
|
||||||
|
|
||||||
if elem.IsDropped() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// split message into fields
|
// split message into fields
|
||||||
|
|
||||||
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
|
counter := elem.packet[MessageTransportOffsetCounter:MessageTransportOffsetContent]
|
||||||
content := elem.packet[MessageTransportOffsetContent:]
|
content := elem.packet[MessageTransportOffsetContent:]
|
||||||
|
|
||||||
// decrypt and release to consumer
|
// decrypt and release to consumer
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
elem.counter = binary.LittleEndian.Uint64(counter)
|
elem.counter = binary.LittleEndian.Uint64(counter)
|
||||||
// copy counter to nonce
|
// copy counter to nonce
|
||||||
|
@ -267,8 +249,7 @@ func (device *Device) RoutineDecryption() {
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
elem.Drop()
|
elem.packet = nil
|
||||||
device.PutMessageBuffer(elem.buffer)
|
|
||||||
}
|
}
|
||||||
elem.Unlock()
|
elem.Unlock()
|
||||||
}
|
}
|
||||||
|
@ -484,9 +465,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
||||||
logDebug.Println(peer, "- Routine: sequential receiver - stopped")
|
logDebug.Println(peer, "- Routine: sequential receiver - stopped")
|
||||||
peer.routines.stopping.Done()
|
peer.routines.stopping.Done()
|
||||||
if elem != nil {
|
if elem != nil {
|
||||||
if !elem.IsDropped() {
|
|
||||||
device.PutMessageBuffer(elem.buffer)
|
device.PutMessageBuffer(elem.buffer)
|
||||||
}
|
|
||||||
device.PutInboundElement(elem)
|
device.PutInboundElement(elem)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -495,9 +474,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if elem != nil {
|
if elem != nil {
|
||||||
if !elem.IsDropped() {
|
|
||||||
device.PutMessageBuffer(elem.buffer)
|
device.PutMessageBuffer(elem.buffer)
|
||||||
}
|
|
||||||
device.PutInboundElement(elem)
|
device.PutInboundElement(elem)
|
||||||
elem = nil
|
elem = nil
|
||||||
}
|
}
|
||||||
|
@ -513,15 +490,13 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for decryption
|
// wait for decryption
|
||||||
|
|
||||||
elem.Lock()
|
elem.Lock()
|
||||||
|
if elem.packet == nil {
|
||||||
if elem.IsDropped() {
|
// decryption failed
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for replay
|
// check for replay
|
||||||
|
|
||||||
if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) {
|
if !elem.keypair.replayFilter.ValidateCounter(elem.counter, RejectAfterMessages) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue