From 82ca15f70eacdb57167757424704f7d82c5e1a06 Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Wed, 23 Oct 2024 16:44:35 +0100 Subject: [PATCH] add sessionCache, use it to store sessions. --- pkg/udpproxy/sessioncache.go | 41 ++++++++++++++++++++++++++++++++++++ pkg/udpproxy/udpproxy.go | 36 ++++++++++++------------------- 2 files changed, 55 insertions(+), 22 deletions(-) create mode 100644 pkg/udpproxy/sessioncache.go diff --git a/pkg/udpproxy/sessioncache.go b/pkg/udpproxy/sessioncache.go new file mode 100644 index 0000000..b7d9927 --- /dev/null +++ b/pkg/udpproxy/sessioncache.go @@ -0,0 +1,41 @@ +package udpproxy + +import "sync" + +// sessionCache tracks connection sessions +type sessionCache struct { + mu sync.RWMutex + data map[string]*session +} + +// newSessionCache creates a usable sessionCache. +func newSessionCache() sessionCache { + return sessionCache{ + data: make(map[string]*session), + } +} + +// Read returns the associated session for an addr +func (sc *sessionCache) Read(addr string) (*session, bool) { + sc.mu.RLock() + defer sc.mu.RUnlock() + + v, ok := sc.data[addr] + return v, ok +} + +// Upsert overrides the session for a given addr. +func (sc *sessionCache) Upsert(addr string, session *session) { + sc.mu.Lock() + defer sc.mu.Unlock() + + sc.data[addr] = session +} + +// Delete removes the session for the given addr. +func (sc *sessionCache) Delete(addr string) { + sc.mu.Lock() + defer sc.mu.Unlock() + + delete(sc.data, addr) +} diff --git a/pkg/udpproxy/udpproxy.go b/pkg/udpproxy/udpproxy.go index 3b5d052..69a318d 100644 --- a/pkg/udpproxy/udpproxy.go +++ b/pkg/udpproxy/udpproxy.go @@ -2,7 +2,6 @@ package udpproxy import ( "net" - "sync" "time" log "github.com/sirupsen/logrus" @@ -11,15 +10,15 @@ import ( // Option is a functional option type that allows us to configure the Client. type Option func(*Client) -// WithStaleTimeout is a functional option to set the stale session timeout -func WithStaleTimeout(timeout time.Duration) Option { +// WithSessionTimeout is a functional option to set the session timeout +func WithSessionTimeout(timeout time.Duration) Option { return func(c *Client) { if timeout < time.Minute { log.Warnf("cannot set stale session timeout to less than 1 minute.. defaulting to 5 minutes") return } - c.timeout = timeout + c.sessionTimeout = timeout } } @@ -29,10 +28,8 @@ type Client struct { proxyConn *net.UDPConn - mutex sync.RWMutex - sessions map[string]*session - - timeout time.Duration + sessionCache sessionCache + sessionTimeout time.Duration } func New(port, target string, options ...Option) (*Client, error) { @@ -47,11 +44,10 @@ func New(port, target string, options ...Option) (*Client, error) { } c := &Client{ - laddr: laddr, - raddr: raddr, - mutex: sync.RWMutex{}, - sessions: map[string]*session{}, - timeout: 5 * time.Minute, + laddr: laddr, + raddr: raddr, + sessionCache: newSessionCache(), + sessionTimeout: 5 * time.Minute, } for _, o := range options { @@ -77,7 +73,7 @@ func (c *Client) ListenAndServe() error { log.Error(err) } - session, ok := c.sessions[caddr.String()] + session, ok := c.sessionCache.Read(caddr.String()) if !ok { session, err = newSession(caddr, c.raddr, c.proxyConn) if err != nil { @@ -85,7 +81,7 @@ func (c *Client) ListenAndServe() error { continue } - c.sessions[caddr.String()] = session + c.sessionCache.Upsert(caddr.String(), session) } go session.proxyTo(buf[:n]) @@ -95,16 +91,12 @@ func (c *Client) ListenAndServe() error { func (c *Client) pruneSessions() { ticker := time.NewTicker(1 * time.Minute) - // the locks here could be abusive and i dont even know if this is a real - // problem but we definitely need to clean up stale sessions for range ticker.C { - for _, session := range c.sessions { - c.mutex.RLock() - if time.Since(session.updateTime) > c.timeout { - delete(c.sessions, session.caddr.String()) + for _, session := range c.sessionCache.data { + if time.Since(session.updateTime) > c.sessionTimeout { + c.sessionCache.Delete(session.caddr.String()) log.Tracef("session for %s deleted", session.caddr) } - c.mutex.RUnlock() } } }