From e86ddc337328ee59eaf4a9aa63cc41c15bf1374c Mon Sep 17 00:00:00 2001 From: bagas Date: Tue, 6 Jan 2026 18:32:24 +0700 Subject: [PATCH] feat: implement forwarder session termination --- go.mod | 2 +- go.sum | 2 + server/server.go | 122 ++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 124 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 283530b..a596bd6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module git.fossy.my.id/bagas/tunnel-please-controller go 1.25.5 require ( - git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0 + git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0 github.com/jackc/pgx/v5 v5.8.0 github.com/joho/godotenv v1.5.1 github.com/lestrrat-go/httprc/v3 v3.0.3 diff --git a/go.sum b/go.sum index 85fa441..24f8f68 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0 h1:RhcBKUG41/om4jgN+iF/vlY/RojTe git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY= git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0 h1:tpJSKjaSmV+vxxbVx6qnStjxFVXjj2M0rygWXxLb99o= git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY= +git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0 h1:3xszIhck4wo9CoeRq9vnkar4PhY7kz9QrR30qj2XszA= +git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0/go.mod h1:Weh6ZujgWmT8XxD3Qba7sJ6r5eyUMB9XSWynqdyOoLo= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/server/server.go b/server/server.go index f871c2c..4b6a0b4 100644 --- a/server/server.go +++ b/server/server.go @@ -169,6 +169,21 @@ func processEventStream(ctx context.Context, requestChan *Subscriber, event grpc case requestChan.node <- recv: } log.Printf("Received SESSIONS event: %v", recv) + case proto.EventType_TERMINATE_SESSION: + log.Printf("Processing terminate event") + if err := event.Send(request); err != nil { + return err + } + recv, err := recvClientWithTimeout(ctx, requestChan.done, event, defaultSubscriberResponseWait) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case requestChan.node <- recv: + } + log.Printf("Received terminate event: %v", recv) default: log.Printf("Unknown event type: %v", request.GetType()) } @@ -335,8 +350,9 @@ func (s *Server) StartAPI(ctx context.Context, Addr string) error { var token jwt.Token var err error + var keyset jwk.Set if jwkURL != "" { - keyset, err := s.jwkCache.Lookup(request.Context(), jwkURL) + keyset, err = s.jwkCache.Lookup(request.Context(), jwkURL) if err != nil { log.Printf("jwks lookup failed: %v", err) writeError(http.StatusBadGateway, "unable to fetch jwks") @@ -413,6 +429,110 @@ func (s *Server) StartAPI(ctx context.Context, Addr string) error { } }) + handler.HandleFunc("DELETE /api/session/{node}/{type}/{session}", func(writer http.ResponseWriter, request *http.Request) { + writeError := func(status int, msg string) { + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(status) + _ = json.NewEncoder(writer).Encode(map[string]string{"error": msg}) + } + + var token jwt.Token + var err error + var keyset jwk.Set + if jwkURL != "" { + keyset, err = s.jwkCache.Lookup(request.Context(), jwkURL) + if err != nil { + log.Printf("jwks lookup failed: %v", err) + writeError(http.StatusBadGateway, "unable to fetch jwks") + return + } + + token, err = jwt.ParseRequest(request, jwt.WithKeySet(keyset)) + if err != nil { + log.Printf("jwt parse failed: %v", err) + writeError(http.StatusUnauthorized, "invalid or expired token") + return + } + } else { + token, err = jwt.ParseRequest(request, jwt.WithVerify(false)) + if err != nil { + log.Printf("jwt parse failed (no verification): %v", err) + writeError(http.StatusBadRequest, "invalid token") + return + } + } + + var email string + err = token.Get("email", &email) + if err != nil { + log.Printf("email claim not found: %v", err) + writeError(http.StatusBadRequest, "missing email claim in token") + return + } + if email == "" { + writeError(http.StatusBadRequest, "empty email claim in token") + return + } + node := request.PathValue("node") + if node == "" { + writeError(http.StatusBadRequest, "no node specified") + return + } + + sessionTypeRaw := request.PathValue("type") + if node == "" { + writeError(http.StatusBadRequest, "no type specified") + return + } + + var tunnelType proto.TunnelType + if sessionTypeRaw == "http" { + tunnelType = proto.TunnelType_HTTP + } else if sessionTypeRaw == "tcp" { + tunnelType = proto.TunnelType_TCP + } else { + writeError(http.StatusBadRequest, "invalid session type specified") + return + } + + session := request.PathValue("session") + if node == "" { + writeError(http.StatusBadRequest, "no node specified") + return + } + + subscriber, err := s.GetEventSubscriber(node) + if err != nil { + writeError(http.StatusBadRequest, "no node found") + return + } + subscriber.events <- &proto.Events{ + Type: proto.EventType_TERMINATE_SESSION, + Payload: &proto.Events_TerminateSessionEvent{ + TerminateSessionEvent: &proto.TerminateSessionEvent{ + User: email, + TunnelType: tunnelType, + Slug: session, + }, + }, + } + select { + case response := <-subscriber.node: + resp, ok := response.Payload.(*proto.Node_TerminateSessionEventResponse) + if !ok { + writeError(http.StatusInternalServerError, "received an unexpected response from the node") + return + } + if !resp.TerminateSessionEventResponse.Success { + writeError(http.StatusBadRequest, resp.TerminateSessionEventResponse.Message) + return + } + log.Printf("Received terminate session response: %v", response) + writer.WriteHeader(http.StatusNoContent) + case <-request.Context().Done(): + } + }) + handler.HandleFunc("/api/sessions", func(writer http.ResponseWriter, request *http.Request) { writeError := func(status int, msg string) { writer.Header().Set("Content-Type", "application/json")