staging #77
+104
-98
@@ -77,45 +77,19 @@ func New(config config.Config, address string, sessionRegistry registry.Registry
|
||||
}
|
||||
|
||||
func (c *client) SubscribeEvents(ctx context.Context, identity, authToken string) error {
|
||||
const (
|
||||
baseBackoff = time.Second
|
||||
maxBackoff = 30 * time.Second
|
||||
)
|
||||
|
||||
backoff := baseBackoff
|
||||
wait := func() error {
|
||||
if backoff <= 0 {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
growBackoff := func() {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
backoff := time.Second
|
||||
|
||||
for {
|
||||
if err := c.subscribeAndProcess(ctx, identity, authToken, &backoff); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) subscribeAndProcess(ctx context.Context, identity, authToken string, backoff *time.Duration) error {
|
||||
subscribe, err := c.eventService.Subscribe(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil {
|
||||
return err
|
||||
}
|
||||
if !c.isConnectionError(err) || status.Code(err) == codes.Unauthenticated {
|
||||
return err
|
||||
}
|
||||
if err = wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
growBackoff()
|
||||
log.Printf("Reconnect to controller within %v sec", backoff.Seconds())
|
||||
continue
|
||||
return c.handleSubscribeError(ctx, err, backoff)
|
||||
}
|
||||
|
||||
err = subscribe.Send(&proto.Node{
|
||||
@@ -129,33 +103,78 @@ func (c *client) SubscribeEvents(ctx context.Context, identity, authToken string
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Println("Authentication failed to send to gRPC server:", err)
|
||||
if c.isConnectionError(err) {
|
||||
if err = wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
growBackoff()
|
||||
continue
|
||||
}
|
||||
return err
|
||||
return c.handleAuthError(ctx, err, backoff)
|
||||
}
|
||||
|
||||
log.Println("Authentication Successfully sent to gRPC server")
|
||||
backoff = baseBackoff
|
||||
*backoff = time.Second
|
||||
|
||||
if err = c.processEventStream(subscribe); err != nil {
|
||||
return c.handleStreamError(ctx, err, backoff)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) handleSubscribeError(ctx context.Context, err error, backoff *time.Duration) error {
|
||||
if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil {
|
||||
return err
|
||||
}
|
||||
if c.isConnectionError(err) {
|
||||
if !c.isConnectionError(err) || status.Code(err) == codes.Unauthenticated {
|
||||
return err
|
||||
}
|
||||
if err = c.wait(ctx, *backoff); err != nil {
|
||||
return err
|
||||
}
|
||||
c.growBackoff(backoff)
|
||||
log.Printf("Reconnect to controller within %v sec", backoff.Seconds())
|
||||
if err = wait(); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) handleAuthError(ctx context.Context, err error, backoff *time.Duration) error {
|
||||
log.Println("Authentication failed to send to gRPC server:", err)
|
||||
if !c.isConnectionError(err) {
|
||||
return err
|
||||
}
|
||||
growBackoff()
|
||||
continue
|
||||
}
|
||||
if err := c.wait(ctx, *backoff); err != nil {
|
||||
return err
|
||||
}
|
||||
c.growBackoff(backoff)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) handleStreamError(ctx context.Context, err error, backoff *time.Duration) error {
|
||||
if errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled || ctx.Err() != nil {
|
||||
return err
|
||||
}
|
||||
if !c.isConnectionError(err) {
|
||||
return err
|
||||
}
|
||||
log.Printf("Reconnect to controller within %v sec", backoff.Seconds())
|
||||
if err := c.wait(ctx, *backoff); err != nil {
|
||||
return err
|
||||
}
|
||||
c.growBackoff(backoff)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) wait(ctx context.Context, duration time.Duration) error {
|
||||
if duration <= 0 {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-time.After(duration):
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) growBackoff(backoff *time.Duration) {
|
||||
const maxBackoff = 30 * time.Second
|
||||
*backoff *= 2
|
||||
if *backoff > maxBackoff {
|
||||
*backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,35 +210,20 @@ func (c *client) eventHandlers(subscribe grpc.BidiStreamingClient[proto.Node, pr
|
||||
func (c *client) handleSlugChange(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
|
||||
slugEvent := evt.GetSlugEvent()
|
||||
user := slugEvent.GetUser()
|
||||
oldSlug := slugEvent.GetOld()
|
||||
newSlug := slugEvent.GetNew()
|
||||
oldKey := types.SessionKey{Id: slugEvent.GetOld(), Type: types.TunnelTypeHTTP}
|
||||
newKey := types.SessionKey{Id: slugEvent.GetNew(), Type: types.TunnelTypeHTTP}
|
||||
|
||||
userSession, err := c.sessionRegistry.Get(types.SessionKey{Id: oldSlug, Type: types.TunnelTypeHTTP})
|
||||
userSession, err := c.sessionRegistry.Get(oldKey)
|
||||
if err != nil {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Node_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{Success: false, Message: err.Error()},
|
||||
},
|
||||
}, "slug change failure response")
|
||||
return c.sendSlugChangeResponse(subscribe, false, err.Error())
|
||||
}
|
||||
|
||||
if err = c.sessionRegistry.Update(user, types.SessionKey{Id: oldSlug, Type: types.TunnelTypeHTTP}, types.SessionKey{Id: newSlug, Type: types.TunnelTypeHTTP}); err != nil {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Node_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{Success: false, Message: err.Error()},
|
||||
},
|
||||
}, "slug change failure response")
|
||||
if err = c.sessionRegistry.Update(user, oldKey, newKey); err != nil {
|
||||
return c.sendSlugChangeResponse(subscribe, false, err.Error())
|
||||
}
|
||||
|
||||
userSession.Interaction().Redraw()
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Node_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{Success: true, Message: ""},
|
||||
},
|
||||
}, "slug change success response")
|
||||
return c.sendSlugChangeResponse(subscribe, true, "")
|
||||
}
|
||||
|
||||
func (c *client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
|
||||
@@ -238,12 +242,7 @@ func (c *client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node
|
||||
})
|
||||
}
|
||||
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_GET_SESSIONS,
|
||||
Payload: &proto.Node_GetSessionsEvent{
|
||||
GetSessionsEvent: &proto.GetSessionsResponse{Details: details},
|
||||
},
|
||||
}, "send get sessions response")
|
||||
return c.sendGetSessionsResponse(subscribe, details)
|
||||
}
|
||||
|
||||
func (c *client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
|
||||
@@ -253,39 +252,46 @@ func (c *client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto
|
||||
|
||||
tunnelType, err := c.protoToTunnelType(terminate.GetTunnelType())
|
||||
if err != nil {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_TERMINATE_SESSION,
|
||||
Payload: &proto.Node_TerminateSessionEventResponse{
|
||||
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()},
|
||||
},
|
||||
}, "terminate session invalid tunnel type")
|
||||
return c.sendTerminateSessionResponse(subscribe, false, err.Error())
|
||||
}
|
||||
|
||||
userSession, err := c.sessionRegistry.GetWithUser(user, types.SessionKey{Id: slug, Type: tunnelType})
|
||||
if err != nil {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_TERMINATE_SESSION,
|
||||
Payload: &proto.Node_TerminateSessionEventResponse{
|
||||
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()},
|
||||
},
|
||||
}, "terminate session fetch failed")
|
||||
return c.sendTerminateSessionResponse(subscribe, false, err.Error())
|
||||
}
|
||||
|
||||
if err = userSession.Lifecycle().Close(); err != nil {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_TERMINATE_SESSION,
|
||||
Payload: &proto.Node_TerminateSessionEventResponse{
|
||||
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()},
|
||||
},
|
||||
}, "terminate session close failed")
|
||||
return c.sendTerminateSessionResponse(subscribe, false, err.Error())
|
||||
}
|
||||
|
||||
return c.sendTerminateSessionResponse(subscribe, true, "")
|
||||
}
|
||||
|
||||
func (c *client) sendSlugChangeResponse(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], success bool, message string) error {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Node_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{Success: success, Message: message},
|
||||
},
|
||||
}, "slug change response")
|
||||
}
|
||||
|
||||
func (c *client) sendGetSessionsResponse(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], details []*proto.Detail) error {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_GET_SESSIONS,
|
||||
Payload: &proto.Node_GetSessionsEvent{
|
||||
GetSessionsEvent: &proto.GetSessionsResponse{Details: details},
|
||||
},
|
||||
}, "send get sessions response")
|
||||
}
|
||||
|
||||
func (c *client) sendTerminateSessionResponse(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], success bool, message string) error {
|
||||
return c.sendNode(subscribe, &proto.Node{
|
||||
Type: proto.EventType_TERMINATE_SESSION,
|
||||
Payload: &proto.Node_TerminateSessionEventResponse{
|
||||
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: true, Message: ""},
|
||||
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: success, Message: message},
|
||||
},
|
||||
}, "terminate session success response")
|
||||
}, "terminate session response")
|
||||
}
|
||||
|
||||
func (c *client) sendNode(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], node *proto.Node, context string) error {
|
||||
|
||||
+3
-3
@@ -87,12 +87,12 @@ func (s *session) Slug() slug.Slug {
|
||||
|
||||
func (s *session) Detail() *types.Detail {
|
||||
tunnelTypeMap := map[types.TunnelType]string{
|
||||
types.TunnelTypeHTTP: "TunnelTypeHTTP",
|
||||
types.TunnelTypeTCP: "TunnelTypeTCP",
|
||||
types.TunnelTypeHTTP: "HTTP",
|
||||
types.TunnelTypeTCP: "TCP",
|
||||
}
|
||||
tunnelType, ok := tunnelTypeMap[s.forwarder.TunnelType()]
|
||||
if !ok {
|
||||
tunnelType = "TunnelTypeUNKNOWN"
|
||||
tunnelType = "UNKNOWN"
|
||||
}
|
||||
|
||||
return &types.Detail{
|
||||
|
||||
Reference in New Issue
Block a user