feat: implement forwarder session termination
All checks were successful
Docker Build and Push / build-and-push-tags (push) Has been skipped
Docker Build and Push / build-and-push-branches (push) Successful in 6m3s
renovate / renovate (push) Successful in 34s

This commit is contained in:
2026-01-06 18:32:24 +07:00
parent ff9bdcdf0b
commit e86ddc3373
3 changed files with 124 additions and 2 deletions

2
go.mod
View File

@@ -3,7 +3,7 @@ module git.fossy.my.id/bagas/tunnel-please-controller
go 1.25.5 go 1.25.5
require ( require (
git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0 git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0
github.com/jackc/pgx/v5 v5.8.0 github.com/jackc/pgx/v5 v5.8.0
github.com/joho/godotenv v1.5.1 github.com/joho/godotenv v1.5.1
github.com/lestrrat-go/httprc/v3 v3.0.3 github.com/lestrrat-go/httprc/v3 v3.0.3

2
go.sum
View File

@@ -2,6 +2,8 @@ git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0 h1:RhcBKUG41/om4jgN+iF/vlY/RojTe
git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY= git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY=
git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0 h1:tpJSKjaSmV+vxxbVx6qnStjxFVXjj2M0rygWXxLb99o= git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0 h1:tpJSKjaSmV+vxxbVx6qnStjxFVXjj2M0rygWXxLb99o=
git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY= git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY=
git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0 h1:3xszIhck4wo9CoeRq9vnkar4PhY7kz9QrR30qj2XszA=
git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0/go.mod h1:Weh6ZujgWmT8XxD3Qba7sJ6r5eyUMB9XSWynqdyOoLo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -169,6 +169,21 @@ func processEventStream(ctx context.Context, requestChan *Subscriber, event grpc
case requestChan.node <- recv: case requestChan.node <- recv:
} }
log.Printf("Received SESSIONS event: %v", recv) log.Printf("Received SESSIONS event: %v", recv)
case proto.EventType_TERMINATE_SESSION:
log.Printf("Processing terminate event")
if err := event.Send(request); err != nil {
return err
}
recv, err := recvClientWithTimeout(ctx, requestChan.done, event, defaultSubscriberResponseWait)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case requestChan.node <- recv:
}
log.Printf("Received terminate event: %v", recv)
default: default:
log.Printf("Unknown event type: %v", request.GetType()) log.Printf("Unknown event type: %v", request.GetType())
} }
@@ -335,8 +350,9 @@ func (s *Server) StartAPI(ctx context.Context, Addr string) error {
var token jwt.Token var token jwt.Token
var err error var err error
var keyset jwk.Set
if jwkURL != "" { if jwkURL != "" {
keyset, err := s.jwkCache.Lookup(request.Context(), jwkURL) keyset, err = s.jwkCache.Lookup(request.Context(), jwkURL)
if err != nil { if err != nil {
log.Printf("jwks lookup failed: %v", err) log.Printf("jwks lookup failed: %v", err)
writeError(http.StatusBadGateway, "unable to fetch jwks") writeError(http.StatusBadGateway, "unable to fetch jwks")
@@ -413,6 +429,110 @@ func (s *Server) StartAPI(ctx context.Context, Addr string) error {
} }
}) })
handler.HandleFunc("DELETE /api/session/{node}/{type}/{session}", func(writer http.ResponseWriter, request *http.Request) {
writeError := func(status int, msg string) {
writer.Header().Set("Content-Type", "application/json")
writer.WriteHeader(status)
_ = json.NewEncoder(writer).Encode(map[string]string{"error": msg})
}
var token jwt.Token
var err error
var keyset jwk.Set
if jwkURL != "" {
keyset, err = s.jwkCache.Lookup(request.Context(), jwkURL)
if err != nil {
log.Printf("jwks lookup failed: %v", err)
writeError(http.StatusBadGateway, "unable to fetch jwks")
return
}
token, err = jwt.ParseRequest(request, jwt.WithKeySet(keyset))
if err != nil {
log.Printf("jwt parse failed: %v", err)
writeError(http.StatusUnauthorized, "invalid or expired token")
return
}
} else {
token, err = jwt.ParseRequest(request, jwt.WithVerify(false))
if err != nil {
log.Printf("jwt parse failed (no verification): %v", err)
writeError(http.StatusBadRequest, "invalid token")
return
}
}
var email string
err = token.Get("email", &email)
if err != nil {
log.Printf("email claim not found: %v", err)
writeError(http.StatusBadRequest, "missing email claim in token")
return
}
if email == "" {
writeError(http.StatusBadRequest, "empty email claim in token")
return
}
node := request.PathValue("node")
if node == "" {
writeError(http.StatusBadRequest, "no node specified")
return
}
sessionTypeRaw := request.PathValue("type")
if node == "" {
writeError(http.StatusBadRequest, "no type specified")
return
}
var tunnelType proto.TunnelType
if sessionTypeRaw == "http" {
tunnelType = proto.TunnelType_HTTP
} else if sessionTypeRaw == "tcp" {
tunnelType = proto.TunnelType_TCP
} else {
writeError(http.StatusBadRequest, "invalid session type specified")
return
}
session := request.PathValue("session")
if node == "" {
writeError(http.StatusBadRequest, "no node specified")
return
}
subscriber, err := s.GetEventSubscriber(node)
if err != nil {
writeError(http.StatusBadRequest, "no node found")
return
}
subscriber.events <- &proto.Events{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Events_TerminateSessionEvent{
TerminateSessionEvent: &proto.TerminateSessionEvent{
User: email,
TunnelType: tunnelType,
Slug: session,
},
},
}
select {
case response := <-subscriber.node:
resp, ok := response.Payload.(*proto.Node_TerminateSessionEventResponse)
if !ok {
writeError(http.StatusInternalServerError, "received an unexpected response from the node")
return
}
if !resp.TerminateSessionEventResponse.Success {
writeError(http.StatusBadRequest, resp.TerminateSessionEventResponse.Message)
return
}
log.Printf("Received terminate session response: %v", response)
writer.WriteHeader(http.StatusNoContent)
case <-request.Context().Done():
}
})
handler.HandleFunc("/api/sessions", func(writer http.ResponseWriter, request *http.Request) { handler.HandleFunc("/api/sessions", func(writer http.ResponseWriter, request *http.Request) {
writeError := func(status int, msg string) { writeError := func(status int, msg string) {
writer.Header().Set("Content-Type", "application/json") writer.Header().Set("Content-Type", "application/json")