From 5edb3c8086b005d5b14d66415fde59a381ecef5f Mon Sep 17 00:00:00 2001 From: bagas Date: Sun, 4 Jan 2026 15:19:03 +0700 Subject: [PATCH] fix: startup order --- internal/grpc/client/client.go | 47 ++++++++++++----------- main.go | 68 ++++++++++++++++++++-------------- 2 files changed, 64 insertions(+), 51 deletions(-) diff --git a/internal/grpc/client/client.go b/internal/grpc/client/client.go index 0883a86..f056ad7 100644 --- a/internal/grpc/client/client.go +++ b/internal/grpc/client/client.go @@ -42,6 +42,7 @@ type Client struct { slugService proto.SlugChangeClient eventService proto.EventServiceClient authorizeConnectionService proto.UserServiceClient + closing bool } func DefaultConfig() *GrpcConfig { @@ -155,16 +156,17 @@ func (c *Client) SubscribeEvents(ctx context.Context, identity, authToken string for { subscribe, err := c.eventService.Subscribe(ctx) if err != nil { - if !isConnectionError(err) { + if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil { return err } - if status.Code(err) == codes.Unauthenticated { + if !c.isConnectionError(err) || status.Code(err) == codes.Unauthenticated { return err } - if err := wait(); err != nil { + if err = wait(); err != nil { return err } growBackoff() + log.Printf("Reconnect to controller within %v sec", backoff.Seconds()) continue } @@ -180,8 +182,8 @@ func (c *Client) SubscribeEvents(ctx context.Context, identity, authToken string if err != nil { log.Println("Authentication failed to send to gRPC server:", err) - if isConnectionError(err) { - if err := wait(); err != nil { + if c.isConnectionError(err) { + if err = wait(); err != nil { return err } growBackoff() @@ -193,9 +195,13 @@ func (c *Client) SubscribeEvents(ctx context.Context, identity, authToken string backoff = baseBackoff if err = c.processEventStream(subscribe); err != nil { - if isConnectionError(err) { + if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil { + return err + } + if c.isConnectionError(err) { log.Printf("Reconnect to controller within %v sec", backoff.Seconds()) - if err := wait(); err != nil { + if err = wait(); err != nil { + fmt.Println(err) return err } growBackoff() @@ -210,16 +216,7 @@ func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Nod for { recv, err := subscribe.Recv() if err != nil { - if isConnectionError(err) { - log.Printf("connection error receiving from gRPC server: %v", err) - return err - } - if status.Code(err) == codes.Unauthenticated { - log.Printf("Authentication failed: %v", err) - return err - } - log.Printf("non-connection receive error from gRPC server: %v", err) - continue + return err } switch recv.GetType() { case proto.EventType_SLUG_CHANGE: @@ -237,8 +234,7 @@ func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Nod }, }) if errSend != nil { - if isConnectionError(errSend) { - log.Printf("connection error sending slug change failure: %v", errSend) + if c.isConnectionError(errSend) { return errSend } log.Printf("non-connection send error for slug change failure: %v", errSend) @@ -257,8 +253,7 @@ func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Nod }, }) if errSend != nil { - if isConnectionError(errSend) { - log.Printf("connection error sending slug change failure: %v", errSend) + if c.isConnectionError(errSend) { return errSend } log.Printf("non-connection send error for slug change failure: %v", errSend) @@ -276,7 +271,7 @@ func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Nod }, }) if err != nil { - if isConnectionError(err) { + if c.isConnectionError(err) { log.Printf("connection error sending slug change success: %v", err) return err } @@ -306,7 +301,7 @@ func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Nod }, }) if err != nil { - if isConnectionError(err) { + if c.isConnectionError(err) { log.Printf("connection error sending sessions success: %v", err) return err } @@ -338,6 +333,7 @@ func (c *Client) AuthorizeConn(ctx context.Context, token string) (authorized bo func (c *Client) Close() error { if c.conn != nil { log.Printf("Closing gRPC connection to %s", c.config.Address) + c.closing = true return c.conn.Close() } return nil @@ -361,7 +357,10 @@ func (c *Client) GetConfig() *GrpcConfig { return c.config } -func isConnectionError(err error) bool { +func (c *Client) isConnectionError(err error) bool { + if c.closing { + return false + } if err == nil { return false } diff --git a/main.go b/main.go index 36ec8ca..e069f7c 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,9 @@ import ( "net/http" _ "net/http/pprof" "os" + "os/signal" "strings" + "syscall" "time" "tunnel_pls/internal/config" "tunnel_pls/internal/grpc/client" @@ -68,10 +70,14 @@ func main() { sshConfig.AddHostKey(private) sessionRegistry := session.NewRegistry() - var grpcClient *client.Client - var cancel context.CancelFunc = func() {} - var ctx context.Context = context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errChan := make(chan error, 2) + shutdownChan := make(chan os.Signal, 1) + signal.Notify(shutdownChan, os.Interrupt, syscall.SIGTERM) + + var grpcClient *client.Client if isNodeMode { grpcHost := config.Getenv("GRPC_ADDRESS", "localhost") grpcPort := config.Getenv("GRPC_PORT", "8080") @@ -79,10 +85,9 @@ func main() { nodeToken := config.Getenv("NODE_TOKEN", "") if nodeToken == "" { log.Fatalf("NODE_TOKEN is required in node mode") - return } - - grpcClient, err = client.New(&client.GrpcConfig{ + + c, err := client.New(&client.GrpcConfig{ Address: grpcAddr, UseTLS: false, InsecureSkipVerify: false, @@ -91,37 +96,46 @@ func main() { MaxRetries: 3, }, sessionRegistry) if err != nil { - return + log.Fatalf("failed to create grpc client: %v", err) } - defer func(grpcClient *client.Client) { - err := grpcClient.Close() - if err != nil { + grpcClient = c - } - }(grpcClient) - - ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) - err = grpcClient.CheckServerHealth(ctx) - if err != nil { - log.Fatalf("gRPC health check failed: %s", err) - return + healthCtx, healthCancel := context.WithTimeout(ctx, 5*time.Second) + if err := grpcClient.CheckServerHealth(healthCtx); err != nil { + healthCancel() + log.Fatalf("gRPC health check failed: %v", err) } - cancel() + healthCancel() - ctx, cancel = context.WithCancel(context.Background()) go func() { identity := config.Getenv("DOMAIN", "localhost") - err = grpcClient.SubscribeEvents(ctx, identity, nodeToken) - if err != nil { - return + if err := grpcClient.SubscribeEvents(ctx, identity, nodeToken); err != nil { + errChan <- fmt.Errorf("failed to subscribe to events: %w", err) } }() } - app, err := server.NewServer(sshConfig, sessionRegistry, grpcClient) - if err != nil { - log.Fatalf("Failed to start server: %s", err) + go func() { + app, err := server.NewServer(sshConfig, sessionRegistry, grpcClient) + if err != nil { + errChan <- fmt.Errorf("failed to start server: %s", err) + return + } + app.Start() + }() + + select { + case err := <-errChan: + log.Printf("error happen : %s", err) + case sig := <-shutdownChan: + log.Printf("received signal %s, shutting down", sig) } - app.Start() + cancel() + + if grpcClient != nil { + if err := grpcClient.Close(); err != nil { + log.Printf("failed to close grpc conn : %s", err) + } + } }