Compare commits

..

6 Commits

Author SHA1 Message Date
e86ddc3373 feat: implement forwarder session termination
All checks were successful
Docker Build and Push / build-and-push-tags (push) Has been skipped
Docker Build and Push / build-and-push-branches (push) Successful in 6m3s
renovate / renovate (push) Successful in 35s
2026-01-06 18:32:24 +07:00
ff9bdcdf0b update: send user for slug change event
Some checks failed
Docker Build and Push / build-and-push-tags (push) Has been skipped
Docker Build and Push / build-and-push-branches (push) Successful in 4m20s
renovate / renovate (push) Failing after 53s
2026-01-05 16:49:58 +07:00
63cc475a47 feat: implement slug changing
All checks were successful
Docker Build and Push / build-and-push-tags (push) Has been skipped
Docker Build and Push / build-and-push-branches (push) Successful in 3m58s
renovate / renovate (push) Successful in 45s
2026-01-05 00:56:37 +07:00
6124df8911 fix: add deadline to jwk register
All checks were successful
Docker Build and Push / build-and-push-tags (push) Has been skipped
renovate / renovate (push) Successful in 44s
Docker Build and Push / build-and-push-branches (push) Successful in 4m0s
2026-01-04 17:10:36 +07:00
a98682d42f Merge pull request 'fix(deps): update module github.com/lestrrat-go/httprc/v3 to v3.0.3' (#2) from renovate/github.com-lestrrat-go-httprc-v3-3.x into main
All checks were successful
Docker Build and Push / build-and-push-tags (push) Has been skipped
renovate / renovate (push) Successful in 43s
Docker Build and Push / build-and-push-branches (push) Successful in 3m55s
Reviewed-on: #2
2026-01-04 09:27:02 +00:00
83f62f85f8 fix(deps): update module github.com/lestrrat-go/httprc/v3 to v3.0.3 2026-01-04 09:13:30 +00:00
4 changed files with 234 additions and 62 deletions

7
go.mod
View File

@@ -3,10 +3,10 @@ module git.fossy.my.id/bagas/tunnel-please-controller
go 1.25.5
require (
git.fossy.my.id/bagas/tunnel-please-grpc v1.2.0
git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0
github.com/jackc/pgx/v5 v5.8.0
github.com/joho/godotenv v1.5.1
github.com/lestrrat-go/httprc/v3 v3.0.1
github.com/lestrrat-go/httprc/v3 v3.0.3
github.com/lestrrat-go/jwx/v3 v3.0.12
google.golang.org/grpc v1.78.0
)
@@ -20,10 +20,9 @@ require (
github.com/lestrrat-go/dsig v1.0.0 // indirect
github.com/lestrrat-go/dsig-secp256k1 v1.0.0 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/lestrrat-go/option/v2 v2.0.0 // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
github.com/valyala/fastjson v1.6.7 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sys v0.39.0 // indirect

19
go.sum
View File

@@ -1,5 +1,9 @@
git.fossy.my.id/bagas/tunnel-please-grpc v1.2.0 h1:BS1dJU3wa2ILgTGwkV95Knle0il0OQtErGqyb6xV7SU=
git.fossy.my.id/bagas/tunnel-please-grpc v1.2.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY=
git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0 h1:RhcBKUG41/om4jgN+iF/vlY/RojTeX1QhBa4p4428ec=
git.fossy.my.id/bagas/tunnel-please-grpc v1.3.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY=
git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0 h1:tpJSKjaSmV+vxxbVx6qnStjxFVXjj2M0rygWXxLb99o=
git.fossy.my.id/bagas/tunnel-please-grpc v1.4.0/go.mod h1:fG+VkArdkceGB0bNA7IFQus9GetLAwdF5Oi4jdMlXtY=
git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0 h1:3xszIhck4wo9CoeRq9vnkar4PhY7kz9QrR30qj2XszA=
git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0/go.mod h1:Weh6ZujgWmT8XxD3Qba7sJ6r5eyUMB9XSWynqdyOoLo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -35,12 +39,10 @@ github.com/lestrrat-go/dsig-secp256k1 v1.0.0 h1:JpDe4Aybfl0soBvoVwjqDbp+9S1Y2OM7
github.com/lestrrat-go/dsig-secp256k1 v1.0.0/go.mod h1:CxUgAhssb8FToqbL8NjSPoGQlnO4w3LG1P0qPWQm/NU=
github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE=
github.com/lestrrat-go/httpcc v1.0.1/go.mod h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E=
github.com/lestrrat-go/httprc/v3 v3.0.1 h1:3n7Es68YYGZb2Jf+k//llA4FTZMl3yCwIjFIk4ubevI=
github.com/lestrrat-go/httprc/v3 v3.0.1/go.mod h1:2uAvmbXE4Xq8kAUjVrZOq1tZVYYYs5iP62Cmtru00xk=
github.com/lestrrat-go/httprc/v3 v3.0.3 h1:WjLHWkDkgWXeIUrKi/7lS/sGq2DjkSAwdTbH5RHXAKs=
github.com/lestrrat-go/httprc/v3 v3.0.3/go.mod h1:mSMtkZW92Z98M5YoNNztbRGxbXHql7tSitCvaxvo9l0=
github.com/lestrrat-go/jwx/v3 v3.0.12 h1:p25r68Y4KrbBdYjIsQweYxq794CtGCzcrc5dGzJIRjg=
github.com/lestrrat-go/jwx/v3 v3.0.12/go.mod h1:HiUSaNmMLXgZ08OmGBaPVvoZQgJVOQphSrGr5zMamS8=
github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
github.com/lestrrat-go/option/v2 v2.0.0 h1:XxrcaJESE1fokHy3FpaQ/cXW8ZsIdWcdFzzLOcID3Ss=
github.com/lestrrat-go/option/v2 v2.0.0/go.mod h1:oSySsmzMoR0iRzCDCaUfsCzxQHUEuhOViQObyy7S6Vg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -49,13 +51,12 @@ github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0=
github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fastjson v1.6.7 h1:ZE4tRy0CIkh+qDc5McjatheGX2czdn8slQjomexVpBM=
github.com/valyala/fastjson v1.6.7/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=

10
main.go
View File

@@ -20,7 +20,7 @@ import (
func main() {
if _, err := os.Stat(".env"); err == nil {
if err := godotenv.Load(".env"); err != nil {
if err = godotenv.Load(".env"); err != nil {
log.Printf("Warning: Failed to load .env file: %s", err)
}
}
@@ -50,7 +50,7 @@ func main() {
return
}
defer func(connect *pgx.Conn, ctx context.Context) {
err := connect.Close(ctx)
err = connect.Close(ctx)
if err != nil {
panic(err)
}
@@ -70,13 +70,13 @@ func main() {
errCh := make(chan error, 2)
go func() {
if err := s.StartAPI(ctx, apiAddr); err != nil && !errors.Is(err, context.Canceled) {
if err = s.StartAPI(ctx, apiAddr); err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
}()
go func() {
if err := s.StartController(ctx, controllerAddr); err != nil && !errors.Is(err, context.Canceled) {
if err = s.StartController(ctx, controllerAddr); err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
}()
@@ -84,7 +84,7 @@ func main() {
select {
case <-ctx.Done():
log.Printf("shutting down: %v", ctx.Err())
case err := <-errCh:
case err = <-errCh:
log.Fatalf("server error: %v", err)
}
}

View File

@@ -28,6 +28,7 @@ import (
const (
defaultSubscriberResponseWait = 5 * time.Second
jwkRegisterTimeout = 5 * time.Second
)
type Subscriber struct {
@@ -44,7 +45,6 @@ type Server struct {
authToken string
jwkCache *jwk.Cache
proto.UnimplementedEventServiceServer
proto.UnimplementedSlugChangeServer
proto.UnimplementedUserServiceServer
}
@@ -169,6 +169,21 @@ func processEventStream(ctx context.Context, requestChan *Subscriber, event grpc
case requestChan.node <- recv:
}
log.Printf("Received SESSIONS event: %v", recv)
case proto.EventType_TERMINATE_SESSION:
log.Printf("Processing terminate event")
if err := event.Send(request); err != nil {
return err
}
recv, err := recvClientWithTimeout(ctx, requestChan.done, event, defaultSubscriberResponseWait)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case requestChan.node <- recv:
}
log.Printf("Received terminate event: %v", recv)
default:
log.Printf("Unknown event type: %v", request.GetType())
}
@@ -243,46 +258,6 @@ func (s *Server) GetEventSubscriber(identity string) (*Subscriber, error) {
return req, nil
}
func (s *Server) RequestChangeSlug(ctx context.Context, request *proto.ChangeSlugRequest) (*proto.ChangeSlugResponse, error) {
if request == nil {
return nil, status.Error(codes.InvalidArgument, "request is nil")
}
if request.GetNode() == "" {
return nil, status.Error(codes.InvalidArgument, "node is required")
}
if request.Old == "" || request.New == "" {
return nil, status.Error(codes.InvalidArgument, "old and new slugs are required")
}
subscriber, err := s.GetEventSubscriber(request.GetNode())
if err != nil {
return nil, err
}
controllerMsg := &proto.Events{
Type: proto.EventType_SLUG_CHANGE,
Payload: &proto.Events_SlugEvent{
SlugEvent: &proto.SlugChangeEvent{
Old: request.Old,
New: request.New,
},
},
}
resp, err := s.sendAndReceive(ctx, subscriber, controllerMsg, defaultSubscriberResponseWait)
if err != nil {
return nil, err
}
if resp == nil {
return nil, status.Error(codes.FailedPrecondition, "empty response from client")
}
response, ok := resp.Payload.(*proto.Node_SlugEventResponse)
if !ok || response == nil || response.SlugEventResponse == nil {
return nil, status.Error(codes.FailedPrecondition, "invalid slug response payload")
}
return (*proto.ChangeSlugResponse)(response.SlugEventResponse), nil
}
type SubscriberResult struct {
Identity string
Response *proto.Node
@@ -343,6 +318,11 @@ func (s *Server) notifyAllSubscriber(ctx context.Context, recvChan <-chan *proto
}
}
type Slug struct {
Old string `json:"old"`
New string `json:"new"`
}
func (s *Server) StartAPI(ctx context.Context, Addr string) error {
handler := http.NewServeMux()
httpServer := http.Server{
@@ -352,13 +332,206 @@ func (s *Server) StartAPI(ctx context.Context, Addr string) error {
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
}
jwkURL := config.Getenv("JWKS_URL", "")
if jwkURL != "" {
if err := s.jwkCache.Register(ctx, jwkURL); err != nil {
registerCtx, cancel := context.WithTimeout(ctx, jwkRegisterTimeout)
defer cancel()
if err := s.jwkCache.Register(registerCtx, jwkURL); err != nil {
return fmt.Errorf("failed to register jwk cache: %w", err)
}
}
handler.HandleFunc("PATCH /api/session/{node}", func(writer http.ResponseWriter, request *http.Request) {
writeError := func(status int, msg string) {
writer.Header().Set("Content-Type", "application/json")
writer.WriteHeader(status)
_ = json.NewEncoder(writer).Encode(map[string]string{"error": msg})
}
var token jwt.Token
var err error
var keyset jwk.Set
if jwkURL != "" {
keyset, err = s.jwkCache.Lookup(request.Context(), jwkURL)
if err != nil {
log.Printf("jwks lookup failed: %v", err)
writeError(http.StatusBadGateway, "unable to fetch jwks")
return
}
token, err = jwt.ParseRequest(request, jwt.WithKeySet(keyset))
if err != nil {
log.Printf("jwt parse failed: %v", err)
writeError(http.StatusUnauthorized, "invalid or expired token")
return
}
} else {
token, err = jwt.ParseRequest(request, jwt.WithVerify(false))
if err != nil {
log.Printf("jwt parse failed (no verification): %v", err)
writeError(http.StatusBadRequest, "invalid token")
return
}
}
var email string
err = token.Get("email", &email)
if err != nil {
log.Printf("email claim not found: %v", err)
writeError(http.StatusBadRequest, "missing email claim in token")
return
}
if email == "" {
writeError(http.StatusBadRequest, "empty email claim in token")
return
}
node := request.PathValue("node")
if node == "" {
writeError(http.StatusBadRequest, "no node specified")
return
}
var slug *Slug
if err := json.NewDecoder(request.Body).Decode(&slug); err != nil {
writeError(http.StatusBadRequest, "invalid request body")
return
}
subscriber, err := s.GetEventSubscriber(node)
if err != nil {
writeError(http.StatusBadRequest, "no node found")
return
}
subscriber.events <- &proto.Events{
Type: proto.EventType_SLUG_CHANGE,
Payload: &proto.Events_SlugEvent{
SlugEvent: &proto.SlugChangeEvent{
User: email,
Old: slug.Old,
New: slug.New,
},
},
}
select {
case response := <-subscriber.node:
resp, ok := response.Payload.(*proto.Node_SlugEventResponse)
if !ok {
writeError(http.StatusInternalServerError, "received an unexpected response from the node")
return
}
if !resp.SlugEventResponse.Success {
writeError(http.StatusBadRequest, resp.SlugEventResponse.Message)
return
}
log.Printf("Received slug change response: %v", response)
writer.WriteHeader(http.StatusNoContent)
case <-request.Context().Done():
}
})
handler.HandleFunc("DELETE /api/session/{node}/{type}/{session}", func(writer http.ResponseWriter, request *http.Request) {
writeError := func(status int, msg string) {
writer.Header().Set("Content-Type", "application/json")
writer.WriteHeader(status)
_ = json.NewEncoder(writer).Encode(map[string]string{"error": msg})
}
var token jwt.Token
var err error
var keyset jwk.Set
if jwkURL != "" {
keyset, err = s.jwkCache.Lookup(request.Context(), jwkURL)
if err != nil {
log.Printf("jwks lookup failed: %v", err)
writeError(http.StatusBadGateway, "unable to fetch jwks")
return
}
token, err = jwt.ParseRequest(request, jwt.WithKeySet(keyset))
if err != nil {
log.Printf("jwt parse failed: %v", err)
writeError(http.StatusUnauthorized, "invalid or expired token")
return
}
} else {
token, err = jwt.ParseRequest(request, jwt.WithVerify(false))
if err != nil {
log.Printf("jwt parse failed (no verification): %v", err)
writeError(http.StatusBadRequest, "invalid token")
return
}
}
var email string
err = token.Get("email", &email)
if err != nil {
log.Printf("email claim not found: %v", err)
writeError(http.StatusBadRequest, "missing email claim in token")
return
}
if email == "" {
writeError(http.StatusBadRequest, "empty email claim in token")
return
}
node := request.PathValue("node")
if node == "" {
writeError(http.StatusBadRequest, "no node specified")
return
}
sessionTypeRaw := request.PathValue("type")
if node == "" {
writeError(http.StatusBadRequest, "no type specified")
return
}
var tunnelType proto.TunnelType
if sessionTypeRaw == "http" {
tunnelType = proto.TunnelType_HTTP
} else if sessionTypeRaw == "tcp" {
tunnelType = proto.TunnelType_TCP
} else {
writeError(http.StatusBadRequest, "invalid session type specified")
return
}
session := request.PathValue("session")
if node == "" {
writeError(http.StatusBadRequest, "no node specified")
return
}
subscriber, err := s.GetEventSubscriber(node)
if err != nil {
writeError(http.StatusBadRequest, "no node found")
return
}
subscriber.events <- &proto.Events{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Events_TerminateSessionEvent{
TerminateSessionEvent: &proto.TerminateSessionEvent{
User: email,
TunnelType: tunnelType,
Slug: session,
},
},
}
select {
case response := <-subscriber.node:
resp, ok := response.Payload.(*proto.Node_TerminateSessionEventResponse)
if !ok {
writeError(http.StatusInternalServerError, "received an unexpected response from the node")
return
}
if !resp.TerminateSessionEventResponse.Success {
writeError(http.StatusBadRequest, resp.TerminateSessionEventResponse.Message)
return
}
log.Printf("Received terminate session response: %v", response)
writer.WriteHeader(http.StatusNoContent)
case <-request.Context().Done():
}
})
handler.HandleFunc("/api/sessions", func(writer http.ResponseWriter, request *http.Request) {
writeError := func(status int, msg string) {
@@ -515,7 +688,6 @@ func (s *Server) StartController(ctx context.Context, Addr string) error {
)
reflection.Register(grpcServer)
proto.RegisterSlugChangeServer(grpcServer, s)
proto.RegisterEventServiceServer(grpcServer, s)
proto.RegisterUserServiceServer(grpcServer, s)