Initial version of migration to new event model
- Begin move away from global timer state. - Made logging format more consistent
This commit is contained in:
parent
168ef61a63
commit
6db41d5a26
8 changed files with 203 additions and 183 deletions
44
event.go
Normal file
44
event.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
guard int32
|
||||
next time.Time
|
||||
interval time.Duration
|
||||
C chan struct{}
|
||||
}
|
||||
|
||||
func newEvent(interval time.Duration) *Event {
|
||||
return &Event{
|
||||
guard: 0,
|
||||
next: time.Now(),
|
||||
interval: interval,
|
||||
C: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Event) Clear() {
|
||||
select {
|
||||
case <-e.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Event) Fire() {
|
||||
if e == nil || atomic.SwapInt32(&e.guard, 1) != 0 {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
if e.next.After(now) {
|
||||
select {
|
||||
case e.C <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
e.next = now.Add(e.interval)
|
||||
}
|
||||
atomic.StoreInt32(&e.guard, 0)
|
||||
}
|
62
peer.go
62
peer.go
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
const (
|
||||
PeerRoutineNumber = 4
|
||||
EventInterval = time.Millisecond
|
||||
)
|
||||
|
||||
type Peer struct {
|
||||
|
@ -35,26 +36,27 @@ type Peer struct {
|
|||
nextKeepalive time.Time
|
||||
}
|
||||
|
||||
event struct {
|
||||
dataSent *Event
|
||||
dataReceived *Event
|
||||
anyAuthenticatedPacketReceived *Event
|
||||
anyAuthenticatedPacketTraversal *Event
|
||||
handshakeComplete *Event
|
||||
handshakePushDeadline *Event
|
||||
ephemeralKeyCreated *Event
|
||||
}
|
||||
|
||||
signal struct {
|
||||
newKeyPair Signal // size 1, new key pair was generated
|
||||
handshakeCompleted Signal // size 1, handshake completed
|
||||
handshakeBegin Signal // size 1, begin new handshake begin
|
||||
flushNonceQueue Signal // size 1, empty queued packets
|
||||
messageSend Signal // size 1, message was send to peer
|
||||
messageReceived Signal // size 1, authenticated message recv
|
||||
|
||||
flushNonceQueue chan struct{} // size 0, empty queued packets
|
||||
}
|
||||
|
||||
timer struct {
|
||||
|
||||
// state related to WireGuard timers
|
||||
|
||||
keepalivePersistent Timer // set for persistent keep-alive
|
||||
keepalivePassive Timer // set upon receiving messages
|
||||
zeroAllKeys Timer // zero all key material
|
||||
handshakeNew Timer // begin a new handshake (stale)
|
||||
handshakeDeadline Timer // complete handshake timeout
|
||||
handshakeTimeout Timer // current handshake message timeout
|
||||
|
||||
sendLastMinuteHandshake AtomicBool
|
||||
needAnotherKeepalive AtomicBool
|
||||
}
|
||||
|
@ -108,13 +110,6 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) {
|
|||
peer.device = device
|
||||
peer.isRunning.Set(false)
|
||||
|
||||
peer.timer.zeroAllKeys = NewTimer()
|
||||
peer.timer.keepalivePersistent = NewTimer()
|
||||
peer.timer.keepalivePassive = NewTimer()
|
||||
peer.timer.handshakeNew = NewTimer()
|
||||
peer.timer.handshakeDeadline = NewTimer()
|
||||
peer.timer.handshakeTimeout = NewTimer()
|
||||
|
||||
// map public key
|
||||
|
||||
_, ok := device.peers.keyMap[pk]
|
||||
|
@ -195,19 +190,30 @@ func (peer *Peer) Start() {
|
|||
}
|
||||
|
||||
device := peer.device
|
||||
device.log.Debug.Println(peer.String() + ": Starting...")
|
||||
device.log.Debug.Println(peer, ": Starting...")
|
||||
|
||||
// sanity check : these should be 0
|
||||
|
||||
peer.routines.starting.Wait()
|
||||
peer.routines.stopping.Wait()
|
||||
|
||||
// events
|
||||
|
||||
peer.event.dataSent = newEvent(EventInterval)
|
||||
peer.event.dataReceived = newEvent(EventInterval)
|
||||
peer.event.anyAuthenticatedPacketReceived = newEvent(EventInterval)
|
||||
peer.event.anyAuthenticatedPacketTraversal = newEvent(EventInterval)
|
||||
peer.event.handshakeComplete = newEvent(EventInterval)
|
||||
peer.event.handshakePushDeadline = newEvent(EventInterval)
|
||||
peer.event.ephemeralKeyCreated = newEvent(EventInterval)
|
||||
|
||||
// prepare queues and signals
|
||||
|
||||
peer.signal.newKeyPair = NewSignal()
|
||||
peer.signal.handshakeBegin = NewSignal()
|
||||
peer.signal.handshakeCompleted = NewSignal()
|
||||
peer.signal.flushNonceQueue = NewSignal()
|
||||
|
||||
peer.signal.flushNonceQueue = make(chan struct{})
|
||||
|
||||
peer.queue.nonce = make(chan *QueueOutboundElement, QueueOutboundSize)
|
||||
peer.queue.outbound = make(chan *QueueOutboundElement, QueueOutboundSize)
|
||||
|
@ -242,7 +248,7 @@ func (peer *Peer) Stop() {
|
|||
}
|
||||
|
||||
device := peer.device
|
||||
device.log.Debug.Println(peer.String() + ": Stopping...")
|
||||
device.log.Debug.Println(peer, ": Stopping...")
|
||||
|
||||
// stop & wait for ongoing peer routines
|
||||
|
||||
|
@ -250,15 +256,6 @@ func (peer *Peer) Stop() {
|
|||
peer.routines.stop.Broadcast()
|
||||
peer.routines.stopping.Wait()
|
||||
|
||||
// stop timers
|
||||
|
||||
peer.timer.keepalivePersistent.Stop()
|
||||
peer.timer.keepalivePassive.Stop()
|
||||
peer.timer.zeroAllKeys.Stop()
|
||||
peer.timer.handshakeNew.Stop()
|
||||
peer.timer.handshakeDeadline.Stop()
|
||||
peer.timer.handshakeTimeout.Stop()
|
||||
|
||||
// close queues
|
||||
|
||||
close(peer.queue.nonce)
|
||||
|
@ -270,7 +267,10 @@ func (peer *Peer) Stop() {
|
|||
peer.signal.newKeyPair.Close()
|
||||
peer.signal.handshakeBegin.Close()
|
||||
peer.signal.handshakeCompleted.Close()
|
||||
peer.signal.flushNonceQueue.Close()
|
||||
|
||||
close(peer.signal.flushNonceQueue)
|
||||
|
||||
peer.signal.flushNonceQueue = nil
|
||||
|
||||
// clear key pairs
|
||||
|
||||
|
|
25
receive.go
25
receive.go
|
@ -207,6 +207,9 @@ func (device *Device) RoutineReceiveIncoming(IP int, bind Bind) {
|
|||
|
||||
case MessageCookieReplyType:
|
||||
okay = len(packet) == MessageCookieReplySize
|
||||
|
||||
default:
|
||||
logDebug.Println("Received message with unknown type")
|
||||
}
|
||||
|
||||
if okay {
|
||||
|
@ -457,7 +460,7 @@ func (device *Device) RoutineHandshake() {
|
|||
peer.endpoint = elem.endpoint
|
||||
peer.mutex.Unlock()
|
||||
|
||||
logDebug.Println(peer.String() + ": Received handshake initiation")
|
||||
logDebug.Println(peer, ": Received handshake initiation")
|
||||
|
||||
// create response
|
||||
|
||||
|
@ -470,7 +473,7 @@ func (device *Device) RoutineHandshake() {
|
|||
peer.TimerEphemeralKeyCreated()
|
||||
peer.NewKeyPair()
|
||||
|
||||
logDebug.Println(peer.String(), "Creating handshake response")
|
||||
logDebug.Println(peer, ": Creating handshake response")
|
||||
|
||||
writer := bytes.NewBuffer(temp[:0])
|
||||
binary.Write(writer, binary.LittleEndian, response)
|
||||
|
@ -483,7 +486,7 @@ func (device *Device) RoutineHandshake() {
|
|||
if err == nil {
|
||||
peer.TimerAnyAuthenticatedPacketTraversal()
|
||||
} else {
|
||||
logError.Println(peer.String(), "Failed to send handshake response", err)
|
||||
logError.Println(peer, ": Failed to send handshake response", err)
|
||||
}
|
||||
|
||||
case MessageResponseType:
|
||||
|
@ -515,7 +518,7 @@ func (device *Device) RoutineHandshake() {
|
|||
peer.endpoint = elem.endpoint
|
||||
peer.mutex.Unlock()
|
||||
|
||||
logDebug.Println(peer.String() + ": Received handshake response")
|
||||
logDebug.Println(peer, ": Received handshake response")
|
||||
|
||||
peer.TimerEphemeralKeyCreated()
|
||||
|
||||
|
@ -542,10 +545,10 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
|||
|
||||
defer func() {
|
||||
peer.routines.stopping.Done()
|
||||
logDebug.Println(peer.String() + ": Routine: sequential receiver - stopped")
|
||||
logDebug.Println(peer, ": Routine: sequential receiver - stopped")
|
||||
}()
|
||||
|
||||
logDebug.Println(peer.String() + ": Routine: sequential receiver - started")
|
||||
logDebug.Println(peer, ": Routine: sequential receiver - started")
|
||||
|
||||
peer.routines.starting.Done()
|
||||
|
||||
|
@ -604,7 +607,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
|||
// check for keep-alive
|
||||
|
||||
if len(elem.packet) == 0 {
|
||||
logDebug.Println("Received keep-alive from", peer.String())
|
||||
logDebug.Println(peer, ": Received keep-alive")
|
||||
continue
|
||||
}
|
||||
peer.TimerDataReceived()
|
||||
|
@ -634,7 +637,7 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
|||
if device.routing.table.LookupIPv4(src) != peer {
|
||||
logInfo.Println(
|
||||
"IPv4 packet with disallowed source address from",
|
||||
peer.String(),
|
||||
peer,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
@ -661,14 +664,14 @@ func (peer *Peer) RoutineSequentialReceiver() {
|
|||
src := elem.packet[IPv6offsetSrc : IPv6offsetSrc+net.IPv6len]
|
||||
if device.routing.table.LookupIPv6(src) != peer {
|
||||
logInfo.Println(
|
||||
"IPv6 packet with disallowed source address from",
|
||||
peer.String(),
|
||||
peer,
|
||||
"sent packet with disallowed IPv6 source",
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
default:
|
||||
logInfo.Println("Packet with invalid IP version from", peer.String())
|
||||
logInfo.Println("Packet with invalid IP version from", peer)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
22
send.go
22
send.go
|
@ -45,7 +45,7 @@ type QueueOutboundElement struct {
|
|||
peer *Peer // related peer
|
||||
}
|
||||
|
||||
func (peer *Peer) FlushNonceQueue() {
|
||||
func (peer *Peer) flushNonceQueue() {
|
||||
elems := len(peer.queue.nonce)
|
||||
for i := 0; i < elems; i++ {
|
||||
select {
|
||||
|
@ -175,7 +175,7 @@ func (device *Device) RoutineReadFromTUN() {
|
|||
// insert into nonce/pre-handshake queue
|
||||
|
||||
if peer.isRunning.Get() {
|
||||
peer.timer.handshakeDeadline.Reset(RekeyAttemptTime)
|
||||
peer.event.handshakePushDeadline.Fire()
|
||||
addToOutboundQueue(peer.queue.nonce, elem)
|
||||
elem = device.NewOutboundElement()
|
||||
}
|
||||
|
@ -196,11 +196,11 @@ func (peer *Peer) RoutineNonce() {
|
|||
|
||||
defer func() {
|
||||
peer.routines.stopping.Done()
|
||||
logDebug.Println(peer.String() + ": Routine: nonce worker - stopped")
|
||||
logDebug.Println(peer, ": Routine: nonce worker - stopped")
|
||||
}()
|
||||
|
||||
peer.routines.starting.Done()
|
||||
logDebug.Println(peer.String() + ": Routine: nonce worker - started")
|
||||
logDebug.Println(peer, ": Routine: nonce worker - started")
|
||||
|
||||
for {
|
||||
NextPacket:
|
||||
|
@ -226,14 +226,12 @@ func (peer *Peer) RoutineNonce() {
|
|||
|
||||
peer.signal.handshakeBegin.Send()
|
||||
|
||||
logDebug.Println(peer.String() + ": Awaiting key-pair")
|
||||
logDebug.Println(peer, ": Awaiting key-pair")
|
||||
|
||||
select {
|
||||
case <-peer.signal.newKeyPair.Wait():
|
||||
logDebug.Println(peer.String() + ": Obtained awaited key-pair")
|
||||
case <-peer.signal.flushNonceQueue.Wait():
|
||||
logDebug.Println(peer.String() + ": Flushing nonce queue")
|
||||
peer.FlushNonceQueue()
|
||||
logDebug.Println(peer, ": Obtained awaited key-pair")
|
||||
case <-peer.signal.flushNonceQueue:
|
||||
goto NextPacket
|
||||
case <-peer.routines.stop.Wait():
|
||||
return
|
||||
|
@ -352,10 +350,10 @@ func (peer *Peer) RoutineSequentialSender() {
|
|||
|
||||
defer func() {
|
||||
peer.routines.stopping.Done()
|
||||
logDebug.Println(peer.String() + ": Routine: sequential sender - stopped")
|
||||
logDebug.Println(peer, ": Routine: sequential sender - stopped")
|
||||
}()
|
||||
|
||||
logDebug.Println(peer.String() + ": Routine: sequential sender - started")
|
||||
logDebug.Println(peer, ": Routine: sequential sender - started")
|
||||
|
||||
peer.routines.starting.Done()
|
||||
|
||||
|
@ -382,7 +380,7 @@ func (peer *Peer) RoutineSequentialSender() {
|
|||
err := peer.SendBuffer(elem.packet)
|
||||
device.PutMessageBuffer(elem.buffer)
|
||||
if err != nil {
|
||||
logDebug.Println("Failed to send authenticated packet to peer", peer.String())
|
||||
logDebug.Println("Failed to send authenticated packet to peer", peer)
|
||||
continue
|
||||
}
|
||||
atomic.AddUint64(&peer.stats.txBytes, length)
|
||||
|
|
|
@ -1,5 +1,12 @@
|
|||
package main
|
||||
|
||||
func signalSend(s chan<- struct{}) {
|
||||
select {
|
||||
case s <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
type Signal struct {
|
||||
enabled AtomicBool
|
||||
C chan struct{}
|
||||
|
|
65
timer.go
65
timer.go
|
@ -1,65 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Timer struct {
|
||||
mutex sync.Mutex
|
||||
pending bool
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
/* Starts the timer if not already pending
|
||||
*/
|
||||
func (t *Timer) Start(dur time.Duration) bool {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
|
||||
started := !t.pending
|
||||
if started {
|
||||
t.timer.Reset(dur)
|
||||
}
|
||||
return started
|
||||
}
|
||||
|
||||
func (t *Timer) Stop() {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
|
||||
t.timer.Stop()
|
||||
select {
|
||||
case <-t.timer.C:
|
||||
default:
|
||||
}
|
||||
t.pending = false
|
||||
}
|
||||
|
||||
func (t *Timer) Pending() bool {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
|
||||
return t.pending
|
||||
}
|
||||
|
||||
func (t *Timer) Reset(dur time.Duration) {
|
||||
t.mutex.Lock()
|
||||
defer t.mutex.Unlock()
|
||||
t.timer.Reset(dur)
|
||||
}
|
||||
|
||||
func (t *Timer) Wait() <-chan time.Time {
|
||||
return t.timer.C
|
||||
}
|
||||
|
||||
func NewTimer() (t Timer) {
|
||||
t.pending = false
|
||||
t.timer = time.NewTimer(time.Hour)
|
||||
t.timer.Stop()
|
||||
select {
|
||||
case <-t.timer.C:
|
||||
default:
|
||||
}
|
||||
return
|
||||
}
|
141
timers.go
141
timers.go
|
@ -10,8 +10,6 @@ import (
|
|||
|
||||
/* NOTE:
|
||||
* Notion of validity
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
/* Called when a new authenticated message has been send
|
||||
|
@ -75,8 +73,7 @@ func (peer *Peer) SendKeepAlive() bool {
|
|||
* Sent non-empty (authenticated) transport message
|
||||
*/
|
||||
func (peer *Peer) TimerDataSent() {
|
||||
peer.timer.keepalivePassive.Stop()
|
||||
peer.timer.handshakeNew.Start(NewHandshakeTime)
|
||||
peer.event.dataSent.Fire()
|
||||
}
|
||||
|
||||
/* Event:
|
||||
|
@ -86,16 +83,19 @@ func (peer *Peer) TimerDataSent() {
|
|||
* Set a timer to confirm the message using a keep-alive (if not already set)
|
||||
*/
|
||||
func (peer *Peer) TimerDataReceived() {
|
||||
if !peer.timer.keepalivePassive.Start(KeepaliveTimeout) {
|
||||
peer.timer.needAnotherKeepalive.Set(true)
|
||||
}
|
||||
peer.event.dataReceived.Fire()
|
||||
/*
|
||||
if !peer.timer.keepalivePassive.Start(KeepaliveTimeout) {
|
||||
peer.timer.needAnotherKeepalive.Set(true)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
/* Event:
|
||||
* Any (authenticated) packet received
|
||||
*/
|
||||
func (peer *Peer) TimerAnyAuthenticatedPacketReceived() {
|
||||
peer.timer.handshakeNew.Stop()
|
||||
peer.event.anyAuthenticatedPacketReceived.Fire()
|
||||
}
|
||||
|
||||
/* Event:
|
||||
|
@ -105,11 +105,7 @@ func (peer *Peer) TimerAnyAuthenticatedPacketReceived() {
|
|||
* Push persistent keep-alive into the future
|
||||
*/
|
||||
func (peer *Peer) TimerAnyAuthenticatedPacketTraversal() {
|
||||
interval := peer.persistentKeepaliveInterval
|
||||
if interval > 0 {
|
||||
duration := time.Duration(interval) * time.Second
|
||||
peer.timer.keepalivePersistent.Reset(duration)
|
||||
}
|
||||
peer.event.anyAuthenticatedPacketTraversal.Fire()
|
||||
}
|
||||
|
||||
/* Called after successfully completing a handshake.
|
||||
|
@ -120,7 +116,7 @@ func (peer *Peer) TimerAnyAuthenticatedPacketTraversal() {
|
|||
*/
|
||||
func (peer *Peer) TimerHandshakeComplete() {
|
||||
peer.signal.handshakeCompleted.Send()
|
||||
peer.device.log.Info.Println(peer.String() + ": New handshake completed")
|
||||
peer.device.log.Info.Println(peer, ": New handshake completed")
|
||||
}
|
||||
|
||||
/* Event:
|
||||
|
@ -136,7 +132,8 @@ func (peer *Peer) TimerHandshakeComplete() {
|
|||
* upon failure to complete a handshake
|
||||
*/
|
||||
func (peer *Peer) TimerEphemeralKeyCreated() {
|
||||
peer.timer.zeroAllKeys.Reset(RejectAfterTime * 3)
|
||||
peer.event.ephemeralKeyCreated.Fire()
|
||||
// peer.timer.zeroAllKeys.Reset(RejectAfterTime * 3)
|
||||
}
|
||||
|
||||
/* Sends a new handshake initiation message to the peer (endpoint)
|
||||
|
@ -171,16 +168,15 @@ func (peer *Peer) sendNewHandshake() error {
|
|||
peer.signal.handshakeCompleted.Enable()
|
||||
}
|
||||
|
||||
// set timeout
|
||||
|
||||
jitter := time.Millisecond * time.Duration(rand.Uint32()%334)
|
||||
|
||||
peer.timer.keepalivePassive.Stop()
|
||||
peer.timer.handshakeTimeout.Reset(RekeyTimeout + jitter)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func newTimer() *time.Timer {
|
||||
timer := time.NewTimer(time.Hour)
|
||||
timer.Stop()
|
||||
return timer
|
||||
}
|
||||
|
||||
func (peer *Peer) RoutineTimerHandler() {
|
||||
|
||||
device := peer.device
|
||||
|
@ -189,24 +185,28 @@ func (peer *Peer) RoutineTimerHandler() {
|
|||
logDebug := device.log.Debug
|
||||
|
||||
defer func() {
|
||||
logDebug.Println(peer.String() + ": Routine: timer handler - stopped")
|
||||
logDebug.Println(peer, ": Routine: timer handler - stopped")
|
||||
peer.routines.stopping.Done()
|
||||
}()
|
||||
|
||||
logDebug.Println(peer.String() + ": Routine: timer handler - started")
|
||||
logDebug.Println(peer, ": Routine: timer handler - started")
|
||||
|
||||
// reset all timers
|
||||
|
||||
peer.timer.keepalivePassive.Stop()
|
||||
peer.timer.handshakeDeadline.Stop()
|
||||
peer.timer.handshakeTimeout.Stop()
|
||||
peer.timer.handshakeNew.Stop()
|
||||
peer.timer.zeroAllKeys.Stop()
|
||||
pendingHandshakeNew := false
|
||||
pendingKeepalivePassive := false
|
||||
|
||||
timerKeepalivePassive := newTimer()
|
||||
timerHandshakeDeadline := newTimer()
|
||||
timerHandshakeTimeout := newTimer()
|
||||
timerHandshakeNew := newTimer()
|
||||
timerZeroAllKeys := newTimer()
|
||||
timerKeepalivePersistent := newTimer()
|
||||
|
||||
interval := peer.persistentKeepaliveInterval
|
||||
if interval > 0 {
|
||||
duration := time.Duration(interval) * time.Second
|
||||
peer.timer.keepalivePersistent.Reset(duration)
|
||||
timerKeepalivePersistent.Reset(duration)
|
||||
}
|
||||
|
||||
// signal synchronised setup complete
|
||||
|
@ -223,34 +223,56 @@ func (peer *Peer) RoutineTimerHandler() {
|
|||
case <-peer.routines.stop.Wait():
|
||||
return
|
||||
|
||||
/* events */
|
||||
|
||||
case <-peer.event.dataSent.C:
|
||||
timerKeepalivePassive.Stop()
|
||||
if !pendingHandshakeNew {
|
||||
timerHandshakeNew.Reset(NewHandshakeTime)
|
||||
}
|
||||
|
||||
case <-peer.event.dataReceived.C:
|
||||
if pendingKeepalivePassive {
|
||||
peer.timer.needAnotherKeepalive.Set(true) // TODO: make local
|
||||
} else {
|
||||
timerKeepalivePassive.Reset(KeepaliveTimeout)
|
||||
}
|
||||
|
||||
case <-peer.event.anyAuthenticatedPacketTraversal.C:
|
||||
interval := peer.persistentKeepaliveInterval
|
||||
if interval > 0 {
|
||||
duration := time.Duration(interval) * time.Second
|
||||
timerKeepalivePersistent.Reset(duration)
|
||||
}
|
||||
|
||||
/* timers */
|
||||
|
||||
// keep-alive
|
||||
|
||||
case <-peer.timer.keepalivePersistent.Wait():
|
||||
case <-timerKeepalivePersistent.C:
|
||||
|
||||
interval := peer.persistentKeepaliveInterval
|
||||
if interval > 0 {
|
||||
logDebug.Println(peer.String() + ": Send keep-alive (persistent)")
|
||||
peer.timer.keepalivePassive.Stop()
|
||||
logDebug.Println(peer, ": Send keep-alive (persistent)")
|
||||
timerKeepalivePassive.Stop()
|
||||
peer.SendKeepAlive()
|
||||
}
|
||||
|
||||
case <-peer.timer.keepalivePassive.Wait():
|
||||
case <-timerKeepalivePassive.C:
|
||||
|
||||
logDebug.Println(peer.String() + ": Send keep-alive (passive)")
|
||||
logDebug.Println(peer, ": Send keep-alive (passive)")
|
||||
|
||||
peer.SendKeepAlive()
|
||||
|
||||
if peer.timer.needAnotherKeepalive.Swap(false) {
|
||||
peer.timer.keepalivePassive.Reset(KeepaliveTimeout)
|
||||
timerKeepalivePassive.Reset(KeepaliveTimeout)
|
||||
}
|
||||
|
||||
// clear key material timer
|
||||
|
||||
case <-peer.timer.zeroAllKeys.Wait():
|
||||
case <-timerZeroAllKeys.C:
|
||||
|
||||
logDebug.Println(peer.String() + ": Clear all key-material (timer event)")
|
||||
logDebug.Println(peer, ": Clear all key-material (timer event)")
|
||||
|
||||
hs := &peer.handshake
|
||||
hs.mutex.Lock()
|
||||
|
@ -282,11 +304,11 @@ func (peer *Peer) RoutineTimerHandler() {
|
|||
|
||||
// handshake timers
|
||||
|
||||
case <-peer.timer.handshakeNew.Wait():
|
||||
logInfo.Println(peer.String() + ": Retrying handshake (timer event)")
|
||||
case <-timerHandshakeNew.C:
|
||||
logInfo.Println(peer, ": Retrying handshake (timer event)")
|
||||
peer.signal.handshakeBegin.Send()
|
||||
|
||||
case <-peer.timer.handshakeTimeout.Wait():
|
||||
case <-timerHandshakeTimeout.C:
|
||||
|
||||
// clear source (in case this is causing problems)
|
||||
|
||||
|
@ -300,20 +322,27 @@ func (peer *Peer) RoutineTimerHandler() {
|
|||
|
||||
err := peer.sendNewHandshake()
|
||||
|
||||
// set timeout
|
||||
|
||||
jitter := time.Millisecond * time.Duration(rand.Uint32()%334)
|
||||
timerKeepalivePassive.Stop()
|
||||
timerHandshakeTimeout.Reset(RekeyTimeout + jitter)
|
||||
|
||||
if err != nil {
|
||||
logInfo.Println(peer.String()+": Failed to send handshake initiation", err)
|
||||
logInfo.Println(peer, ": Failed to send handshake initiation", err)
|
||||
} else {
|
||||
logDebug.Println(peer.String() + ": Send handshake initiation (subsequent)")
|
||||
logDebug.Println(peer, ": Send handshake initiation (subsequent)")
|
||||
}
|
||||
|
||||
case <-peer.timer.handshakeDeadline.Wait():
|
||||
case <-timerHandshakeDeadline.C:
|
||||
|
||||
// clear all queued packets and stop keep-alive
|
||||
|
||||
logInfo.Println(peer.String() + ": Handshake negotiation timed-out")
|
||||
logInfo.Println(peer, ": Handshake negotiation timed-out")
|
||||
|
||||
peer.signal.flushNonceQueue.Send()
|
||||
peer.timer.keepalivePersistent.Stop()
|
||||
peer.flushNonceQueue()
|
||||
signalSend(peer.signal.flushNonceQueue)
|
||||
timerKeepalivePersistent.Stop()
|
||||
peer.signal.handshakeBegin.Enable()
|
||||
|
||||
/* signals */
|
||||
|
@ -324,25 +353,31 @@ func (peer *Peer) RoutineTimerHandler() {
|
|||
|
||||
err := peer.sendNewHandshake()
|
||||
|
||||
// set timeout
|
||||
|
||||
jitter := time.Millisecond * time.Duration(rand.Uint32()%334)
|
||||
timerKeepalivePassive.Stop()
|
||||
timerHandshakeTimeout.Reset(RekeyTimeout + jitter)
|
||||
|
||||
if err != nil {
|
||||
logInfo.Println(peer.String()+": Failed to send handshake initiation", err)
|
||||
logInfo.Println(peer, ": Failed to send handshake initiation", err)
|
||||
} else {
|
||||
logDebug.Println(peer.String() + ": Send handshake initiation (initial)")
|
||||
logDebug.Println(peer, ": Send handshake initiation (initial)")
|
||||
}
|
||||
|
||||
peer.timer.handshakeDeadline.Reset(RekeyAttemptTime)
|
||||
timerHandshakeDeadline.Reset(RekeyAttemptTime)
|
||||
|
||||
case <-peer.signal.handshakeCompleted.Wait():
|
||||
|
||||
logInfo.Println(peer.String() + ": Handshake completed")
|
||||
logInfo.Println(peer, ": Handshake completed")
|
||||
|
||||
atomic.StoreInt64(
|
||||
&peer.stats.lastHandshakeNano,
|
||||
time.Now().UnixNano(),
|
||||
)
|
||||
|
||||
peer.timer.handshakeTimeout.Stop()
|
||||
peer.timer.handshakeDeadline.Stop()
|
||||
timerHandshakeTimeout.Stop()
|
||||
timerHandshakeDeadline.Stop()
|
||||
peer.signal.handshakeBegin.Enable()
|
||||
|
||||
peer.timer.sendLastMinuteHandshake.Set(false)
|
||||
|
|
20
uapi.go
20
uapi.go
|
@ -248,12 +248,10 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
logError.Println("Failed to create new peer:", err)
|
||||
return &IPCError{Code: ipcErrorInvalid}
|
||||
}
|
||||
logDebug.Println("UAPI: Created new peer:", peer.String())
|
||||
logDebug.Println("UAPI: Created new peer:", peer)
|
||||
}
|
||||
|
||||
peer.mutex.Lock()
|
||||
peer.timer.handshakeDeadline.Reset(RekeyAttemptTime)
|
||||
peer.mutex.Unlock()
|
||||
peer.event.handshakePushDeadline.Fire()
|
||||
|
||||
case "remove":
|
||||
|
||||
|
@ -264,7 +262,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
return &IPCError{Code: ipcErrorInvalid}
|
||||
}
|
||||
if !dummy {
|
||||
logDebug.Println("UAPI: Removing peer:", peer.String())
|
||||
logDebug.Println("UAPI: Removing peer:", peer)
|
||||
device.RemovePeer(peer.handshake.remoteStatic)
|
||||
}
|
||||
peer = &Peer{}
|
||||
|
@ -274,7 +272,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
|
||||
// update PSK
|
||||
|
||||
logDebug.Println("UAPI: Updating pre-shared key for peer:", peer.String())
|
||||
logDebug.Println("UAPI: Updating pre-shared key for peer:", peer)
|
||||
|
||||
peer.handshake.mutex.Lock()
|
||||
err := peer.handshake.presharedKey.FromHex(value)
|
||||
|
@ -289,7 +287,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
|
||||
// set endpoint destination
|
||||
|
||||
logDebug.Println("UAPI: Updating endpoint for peer:", peer.String())
|
||||
logDebug.Println("UAPI: Updating endpoint for peer:", peer)
|
||||
|
||||
err := func() error {
|
||||
peer.mutex.Lock()
|
||||
|
@ -299,7 +297,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
return err
|
||||
}
|
||||
peer.endpoint = endpoint
|
||||
peer.timer.handshakeDeadline.Reset(RekeyAttemptTime)
|
||||
peer.event.handshakePushDeadline.Fire()
|
||||
return nil
|
||||
}()
|
||||
|
||||
|
@ -312,7 +310,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
|
||||
// update keep-alive interval
|
||||
|
||||
logDebug.Println("UAPI: Updating persistent_keepalive_interval for peer:", peer.String())
|
||||
logDebug.Println("UAPI: Updating persistent_keepalive_interval for peer:", peer)
|
||||
|
||||
secs, err := strconv.ParseUint(value, 10, 16)
|
||||
if err != nil {
|
||||
|
@ -337,7 +335,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
|
||||
case "replace_allowed_ips":
|
||||
|
||||
logDebug.Println("UAPI: Removing all allowed IPs for peer:", peer.String())
|
||||
logDebug.Println("UAPI: Removing all allowed IPs for peer:", peer)
|
||||
|
||||
if value != "true" {
|
||||
logError.Println("Failed to set replace_allowed_ips, invalid value:", value)
|
||||
|
@ -354,7 +352,7 @@ func ipcSetOperation(device *Device, socket *bufio.ReadWriter) *IPCError {
|
|||
|
||||
case "allowed_ip":
|
||||
|
||||
logDebug.Println("UAPI: Adding allowed_ip to peer:", peer.String())
|
||||
logDebug.Println("UAPI: Adding allowed_ip to peer:", peer)
|
||||
|
||||
_, network, err := net.ParseCIDR(value)
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in a new issue