diff --git a/src/conn_default.go b/src/conn_default.go new file mode 100644 index 0000000..a6dc97d --- /dev/null +++ b/src/conn_default.go @@ -0,0 +1,11 @@ +// +build !linux + +package main + +import ( + "net" +) + +func setFwmark(conn *net.UDPConn, value int) error { + return nil +} diff --git a/src/conn_linux.go b/src/conn_linux.go new file mode 100644 index 0000000..b04471c --- /dev/null +++ b/src/conn_linux.go @@ -0,0 +1,24 @@ +package main + +import ( + "golang.org/x/sys/unix" + "net" +) + +func setMark(conn *net.UDPConn, value int) error { + if conn == nil || value == 0 { + return nil + } + + file, err := conn.File() + if err != nil { + return err + } + + return unix.SetsockoptInt( + int(file.Fd()), + unix.SOL_SOCKET, + unix.SO_MARK, + value, + ) +} diff --git a/src/constants.go b/src/constants.go index 5ee8646..71dd98e 100644 --- a/src/constants.go +++ b/src/constants.go @@ -30,12 +30,14 @@ const ( QueueOutboundSize = 1024 QueueInboundSize = 1024 QueueHandshakeSize = 1024 - MinMessageSize = MessageTransportSize // size of keep-alive - MaxMessageSize = ((1 << 16) - 1) + MessageTransportHeaderSize - MaxPeers = 1 << 16 + MaxSegmentSize = (1 << 16) - 1 // largest possible UDP datagram + MinMessageSize = MessageKeepaliveSize // minimum size of transport message (keepalive) + MaxMessageSize = MaxSegmentSize // maximum size of transport message + MaxContentSize = MaxSegmentSize - MessageTransportSize // maximum size of transport message content ) const ( UnderLoadQueueSize = QueueHandshakeSize / 8 UnderLoadAfterTime = time.Second // how long does the device remain under load after detected + MaxPeers = 1 << 16 // maximum number of configured peers ) diff --git a/src/noise_protocol.go b/src/noise_protocol.go index 0d78c84..1f1301e 100644 --- a/src/noise_protocol.go +++ b/src/noise_protocol.go @@ -32,12 +32,13 @@ const ( ) const ( - MessageInitiationSize = 148 - MessageResponseSize = 92 - MessageCookieReplySize = 64 - MessageTransportHeaderSize = 16 + MessageInitiationSize = 148 // size of handshake initation message + MessageResponseSize = 92 // size of response message + MessageCookieReplySize = 64 // size of cookie reply message + MessageTransportHeaderSize = 16 // size of data preceeding content in transport message MessageTransportSize = MessageTransportHeaderSize + poly1305.TagSize // size of empty transport - MessageKeepaliveSize = MessageTransportSize + MessageKeepaliveSize = MessageTransportSize // size of keepalive + MessageHandshakeSize = MessageInitiationSize // size of largest handshake releated message ) const ( diff --git a/src/receive.go b/src/receive.go index 4c76bbf..ca7bb6e 100644 --- a/src/receive.go +++ b/src/receive.go @@ -54,6 +54,26 @@ func (device *Device) addToInboundQueue( } } +func (device *Device) addToDecryptionQueue( + queue chan *QueueInboundElement, + element *QueueInboundElement, +) { + for { + select { + case queue <- element: + return + default: + select { + case old := <-queue: + // drop & release to potential consumer + old.Drop() + old.mutex.Unlock() + default: + } + } + } +} + func (device *Device) addToHandshakeQueue( queue chan QueueHandshakeElement, element QueueHandshakeElement, @@ -167,7 +187,7 @@ func (device *Device) RoutineReceiveIncomming() { // add to decryption queues - device.addToInboundQueue(device.queue.decryption, elem) + device.addToDecryptionQueue(device.queue.decryption, elem) device.addToInboundQueue(peer.queue.inbound, elem) buffer = device.GetMessageBuffer() continue @@ -218,7 +238,6 @@ func (device *Device) RoutineDecryption() { // check if dropped if elem.IsDropped() { - elem.mutex.Unlock() // TODO: Make consistent with send continue } @@ -256,7 +275,7 @@ func (device *Device) RoutineHandshake() { logDebug := device.log.Debug logDebug.Println("Routine, handshake routine, started for device") - var temp [256]byte + var temp [MessageHandshakeSize]byte var elem QueueHandshakeElement for { diff --git a/src/send.go b/src/send.go index 0de3c0a..7d4014a 100644 --- a/src/send.go +++ b/src/send.go @@ -101,6 +101,7 @@ func addToEncryptionQueue( default: select { case old := <-queue: + // drop & release to potential consumer old.Drop() old.mutex.Unlock() default: @@ -137,19 +138,16 @@ func (peer *Peer) SendBuffer(buffer []byte) (int, error) { */ func (device *Device) RoutineReadFromTUN() { - var elem *QueueOutboundElement + elem := device.NewOutboundElement() logDebug := device.log.Debug logError := device.log.Error - logDebug.Println("Routine, TUN Reader: started") + logDebug.Println("Routine, TUN Reader started") for { - // read packet - if elem == nil { - elem = device.NewOutboundElement() - } + // read packet elem.packet = elem.buffer[MessageTransportHeaderSize:] size, err := device.tun.device.Read(elem.packet) @@ -159,7 +157,7 @@ func (device *Device) RoutineReadFromTUN() { return } - if size == 0 { + if size == 0 || size > MaxContentSize { continue } @@ -191,7 +189,7 @@ func (device *Device) RoutineReadFromTUN() { continue } - // check if known endpoint + // check if known endpoint (drop early) peer.mutex.RLock() if peer.endpoint == nil { @@ -205,8 +203,7 @@ func (device *Device) RoutineReadFromTUN() { signalSend(peer.signal.handshakeReset) addToOutboundQueue(peer.queue.nonce, elem) - elem = nil - + elem = device.NewOutboundElement() } }