diff --git a/device/channels.go b/device/channels.go index 4bd6090..1e3e206 100644 --- a/device/channels.go +++ b/device/channels.go @@ -71,14 +71,15 @@ func newHandshakeQueue() *handshakeQueue { return q } +type autodrainingInboundQueue struct { + c chan *QueueInboundElement +} + // newAutodrainingInboundQueue returns a channel that will be drained when it gets GC'd. // It is useful in cases in which is it hard to manage the lifetime of the channel. // The returned channel must not be closed. Senders should signal shutdown using // some other means, such as sending a sentinel nil values. -func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement { - type autodrainingInboundQueue struct { - c chan *QueueInboundElement - } +func newAutodrainingInboundQueue(device *Device) *autodrainingInboundQueue { q := &autodrainingInboundQueue{ c: make(chan *QueueInboundElement, QueueInboundSize), } @@ -97,7 +98,11 @@ func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement { } } }) - return q.c + return q +} + +type autodrainingOutboundQueue struct { + c chan *QueueOutboundElement } // newAutodrainingOutboundQueue returns a channel that will be drained when it gets GC'd. @@ -105,10 +110,7 @@ func newAutodrainingInboundQueue(device *Device) chan *QueueInboundElement { // The returned channel must not be closed. Senders should signal shutdown using // some other means, such as sending a sentinel nil values. // All sends to the channel must be best-effort, because there may be no receivers. -func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement { - type autodrainingOutboundQueue struct { - c chan *QueueOutboundElement - } +func newAutodrainingOutboundQueue(device *Device) *autodrainingOutboundQueue { q := &autodrainingOutboundQueue{ c: make(chan *QueueOutboundElement, QueueOutboundSize), } @@ -127,5 +129,5 @@ func newAutodrainingOutboundQueue(device *Device) chan *QueueOutboundElement { } } }) - return q.c + return q } diff --git a/device/peer.go b/device/peer.go index 49b9acb..69238a6 100644 --- a/device/peer.go +++ b/device/peer.go @@ -57,8 +57,8 @@ type Peer struct { queue struct { staged chan *QueueOutboundElement // staged packets before a handshake is available - outbound chan *QueueOutboundElement // sequential ordering of udp transmission - inbound chan *QueueInboundElement // sequential ordering of tun writing + outbound *autodrainingOutboundQueue // sequential ordering of udp transmission + inbound *autodrainingInboundQueue // sequential ordering of tun writing } cookieGenerator CookieGenerator @@ -253,8 +253,8 @@ func (peer *Peer) Stop() { peer.timersStop() // Signal that RoutineSequentialSender and RoutineSequentialReceiver should exit. - peer.queue.inbound <- nil - peer.queue.outbound <- nil + peer.queue.inbound.c <- nil + peer.queue.outbound.c <- nil peer.stopping.Wait() peer.device.queue.encryption.wg.Done() // no more writes to encryption queue from us diff --git a/device/pools_test.go b/device/pools_test.go index 6caf7e7..a27ccc0 100644 --- a/device/pools_test.go +++ b/device/pools_test.go @@ -80,4 +80,4 @@ func BenchmarkWaitPool(b *testing.B) { }() } wg.Wait() -} \ No newline at end of file +} diff --git a/device/receive.go b/device/receive.go index 7acb7d9..3fc6831 100644 --- a/device/receive.go +++ b/device/receive.go @@ -167,7 +167,7 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind conn.Bind) { // add to decryption queues if peer.isRunning.Get() { - peer.queue.inbound <- elem + peer.queue.inbound.c <- elem device.queue.decryption.c <- elem buffer = device.GetMessageBuffer() } else { @@ -402,7 +402,7 @@ func (peer *Peer) RoutineSequentialReceiver() { }() device.log.Verbosef("%v - Routine: sequential receiver - started", peer) - for elem := range peer.queue.inbound { + for elem := range peer.queue.inbound.c { if elem == nil { return } @@ -477,7 +477,7 @@ func (peer *Peer) RoutineSequentialReceiver() { if err != nil && !device.isClosed() { device.log.Errorf("Failed to write packet to TUN device: %v", err) } - if len(peer.queue.inbound) == 0 { + if len(peer.queue.inbound.c) == 0 { err = device.tun.device.Flush() if err != nil { peer.device.log.Errorf("Unable to flush packets: %v", err) diff --git a/device/send.go b/device/send.go index 911ee5c..783e5b9 100644 --- a/device/send.go +++ b/device/send.go @@ -317,7 +317,7 @@ top: // add to parallel and sequential queue if peer.isRunning.Get() { - peer.queue.outbound <- elem + peer.queue.outbound.c <- elem peer.device.queue.encryption.c <- elem } else { peer.device.PutMessageBuffer(elem.buffer) @@ -410,7 +410,7 @@ func (peer *Peer) RoutineSequentialSender() { }() device.log.Verbosef("%v - Routine: sequential sender - started", peer) - for elem := range peer.queue.outbound { + for elem := range peer.queue.outbound.c { if elem == nil { return }