From 24ea464c7a0ab62d71e0a3b825cc44de9b472539 Mon Sep 17 00:00:00 2001 From: bagas Date: Fri, 23 Jan 2026 14:48:04 +0700 Subject: [PATCH] fix(client): reduce cognitive complexity and fix typo (go:S3776) --- internal/grpc/client/client.go | 246 +++++++++++++++++---------------- session/session.go | 6 +- 2 files changed, 129 insertions(+), 123 deletions(-) diff --git a/internal/grpc/client/client.go b/internal/grpc/client/client.go index f2e0a1e..530e664 100644 --- a/internal/grpc/client/client.go +++ b/internal/grpc/client/client.go @@ -77,85 +77,104 @@ func New(config config.Config, address string, sessionRegistry registry.Registry } func (c *client) SubscribeEvents(ctx context.Context, identity, authToken string) error { - const ( - baseBackoff = time.Second - maxBackoff = 30 * time.Second - ) - - backoff := baseBackoff - wait := func() error { - if backoff <= 0 { - return nil - } - select { - case <-time.After(backoff): - return nil - case <-ctx.Done(): - return ctx.Err() - } - } - growBackoff := func() { - backoff *= 2 - if backoff > maxBackoff { - backoff = maxBackoff - } - } + backoff := time.Second for { - subscribe, err := c.eventService.Subscribe(ctx) - if err != nil { - if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil { - return err - } - if !c.isConnectionError(err) || status.Code(err) == codes.Unauthenticated { - return err - } - if err = wait(); err != nil { - return err - } - growBackoff() - log.Printf("Reconnect to controller within %v sec", backoff.Seconds()) - continue + if err := c.subscribeAndProcess(ctx, identity, authToken, &backoff); err != nil { + return err } + } +} - err = subscribe.Send(&proto.Node{ - Type: proto.EventType_AUTHENTICATION, - Payload: &proto.Node_AuthEvent{ - AuthEvent: &proto.Authentication{ - Identity: identity, - AuthToken: authToken, - }, +func (c *client) subscribeAndProcess(ctx context.Context, identity, authToken string, backoff *time.Duration) error { + subscribe, err := c.eventService.Subscribe(ctx) + if err != nil { + return c.handleSubscribeError(ctx, err, backoff) + } + + err = subscribe.Send(&proto.Node{ + Type: proto.EventType_AUTHENTICATION, + Payload: &proto.Node_AuthEvent{ + AuthEvent: &proto.Authentication{ + Identity: identity, + AuthToken: authToken, }, - }) + }, + }) - if err != nil { - log.Println("Authentication failed to send to gRPC server:", err) - if c.isConnectionError(err) { - if err = wait(); err != nil { - return err - } - growBackoff() - continue - } - return err - } - log.Println("Authentication Successfully sent to gRPC server") - backoff = baseBackoff + if err != nil { + return c.handleAuthError(ctx, err, backoff) + } - if err = c.processEventStream(subscribe); err != nil { - if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil { - return err - } - if c.isConnectionError(err) { - log.Printf("Reconnect to controller within %v sec", backoff.Seconds()) - if err = wait(); err != nil { - return err - } - growBackoff() - continue - } - return err - } + log.Println("Authentication Successfully sent to gRPC server") + *backoff = time.Second + + if err = c.processEventStream(subscribe); err != nil { + return c.handleStreamError(ctx, err, backoff) + } + + return nil +} + +func (c *client) handleSubscribeError(ctx context.Context, err error, backoff *time.Duration) error { + if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil { + return err + } + if !c.isConnectionError(err) || status.Code(err) == codes.Unauthenticated { + return err + } + if err = c.wait(ctx, *backoff); err != nil { + return err + } + c.growBackoff(backoff) + log.Printf("Reconnect to controller within %v sec", backoff.Seconds()) + return nil +} + +func (c *client) handleAuthError(ctx context.Context, err error, backoff *time.Duration) error { + log.Println("Authentication failed to send to gRPC server:", err) + if !c.isConnectionError(err) { + return err + } + if err := c.wait(ctx, *backoff); err != nil { + return err + } + c.growBackoff(backoff) + return nil +} + +func (c *client) handleStreamError(ctx context.Context, err error, backoff *time.Duration) error { + if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil { + return err + } + if !c.isConnectionError(err) { + return err + } + log.Printf("Reconnect to controller within %v sec", backoff.Seconds()) + if err := c.wait(ctx, *backoff); err != nil { + return err + } + c.growBackoff(backoff) + return nil +} + +func (c *client) wait(ctx context.Context, duration time.Duration) error { + if duration <= 0 { + return nil + } + select { + case <-time.After(duration): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (c *client) growBackoff(backoff *time.Duration) { + const maxBackoff = 30 * time.Second + *backoff *= 2 + if *backoff > maxBackoff { + *backoff = maxBackoff } } @@ -191,35 +210,20 @@ func (c *client) eventHandlers(subscribe grpc.BidiStreamingClient[proto.Node, pr func (c *client) handleSlugChange(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error { slugEvent := evt.GetSlugEvent() user := slugEvent.GetUser() - oldSlug := slugEvent.GetOld() - newSlug := slugEvent.GetNew() + oldKey := types.SessionKey{Id: slugEvent.GetOld(), Type: types.TunnelTypeHTTP} + newKey := types.SessionKey{Id: slugEvent.GetNew(), Type: types.TunnelTypeHTTP} - userSession, err := c.sessionRegistry.Get(types.SessionKey{Id: oldSlug, Type: types.TunnelTypeHTTP}) + userSession, err := c.sessionRegistry.Get(oldKey) if err != nil { - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_SLUG_CHANGE_RESPONSE, - Payload: &proto.Node_SlugEventResponse{ - SlugEventResponse: &proto.SlugChangeEventResponse{Success: false, Message: err.Error()}, - }, - }, "slug change failure response") + return c.sendSlugChangeResponse(subscribe, false, err.Error()) } - if err = c.sessionRegistry.Update(user, types.SessionKey{Id: oldSlug, Type: types.TunnelTypeHTTP}, types.SessionKey{Id: newSlug, Type: types.TunnelTypeHTTP}); err != nil { - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_SLUG_CHANGE_RESPONSE, - Payload: &proto.Node_SlugEventResponse{ - SlugEventResponse: &proto.SlugChangeEventResponse{Success: false, Message: err.Error()}, - }, - }, "slug change failure response") + if err = c.sessionRegistry.Update(user, oldKey, newKey); err != nil { + return c.sendSlugChangeResponse(subscribe, false, err.Error()) } userSession.Interaction().Redraw() - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_SLUG_CHANGE_RESPONSE, - Payload: &proto.Node_SlugEventResponse{ - SlugEventResponse: &proto.SlugChangeEventResponse{Success: true, Message: ""}, - }, - }, "slug change success response") + return c.sendSlugChangeResponse(subscribe, true, "") } func (c *client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error { @@ -238,12 +242,7 @@ func (c *client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node }) } - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_GET_SESSIONS, - Payload: &proto.Node_GetSessionsEvent{ - GetSessionsEvent: &proto.GetSessionsResponse{Details: details}, - }, - }, "send get sessions response") + return c.sendGetSessionsResponse(subscribe, details) } func (c *client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error { @@ -253,39 +252,46 @@ func (c *client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto tunnelType, err := c.protoToTunnelType(terminate.GetTunnelType()) if err != nil { - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_TERMINATE_SESSION, - Payload: &proto.Node_TerminateSessionEventResponse{ - TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()}, - }, - }, "terminate session invalid tunnel type") + return c.sendTerminateSessionResponse(subscribe, false, err.Error()) } userSession, err := c.sessionRegistry.GetWithUser(user, types.SessionKey{Id: slug, Type: tunnelType}) if err != nil { - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_TERMINATE_SESSION, - Payload: &proto.Node_TerminateSessionEventResponse{ - TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()}, - }, - }, "terminate session fetch failed") + return c.sendTerminateSessionResponse(subscribe, false, err.Error()) } if err = userSession.Lifecycle().Close(); err != nil { - return c.sendNode(subscribe, &proto.Node{ - Type: proto.EventType_TERMINATE_SESSION, - Payload: &proto.Node_TerminateSessionEventResponse{ - TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()}, - }, - }, "terminate session close failed") + return c.sendTerminateSessionResponse(subscribe, false, err.Error()) } + return c.sendTerminateSessionResponse(subscribe, true, "") +} + +func (c *client) sendSlugChangeResponse(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], success bool, message string) error { + return c.sendNode(subscribe, &proto.Node{ + Type: proto.EventType_SLUG_CHANGE_RESPONSE, + Payload: &proto.Node_SlugEventResponse{ + SlugEventResponse: &proto.SlugChangeEventResponse{Success: success, Message: message}, + }, + }, "slug change response") +} + +func (c *client) sendGetSessionsResponse(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], details []*proto.Detail) error { + return c.sendNode(subscribe, &proto.Node{ + Type: proto.EventType_GET_SESSIONS, + Payload: &proto.Node_GetSessionsEvent{ + GetSessionsEvent: &proto.GetSessionsResponse{Details: details}, + }, + }, "send get sessions response") +} + +func (c *client) sendTerminateSessionResponse(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], success bool, message string) error { return c.sendNode(subscribe, &proto.Node{ Type: proto.EventType_TERMINATE_SESSION, Payload: &proto.Node_TerminateSessionEventResponse{ - TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: true, Message: ""}, + TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: success, Message: message}, }, - }, "terminate session success response") + }, "terminate session response") } func (c *client) sendNode(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], node *proto.Node, context string) error { diff --git a/session/session.go b/session/session.go index cb7c04c..c88f615 100644 --- a/session/session.go +++ b/session/session.go @@ -87,12 +87,12 @@ func (s *session) Slug() slug.Slug { func (s *session) Detail() *types.Detail { tunnelTypeMap := map[types.TunnelType]string{ - types.TunnelTypeHTTP: "TunnelTypeHTTP", - types.TunnelTypeTCP: "TunnelTypeTCP", + types.TunnelTypeHTTP: "HTTP", + types.TunnelTypeTCP: "TCP", } tunnelType, ok := tunnelTypeMap[s.forwarder.TunnelType()] if !ok { - tunnelType = "TunnelTypeUNKNOWN" + tunnelType = "UNKNOWN" } return &types.Detail{