Removed old signals

This commit is contained in:
Mathias Hall-Andersen 2018-05-05 04:15:07 +02:00
parent 4d9f3a2f53
commit 7a83f2565a
7 changed files with 74 additions and 114 deletions

View file

@ -29,11 +29,10 @@ func (e *Event) Clear() {
} }
func (e *Event) Fire() { func (e *Event) Fire() {
if e == nil || atomic.SwapInt32(&e.guard, 1) != 0 { if atomic.SwapInt32(&e.guard, 1) != 0 {
return return
} }
now := time.Now() if now := time.Now(); now.After(e.next) {
if e.next.After(now) {
select { select {
case e.C <- struct{}{}: case e.C <- struct{}{}:
default: default:

View file

@ -148,7 +148,6 @@ func main() {
logger.Debug.Println("Debug log enabled") logger.Debug.Println("Debug log enabled")
if err != nil { if err != nil {
logger.Error.Println("Failed to create TUN device:", err) logger.Error.Println("Failed to create TUN device:", err)
os.Exit(ExitSetupFailed) os.Exit(ExitSetupFailed)

View file

@ -571,7 +571,7 @@ func (peer *Peer) NewKeyPair() *KeyPair {
} else { } else {
kp.previous = kp.current kp.previous = kp.current
kp.current = keyPair kp.current = keyPair
peer.signal.newKeyPair.Send() peer.event.newKeyPair.Fire()
} }
} else { } else {

43
peer.go
View file

@ -15,7 +15,7 @@ import (
const ( const (
PeerRoutineNumber = 4 PeerRoutineNumber = 4
EventInterval = time.Millisecond EventInterval = 10 * time.Millisecond
) )
type Peer struct { type Peer struct {
@ -46,18 +46,14 @@ type Peer struct {
dataReceived *Event dataReceived *Event
anyAuthenticatedPacketReceived *Event anyAuthenticatedPacketReceived *Event
anyAuthenticatedPacketTraversal *Event anyAuthenticatedPacketTraversal *Event
handshakeComplete *Event handshakeCompleted *Event
handshakePushDeadline *Event handshakePushDeadline *Event
handshakeBegin *Event
ephemeralKeyCreated *Event ephemeralKeyCreated *Event
newKeyPair *Event
} }
signal struct { signal struct {
newKeyPair Signal // size 1, new key pair was generated
handshakeCompleted Signal // size 1, handshake completed
handshakeBegin Signal // size 1, begin new handshake begin
messageSend Signal // size 1, message was send to peer
messageReceived Signal // size 1, authenticated message recv
flushNonceQueue chan struct{} // size 0, empty queued packets flushNonceQueue chan struct{} // size 0, empty queued packets
} }
@ -115,6 +111,18 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
peer.device = device peer.device = device
peer.isRunning.Set(false) peer.isRunning.Set(false)
// events
peer.event.dataSent = newEvent(EventInterval)
peer.event.dataReceived = newEvent(EventInterval)
peer.event.anyAuthenticatedPacketReceived = newEvent(EventInterval)
peer.event.anyAuthenticatedPacketTraversal = newEvent(EventInterval)
peer.event.handshakeCompleted = newEvent(EventInterval)
peer.event.handshakePushDeadline = newEvent(EventInterval)
peer.event.handshakeBegin = newEvent(EventInterval)
peer.event.ephemeralKeyCreated = newEvent(EventInterval)
peer.event.newKeyPair = newEvent(EventInterval)
// map public key // map public key
_, ok := device.peers.keyMap[pk] _, ok := device.peers.keyMap[pk]
@ -202,22 +210,8 @@ func (peer *Peer) Start() {
peer.routines.starting.Wait() peer.routines.starting.Wait()
peer.routines.stopping.Wait() peer.routines.stopping.Wait()
// events
peer.event.dataSent = newEvent(EventInterval)
peer.event.dataReceived = newEvent(EventInterval)
peer.event.anyAuthenticatedPacketReceived = newEvent(EventInterval)
peer.event.anyAuthenticatedPacketTraversal = newEvent(EventInterval)
peer.event.handshakeComplete = newEvent(EventInterval)
peer.event.handshakePushDeadline = newEvent(EventInterval)
peer.event.ephemeralKeyCreated = newEvent(EventInterval)
// prepare queues and signals // prepare queues and signals
peer.signal.newKeyPair = NewSignal()
peer.signal.handshakeBegin = NewSignal()
peer.signal.handshakeCompleted = NewSignal()
peer.signal.flushNonceQueue = make(chan struct{}) peer.signal.flushNonceQueue = make(chan struct{})
peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize) peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize)
@ -269,12 +263,7 @@ func (peer *Peer) Stop() {
// close signals // close signals
peer.signal.newKeyPair.Close()
peer.signal.handshakeBegin.Close()
peer.signal.handshakeCompleted.Close()
close(peer.signal.flushNonceQueue) close(peer.signal.flushNonceQueue)
peer.signal.flushNonceQueue = nil peer.signal.flushNonceQueue = nil
// clear key pairs // clear key pairs

View file

@ -456,8 +456,8 @@ func (device *Device) RoutineHandshake() {
// update timers // update timers
peer.TimerAnyAuthenticatedPacketTraversal() peer.event.anyAuthenticatedPacketTraversal.Fire()
peer.TimerAnyAuthenticatedPacketReceived() peer.event.anyAuthenticatedPacketReceived.Fire()
// update endpoint // update endpoint
@ -489,7 +489,7 @@ func (device *Device) RoutineHandshake() {
err = peer.SendBuffer(packet) err = peer.SendBuffer(packet)
if err == nil { if err == nil {
peer.TimerAnyAuthenticatedPacketTraversal() peer.event.anyAuthenticatedPacketTraversal.Fire()
} else { } else {
logError.Println(peer, ": Failed to send handshake response", err) logError.Println(peer, ": Failed to send handshake response", err)
} }
@ -529,9 +529,9 @@ func (device *Device) RoutineHandshake() {
// update timers // update timers
peer.TimerAnyAuthenticatedPacketTraversal() peer.event.anyAuthenticatedPacketTraversal.Fire()
peer.TimerAnyAuthenticatedPacketReceived() peer.event.anyAuthenticatedPacketReceived.Fire()
peer.TimerHandshakeComplete() peer.event.handshakeCompleted.Fire()
// derive key-pair // derive key-pair
@ -584,8 +584,8 @@ func (peer *Peer) RoutineSequentialReceiver() {
continue continue
} }
peer.TimerAnyAuthenticatedPacketTraversal() peer.event.anyAuthenticatedPacketTraversal.Fire()
peer.TimerAnyAuthenticatedPacketReceived() peer.event.anyAuthenticatedPacketReceived.Fire()
peer.KeepKeyFreshReceiving() peer.KeepKeyFreshReceiving()
// check if using new key-pair // check if using new key-pair
@ -593,7 +593,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
kp := &peer.keyPairs kp := &peer.keyPairs
kp.mutex.Lock() kp.mutex.Lock()
if kp.next == elem.keyPair { if kp.next == elem.keyPair {
peer.TimerHandshakeComplete() peer.event.handshakeCompleted.Fire()
if kp.previous != nil { if kp.previous != nil {
device.DeleteKeyPair(kp.previous) device.DeleteKeyPair(kp.previous)
} }
@ -615,7 +615,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
logDebug.Println(peer, ": Received keep-alive") logDebug.Println(peer, ": Received keep-alive")
continue continue
} }
peer.TimerDataReceived() peer.event.dataReceived.Fire()
// verify source and strip padding // verify source and strip padding

11
send.go
View file

@ -222,6 +222,9 @@ func (peer *Peer) RoutineNonce() {
// wait for key pair // wait for key pair
for { for {
peer.event.newKeyPair.Clear()
keyPair = peer.keyPairs.Current() keyPair = peer.keyPairs.Current()
if keyPair != nil && keyPair.sendNonce < RejectAfterMessages { if keyPair != nil && keyPair.sendNonce < RejectAfterMessages {
if time.Now().Sub(keyPair.created) < RejectAfterTime { if time.Now().Sub(keyPair.created) < RejectAfterTime {
@ -229,12 +232,12 @@ func (peer *Peer) RoutineNonce() {
} }
} }
peer.signal.handshakeBegin.Send() peer.event.handshakeBegin.Fire()
logDebug.Println(peer, ": Awaiting key-pair") logDebug.Println(peer, ": Awaiting key-pair")
select { select {
case <-peer.signal.newKeyPair.Wait(): case <-peer.event.newKeyPair.C:
logDebug.Println(peer, ": Obtained awaited key-pair") logDebug.Println(peer, ": Obtained awaited key-pair")
case <-peer.signal.flushNonceQueue: case <-peer.signal.flushNonceQueue:
goto NextPacket goto NextPacket
@ -392,9 +395,9 @@ func (peer *Peer) RoutineSequentialSender() {
// update timers // update timers
peer.TimerAnyAuthenticatedPacketTraversal() peer.event.anyAuthenticatedPacketTraversal.Fire()
if len(elem.packet) != MessageKeepaliveSize { if len(elem.packet) != MessageKeepaliveSize {
peer.TimerDataSent() peer.event.dataSent.Fire()
} }
peer.KeepKeyFreshSending() peer.KeepKeyFreshSending()
} }

106
timers.go
View file

@ -27,10 +27,10 @@ func (peer *Peer) KeepKeyFreshSending() {
} }
nonce := atomic.LoadUint64(&kp.sendNonce) nonce := atomic.LoadUint64(&kp.sendNonce)
if nonce > RekeyAfterMessages { if nonce > RekeyAfterMessages {
peer.signal.handshakeBegin.Send() peer.event.handshakeBegin.Fire()
} }
if kp.isInitiator && time.Now().Sub(kp.created) > RekeyAfterTime { if kp.isInitiator && time.Now().Sub(kp.created) > RekeyAfterTime {
peer.signal.handshakeBegin.Send() peer.event.handshakeBegin.Fire()
} }
} }
@ -54,7 +54,7 @@ func (peer *Peer) KeepKeyFreshReceiving() {
if send { if send {
// do a last minute attempt at initiating a new handshake // do a last minute attempt at initiating a new handshake
peer.timer.sendLastMinuteHandshake.Set(true) peer.timer.sendLastMinuteHandshake.Set(true)
peer.signal.handshakeBegin.Send() peer.event.handshakeBegin.Fire()
} }
} }
@ -74,55 +74,13 @@ func (peer *Peer) SendKeepAlive() bool {
} }
} }
/* Event:
* Sent non-empty (authenticated) transport message
*/
func (peer *Peer) TimerDataSent() {
peer.event.dataSent.Fire()
}
/* Event:
* Received non-empty (authenticated) transport message
*
* Action:
* Set a timer to confirm the message using a keep-alive (if not already set)
*/
func (peer *Peer) TimerDataReceived() {
peer.event.dataReceived.Fire()
/*
if !peer.timer.keepalivePassive.Start(KeepaliveTimeout) {
peer.timer.needAnotherKeepalive.Set(true)
}
*/
}
/* Event:
* Any (authenticated) packet received
*/
func (peer *Peer) TimerAnyAuthenticatedPacketReceived() {
peer.event.anyAuthenticatedPacketReceived.Fire()
}
/* Event:
* Any authenticated packet send / received.
*
* Action:
* Push persistent keep-alive into the future
*/
func (peer *Peer) TimerAnyAuthenticatedPacketTraversal() {
peer.event.anyAuthenticatedPacketTraversal.Fire()
}
/* Called after successfully completing a handshake. /* Called after successfully completing a handshake.
* i.e. after: * i.e. after:
* *
* - Valid handshake response * - Valid handshake response
* - First transport message under the "next" key * - First transport message under the "next" key
*/ */
func (peer *Peer) TimerHandshakeComplete() { // peer.device.log.Info.Println(peer, ": New handshake completed")
peer.signal.handshakeCompleted.Send()
peer.device.log.Info.Println(peer, ": New handshake completed")
}
/* Event: /* Event:
* An ephemeral key is generated * An ephemeral key is generated
@ -145,10 +103,6 @@ func (peer *Peer) TimerEphemeralKeyCreated() {
*/ */
func (peer *Peer) sendNewHandshake() error { func (peer *Peer) sendNewHandshake() error {
// temporarily disable the handshake complete signal
peer.signal.handshakeCompleted.Disable()
// create initiation message // create initiation message
msg, err := peer.device.CreateMessageInitiation(peer) msg, err := peer.device.CreateMessageInitiation(peer)
@ -166,14 +120,9 @@ func (peer *Peer) sendNewHandshake() error {
// send to endpoint // send to endpoint
peer.TimerAnyAuthenticatedPacketTraversal() peer.event.anyAuthenticatedPacketTraversal.Fire()
err = peer.SendBuffer(packet) return peer.SendBuffer(packet)
if err == nil {
peer.signal.handshakeCompleted.Enable()
}
return err
} }
func newTimer() *time.Timer { func newTimer() *time.Timer {
@ -198,6 +147,8 @@ func (peer *Peer) RoutineTimerHandler() {
// reset all timers // reset all timers
enableHandshake := true
pendingHandshakeNew := false pendingHandshakeNew := false
pendingKeepalivePassive := false pendingKeepalivePassive := false
@ -309,12 +260,12 @@ func (peer *Peer) RoutineTimerHandler() {
// handshake timers // handshake timers
case <-timerHandshakeNew.C:
logInfo.Println(peer, ": Retrying handshake (timer event)")
peer.signal.handshakeBegin.Send()
case <-timerHandshakeTimeout.C: case <-timerHandshakeTimeout.C:
// allow new handshake to be send
enableHandshake = true
// clear source (in case this is causing problems) // clear source (in case this is causing problems)
peer.mutex.Lock() peer.mutex.Lock()
@ -339,6 +290,11 @@ func (peer *Peer) RoutineTimerHandler() {
logDebug.Println(peer, ": Send handshake initiation (subsequent)") logDebug.Println(peer, ": Send handshake initiation (subsequent)")
} }
// disable further handshakes
peer.event.handshakeBegin.Clear()
enableHandshake = false
case <-timerHandshakeDeadline.C: case <-timerHandshakeDeadline.C:
// clear all queued packets and stop keep-alive // clear all queued packets and stop keep-alive
@ -348,13 +304,19 @@ func (peer *Peer) RoutineTimerHandler() {
peer.flushNonceQueue() peer.flushNonceQueue()
signalSend(peer.signal.flushNonceQueue) signalSend(peer.signal.flushNonceQueue)
timerKeepalivePersistent.Stop() timerKeepalivePersistent.Stop()
peer.signal.handshakeBegin.Enable()
/* signals */ // disable further handshakes
case <-peer.signal.handshakeBegin.Wait(): peer.event.handshakeBegin.Clear()
enableHandshake = true
peer.signal.handshakeBegin.Disable() case <-peer.event.handshakeBegin.C:
if !enableHandshake {
continue
}
logDebug.Println(peer, ": Event, Handshake Begin")
err := peer.sendNewHandshake() err := peer.sendNewHandshake()
@ -372,7 +334,12 @@ func (peer *Peer) RoutineTimerHandler() {
timerHandshakeDeadline.Reset(RekeyAttemptTime) timerHandshakeDeadline.Reset(RekeyAttemptTime)
case <-peer.signal.handshakeCompleted.Wait(): // disable further handshakes
peer.event.handshakeBegin.Clear()
enableHandshake = false
case <-peer.event.handshakeCompleted.C:
logInfo.Println(peer, ": Handshake completed") logInfo.Println(peer, ": Handshake completed")
@ -383,9 +350,12 @@ func (peer *Peer) RoutineTimerHandler() {
timerHandshakeTimeout.Stop() timerHandshakeTimeout.Stop()
timerHandshakeDeadline.Stop() timerHandshakeDeadline.Stop()
peer.signal.handshakeBegin.Enable()
peer.timer.sendLastMinuteHandshake.Set(false) peer.timer.sendLastMinuteHandshake.Set(false)
// allow further handshakes
peer.event.handshakeBegin.Clear()
enableHandshake = true
} }
} }
} }