From b1da55ab5ef3daf5cce90ba71ea9e3b6933eda16 Mon Sep 17 00:00:00 2001 From: NNdroid <99177648+NNdroid@users.noreply.github.com> Date: Mon, 12 Dec 2022 14:10:55 +0800 Subject: [PATCH] Optimise quic --- quic/quicclient.go | 40 +++++++++++++++++++++++++++++----------- quic/quicserver.go | 44 ++++++++++++++++++++++++++++++++++---------- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/quic/quicclient.go b/quic/quicclient.go index fcd50ec..ea43e5c 100644 --- a/quic/quicclient.go +++ b/quic/quicclient.go @@ -11,6 +11,8 @@ import ( "github.com/net-byte/vtun/common/netutil" "github.com/net-byte/water" "log" + "sync" + "time" ) // StartClient starts the quic client @@ -23,26 +25,34 @@ func StartClient(iFace *water.Interface, config config.Config) { if config.TLSSni != "" { tlsConfig.ServerName = config.TLSSni } + var wg sync.WaitGroup for { conn, err := quic.DialAddr(config.ServerAddr, tlsConfig, nil) if err != nil { log.Panic(err) } - stream, err := conn.OpenStreamSync(context.Background()) - if err != nil { - netutil.PrintErr(err, config.Verbose) - continue + for { + stream, err := conn.OpenStreamSync(context.Background()) + if err != nil { + netutil.PrintErr(err, config.Verbose) + break + } + go tunToQuic(config, stream, iFace, &wg) + quicToTun(config, stream, iFace, &wg) } - go tunToQuic(config, stream, iFace) - quicToTun(config, stream, iFace) + wg.Wait() } } // tunToQuic sends packets from tun to quic -func tunToQuic(config config.Config, stream quic.Stream, iFace *water.Interface) { +func tunToQuic(config config.Config, stream quic.Stream, iFace *water.Interface, wg *sync.WaitGroup) { + wg.Add(1) packet := make([]byte, config.BufferSize) shb := make([]byte, 2) - defer stream.Close() + defer func() { + stream.Close() + wg.Done() + }() for { shn, err := iFace.Read(packet) if err != nil { @@ -60,6 +70,7 @@ func tunToQuic(config config.Config, stream quic.Stream, iFace *water.Interface) shb[1] = byte(shn & 0xff) copy(packet[len(shb):len(shb)+len(b)], b) copy(packet[:len(shb)], shb) + stream.SetWriteDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) n, err := stream.Write(packet[:len(shb)+len(b)]) if err != nil { netutil.PrintErr(err, config.Verbose) @@ -70,18 +81,23 @@ func tunToQuic(config config.Config, stream quic.Stream, iFace *water.Interface) } // quicToTun sends packets from quic to tun -func quicToTun(config config.Config, stream quic.Stream, iFace *water.Interface) { +func quicToTun(config config.Config, stream quic.Stream, iFace *water.Interface, wg *sync.WaitGroup) { + wg.Add(1) packet := make([]byte, config.BufferSize) shb := make([]byte, 2) - defer stream.Close() + defer func() { + stream.Close() + wg.Done() + }() for { + stream.SetReadDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) n, err := stream.Read(shb) if err != nil { netutil.PrintErr(err, config.Verbose) break } if n < 2 { - continue + break } shn := 0 shn = ((shn & 0x00) | int(shb[0])) << 8 @@ -89,6 +105,7 @@ func quicToTun(config config.Config, stream quic.Stream, iFace *water.Interface) splitSize := 99 var count = 0 if shn < splitSize { + stream.SetReadDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) n, err = stream.Read(packet[:shn]) if err != nil { netutil.PrintErr(err, config.Verbose) @@ -101,6 +118,7 @@ func quicToTun(config config.Config, stream quic.Stream, iFace *water.Interface) if shn-count < splitSize { receiveSize = shn - count } + stream.SetReadDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) n, err = stream.Read(packet[count : count+receiveSize]) if err != nil { netutil.PrintErr(err, config.Verbose) diff --git a/quic/quicserver.go b/quic/quicserver.go index 74866e4..e4667e3 100644 --- a/quic/quicserver.go +++ b/quic/quicserver.go @@ -11,7 +11,6 @@ import ( "github.com/net-byte/vtun/common/counter" "github.com/net-byte/vtun/common/netutil" "github.com/net-byte/water" - "io" "log" "time" ) @@ -36,6 +35,7 @@ func StartServer(iface *water.Interface, config config.Config) { for { conn, err := listener.Accept(context.Background()) if err != nil { + netutil.PrintErr(err, config.Verbose) continue } go func() { @@ -43,11 +43,12 @@ func StartServer(iface *water.Interface, config config.Config) { stream, err := conn.AcceptStream(context.Background()) if err != nil { netutil.PrintErr(err, config.Verbose) - return + break } //client -> server toServer(config, stream, iface) } + conn.CloseWithError(quic.ApplicationErrorCode(0x01), "closed") }() } } @@ -60,7 +61,7 @@ func toClient(config config.Config, iFace *water.Interface) { shn, err := iFace.Read(packet) if err != nil { netutil.PrintErr(err, config.Verbose) - break + continue } shb[0] = byte(shn >> 8 & 0xff) shb[1] = byte(shn & 0xff) @@ -73,13 +74,15 @@ func toClient(config config.Config, iFace *water.Interface) { if config.Compress { b = snappy.Encode(nil, b) } - stream := v.(quic.Stream) copy(packet[len(shb):len(shb)+len(b)], b) copy(packet[:len(shb)], shb) + stream := v.(quic.Stream) + stream.SetWriteDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) n, err := stream.Write(packet[:len(shb)+len(b)]) if err != nil { + cache.GetCache().Delete(key) netutil.PrintErr(err, config.Verbose) - break + continue } counter.IncrWrittenBytes(n) } @@ -93,6 +96,7 @@ func toServer(config config.Config, stream quic.Stream, iface *water.Interface) shb := make([]byte, 2) defer stream.Close() for { + stream.SetReadDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) n, err := stream.Read(shb) if err != nil { netutil.PrintErr(err, config.Verbose) @@ -104,12 +108,32 @@ func toServer(config config.Config, stream quic.Stream, iface *water.Interface) shn := 0 shn = ((shn & 0x00) | int(shb[0])) << 8 shn = shn | int(shb[1]) - n, err = stream.Read(packet[:shn]) - if err == io.EOF || err != nil { - netutil.PrintErr(err, config.Verbose) - break + splitSize := 99 + var count = 0 + if shn < splitSize { + stream.SetReadDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) + n, err = stream.Read(packet[:shn]) + if err != nil { + netutil.PrintErr(err, config.Verbose) + break + } + count = n + } else { + for count < shn { + receiveSize := splitSize + if shn-count < splitSize { + receiveSize = shn - count + } + stream.SetReadDeadline(time.Now().Add(time.Duration(config.Timeout) * time.Second)) + n, err = stream.Read(packet[count : count+receiveSize]) + if err != nil { + netutil.PrintErr(err, config.Verbose) + break + } + count += n + } } - b := packet[:n] + b := packet[:shn] if config.Compress { b, err = snappy.Decode(nil, b) if err != nil {