feat(grpc): integrate slug edit handling
This commit is contained in:
@@ -3,13 +3,14 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"time"
|
||||
"tunnel_pls/session"
|
||||
|
||||
"git.fossy.my.id/bagas/tunnel-please-grpc/gen"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
proto "git.fossy.my.id/bagas/tunnel-please-grpc/gen"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
@@ -20,36 +21,59 @@ import (
|
||||
)
|
||||
|
||||
type GrpcConfig struct {
|
||||
Address string
|
||||
UseTLS bool
|
||||
InsecureSkipVerify bool
|
||||
Timeout time.Duration
|
||||
KeepAlive bool
|
||||
MaxRetries int
|
||||
Address string
|
||||
UseTLS bool
|
||||
InsecureSkipVerify bool
|
||||
Timeout time.Duration
|
||||
KeepAlive bool
|
||||
MaxRetries int
|
||||
KeepAliveTime time.Duration
|
||||
KeepAliveTimeout time.Duration
|
||||
PermitWithoutStream bool
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
conn *grpc.ClientConn
|
||||
config *GrpcConfig
|
||||
sessionRegistry session.Registry
|
||||
IdentityService gen.IdentityClient
|
||||
eventService gen.EventServiceClient
|
||||
slugService proto.SlugChangeClient
|
||||
eventService proto.EventServiceClient
|
||||
}
|
||||
|
||||
func DefaultConfig() *GrpcConfig {
|
||||
return &GrpcConfig{
|
||||
Address: "localhost:50051",
|
||||
UseTLS: false,
|
||||
InsecureSkipVerify: false,
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: true,
|
||||
MaxRetries: 3,
|
||||
Address: "localhost:50051",
|
||||
UseTLS: false,
|
||||
InsecureSkipVerify: false,
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: true,
|
||||
MaxRetries: 3,
|
||||
KeepAliveTime: 2 * time.Minute,
|
||||
KeepAliveTimeout: 10 * time.Second,
|
||||
PermitWithoutStream: false,
|
||||
}
|
||||
}
|
||||
|
||||
func New(config *GrpcConfig, sessionRegistry session.Registry) (*Client, error) {
|
||||
if config == nil {
|
||||
config = DefaultConfig()
|
||||
} else {
|
||||
defaults := DefaultConfig()
|
||||
if config.Address == "" {
|
||||
config.Address = defaults.Address
|
||||
}
|
||||
if config.Timeout == 0 {
|
||||
config.Timeout = defaults.Timeout
|
||||
}
|
||||
if config.MaxRetries == 0 {
|
||||
config.MaxRetries = defaults.MaxRetries
|
||||
}
|
||||
if config.KeepAliveTime == 0 {
|
||||
config.KeepAliveTime = defaults.KeepAliveTime
|
||||
}
|
||||
if config.KeepAliveTimeout == 0 {
|
||||
config.KeepAliveTimeout = defaults.KeepAliveTimeout
|
||||
}
|
||||
}
|
||||
|
||||
var opts []grpc.DialOption
|
||||
@@ -66,9 +90,9 @@ func New(config *GrpcConfig, sessionRegistry session.Registry) (*Client, error)
|
||||
|
||||
if config.KeepAlive {
|
||||
kaParams := keepalive.ClientParameters{
|
||||
Time: 10 * time.Second,
|
||||
Timeout: 3 * time.Second,
|
||||
PermitWithoutStream: false,
|
||||
Time: config.KeepAliveTime,
|
||||
Timeout: config.KeepAliveTimeout,
|
||||
PermitWithoutStream: config.PermitWithoutStream,
|
||||
}
|
||||
opts = append(opts, grpc.WithKeepaliveParams(kaParams))
|
||||
}
|
||||
@@ -85,94 +109,120 @@ func New(config *GrpcConfig, sessionRegistry session.Registry) (*Client, error)
|
||||
return nil, fmt.Errorf("failed to connect to gRPC server at %s: %w", config.Address, err)
|
||||
}
|
||||
|
||||
identityService := gen.NewIdentityClient(conn)
|
||||
eventService := gen.NewEventServiceClient(conn)
|
||||
slugService := proto.NewSlugChangeClient(conn)
|
||||
eventService := proto.NewEventServiceClient(conn)
|
||||
|
||||
return &Client{
|
||||
conn: conn,
|
||||
config: config,
|
||||
IdentityService: identityService,
|
||||
eventService: eventService,
|
||||
slugService: slugService,
|
||||
sessionRegistry: sessionRegistry,
|
||||
eventService: eventService,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Client) SubscribeEvents(ctx context.Context) error {
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
log.Println("Context cancelled, stopping event subscription")
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
log.Println("Subscribing to events...")
|
||||
stream, err := c.eventService.Subscribe(ctx, &empty.Empty{})
|
||||
if err != nil {
|
||||
log.Printf("Failed to subscribe: %v. Retrying in 10 seconds...", err)
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := c.processEventStream(ctx, stream); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
log.Printf("Stream error: %v. Reconnecting in 10 seconds...", err)
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
func (c *Client) SubscribeEvents(ctx context.Context, identity string) error {
|
||||
subscribe, err := c.eventService.Subscribe(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = subscribe.Send(&proto.Client{
|
||||
Type: proto.EventType_AUTHENTICATION,
|
||||
Payload: &proto.Client_AuthEvent{
|
||||
AuthEvent: &proto.Authentication{
|
||||
Identity: identity,
|
||||
AuthToken: "test_auth_key",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Println("Authentication failed to send to gRPC server:", err)
|
||||
return err
|
||||
}
|
||||
log.Println("Authentication Successfully sent to gRPC server")
|
||||
err = c.processEventStream(subscribe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) processEventStream(ctx context.Context, stream gen.EventService_SubscribeClient) error {
|
||||
func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Client, proto.Controller]) error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
event, err := stream.Recv()
|
||||
recv, err := subscribe.Recv()
|
||||
if err != nil {
|
||||
st, ok := status.FromError(err)
|
||||
if !ok {
|
||||
return fmt.Errorf("non-gRPC error: %w", err)
|
||||
}
|
||||
|
||||
switch st.Code() {
|
||||
case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
|
||||
return fmt.Errorf("stream closed [%s]: %s", st.Code(), st.Message())
|
||||
default:
|
||||
return fmt.Errorf("gRPC error [%s]: %s", st.Code(), st.Message())
|
||||
if isConnectionError(err) {
|
||||
log.Printf("connection error receiving from gRPC server: %v", err)
|
||||
return err
|
||||
}
|
||||
log.Printf("non-connection receive error from gRPC server: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if event != nil {
|
||||
dataEvent := event.GetDataEvent()
|
||||
if dataEvent != nil {
|
||||
oldSlug := dataEvent.GetOld()
|
||||
newSlug := dataEvent.GetNew()
|
||||
|
||||
userSession, exist := c.sessionRegistry.Get(oldSlug)
|
||||
if !exist {
|
||||
log.Printf("Session with slug '%s' not found, ignoring event", oldSlug)
|
||||
continue
|
||||
}
|
||||
success := c.sessionRegistry.Update(oldSlug, newSlug)
|
||||
|
||||
if success {
|
||||
log.Printf("Successfully updated session slug from '%s' to '%s'", oldSlug, newSlug)
|
||||
userSession.GetInteraction().Redraw()
|
||||
} else {
|
||||
log.Printf("Failed to update session slug from '%s' to '%s'", oldSlug, newSlug)
|
||||
switch recv.GetType() {
|
||||
case proto.EventType_SLUG_CHANGE:
|
||||
oldSlug := recv.GetSlugEvent().GetOld()
|
||||
newSlug := recv.GetSlugEvent().GetNew()
|
||||
session, err := c.sessionRegistry.Get(oldSlug)
|
||||
if err != nil {
|
||||
errSend := subscribe.Send(&proto.Client{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Client_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{
|
||||
Success: false,
|
||||
Message: err.Error(),
|
||||
},
|
||||
},
|
||||
})
|
||||
if errSend != nil {
|
||||
if isConnectionError(errSend) {
|
||||
log.Printf("connection error sending slug change failure: %v", errSend)
|
||||
return errSend
|
||||
}
|
||||
log.Printf("non-connection send error for slug change failure: %v", errSend)
|
||||
}
|
||||
continue
|
||||
}
|
||||
err = c.sessionRegistry.Update(oldSlug, newSlug)
|
||||
if err != nil {
|
||||
errSend := subscribe.Send(&proto.Client{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Client_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{
|
||||
Success: false,
|
||||
Message: err.Error(),
|
||||
},
|
||||
},
|
||||
})
|
||||
if errSend != nil {
|
||||
if isConnectionError(errSend) {
|
||||
log.Printf("connection error sending slug change failure: %v", errSend)
|
||||
return errSend
|
||||
}
|
||||
log.Printf("non-connection send error for slug change failure: %v", errSend)
|
||||
}
|
||||
continue
|
||||
}
|
||||
session.GetInteraction().Redraw()
|
||||
err = subscribe.Send(&proto.Client{
|
||||
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
|
||||
Payload: &proto.Client_SlugEventResponse{
|
||||
SlugEventResponse: &proto.SlugChangeEventResponse{
|
||||
Success: true,
|
||||
Message: "",
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
if isConnectionError(err) {
|
||||
log.Printf("connection error sending slug change success: %v", err)
|
||||
return err
|
||||
}
|
||||
log.Printf("non-connection send error for slug change success: %v", err)
|
||||
continue
|
||||
}
|
||||
default:
|
||||
log.Printf("Unknown event type received: %v", recv.GetType())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -209,3 +259,18 @@ func (c *Client) CheckServerHealth(ctx context.Context) error {
|
||||
func (c *Client) GetConfig() *GrpcConfig {
|
||||
return c.config
|
||||
}
|
||||
|
||||
func isConnectionError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
return true
|
||||
}
|
||||
switch status.Code(err) {
|
||||
case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user