refactor: convert structs to interfaces and rename accessors
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Has been cancelled

- Convert struct types to interfaces
- Rename getter and setter methods
- Add Close method to server interface
- Merge handler functionality into session file
- Handle lifecycle.Connection().Wait()
- fix panic on nil connection in SSH server
This commit is contained in:
2026-01-16 15:17:33 +07:00
parent dbdf8094fa
commit 19135ceb42
5 changed files with 325 additions and 373 deletions
+40 -95
View File
@@ -2,7 +2,6 @@ package client
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
@@ -16,7 +15,6 @@ import (
proto "git.fossy.my.id/bagas/tunnel-please-grpc/gen"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
@@ -24,83 +22,34 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
type GrpcConfig struct {
Address string
UseTLS bool
InsecureSkipVerify bool
Timeout time.Duration
KeepAlive bool
MaxRetries int
KeepAliveTime time.Duration
KeepAliveTimeout time.Duration
PermitWithoutStream bool
type Client interface {
SubscribeEvents(ctx context.Context, identity, authToken string) error
ClientConn() *grpc.ClientConn
AuthorizeConn(ctx context.Context, token string) (authorized bool, user string, err error)
Close() error
CheckServerHealth(ctx context.Context) error
}
type Client struct {
type client struct {
conn *grpc.ClientConn
config *GrpcConfig
address string
sessionRegistry session.Registry
eventService proto.EventServiceClient
authorizeConnectionService proto.UserServiceClient
closing bool
}
func DefaultConfig() *GrpcConfig {
return &GrpcConfig{
Address: "localhost:50051",
UseTLS: false,
InsecureSkipVerify: false,
Timeout: 10 * time.Second,
KeepAlive: true,
MaxRetries: 3,
KeepAliveTime: 2 * time.Minute,
KeepAliveTimeout: 10 * time.Second,
PermitWithoutStream: false,
}
}
func New(config *GrpcConfig, sessionRegistry session.Registry) (*Client, error) {
if config == nil {
config = DefaultConfig()
} else {
defaults := DefaultConfig()
if config.Address == "" {
config.Address = defaults.Address
}
if config.Timeout == 0 {
config.Timeout = defaults.Timeout
}
if config.MaxRetries == 0 {
config.MaxRetries = defaults.MaxRetries
}
if config.KeepAliveTime == 0 {
config.KeepAliveTime = defaults.KeepAliveTime
}
if config.KeepAliveTimeout == 0 {
config.KeepAliveTimeout = defaults.KeepAliveTimeout
}
}
func New(address string, sessionRegistry session.Registry) (Client, error) {
var opts []grpc.DialOption
if config.UseTLS {
tlsConfig := &tls.Config{
InsecureSkipVerify: config.InsecureSkipVerify,
}
creds := credentials.NewTLS(tlsConfig)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
kaParams := keepalive.ClientParameters{
Time: 2 * time.Minute,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
}
if config.KeepAlive {
kaParams := keepalive.ClientParameters{
Time: config.KeepAliveTime,
Timeout: config.KeepAliveTimeout,
PermitWithoutStream: config.PermitWithoutStream,
}
opts = append(opts, grpc.WithKeepaliveParams(kaParams))
}
opts = append(opts, grpc.WithKeepaliveParams(kaParams))
opts = append(opts,
grpc.WithDefaultCallOptions(
@@ -109,24 +58,24 @@ func New(config *GrpcConfig, sessionRegistry session.Registry) (*Client, error)
),
)
conn, err := grpc.NewClient(config.Address, opts...)
conn, err := grpc.NewClient(address, opts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to gRPC server at %s: %w", config.Address, err)
return nil, fmt.Errorf("failed to connect to gRPC server at %s: %w", address, err)
}
eventService := proto.NewEventServiceClient(conn)
authorizeConnectionService := proto.NewUserServiceClient(conn)
return &Client{
return &client{
conn: conn,
config: config,
address: address,
sessionRegistry: sessionRegistry,
eventService: eventService,
authorizeConnectionService: authorizeConnectionService,
}, nil
}
func (c *Client) SubscribeEvents(ctx context.Context, identity, authToken string) error {
func (c *client) SubscribeEvents(ctx context.Context, identity, authToken string) error {
const (
baseBackoff = time.Second
maxBackoff = 30 * time.Second
@@ -209,7 +158,7 @@ func (c *Client) SubscribeEvents(ctx context.Context, identity, authToken string
}
}
func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events]) error {
func (c *client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events]) error {
handlers := c.eventHandlers(subscribe)
for {
@@ -230,7 +179,7 @@ func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Nod
}
}
func (c *Client) eventHandlers(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events]) map[proto.EventType]func(*proto.Events) error {
func (c *client) eventHandlers(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events]) map[proto.EventType]func(*proto.Events) error {
return map[proto.EventType]func(*proto.Events) error{
proto.EventType_SLUG_CHANGE: func(evt *proto.Events) error { return c.handleSlugChange(subscribe, evt) },
proto.EventType_GET_SESSIONS: func(evt *proto.Events) error { return c.handleGetSessions(subscribe, evt) },
@@ -238,7 +187,7 @@ func (c *Client) eventHandlers(subscribe grpc.BidiStreamingClient[proto.Node, pr
}
}
func (c *Client) handleSlugChange(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
func (c *client) handleSlugChange(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
slugEvent := evt.GetSlugEvent()
user := slugEvent.GetUser()
oldSlug := slugEvent.GetOld()
@@ -272,7 +221,7 @@ func (c *Client) handleSlugChange(subscribe grpc.BidiStreamingClient[proto.Node,
}, "slug change success response")
}
func (c *Client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
func (c *client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
sessions := c.sessionRegistry.GetAllSessionFromUser(evt.GetGetSessionsEvent().GetIdentity())
var details []*proto.Detail
@@ -296,7 +245,7 @@ func (c *Client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node
}, "send get sessions response")
}
func (c *Client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
func (c *client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
terminate := evt.GetTerminateSessionEvent()
user := terminate.GetUser()
slug := terminate.GetSlug()
@@ -338,7 +287,7 @@ func (c *Client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto
}, "terminate session success response")
}
func (c *Client) sendNode(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], node *proto.Node, context string) error {
func (c *client) sendNode(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], node *proto.Node, context string) error {
if err := subscribe.Send(node); err != nil {
if c.isConnectionError(err) {
return err
@@ -348,7 +297,7 @@ func (c *Client) sendNode(subscribe grpc.BidiStreamingClient[proto.Node, proto.E
return nil
}
func (c *Client) protoToTunnelType(t proto.TunnelType) (types.TunnelType, error) {
func (c *client) protoToTunnelType(t proto.TunnelType) (types.TunnelType, error) {
switch t {
case proto.TunnelType_HTTP:
return types.HTTP, nil
@@ -359,11 +308,11 @@ func (c *Client) protoToTunnelType(t proto.TunnelType) (types.TunnelType, error)
}
}
func (c *Client) GetConnection() *grpc.ClientConn {
func (c *client) ClientConn() *grpc.ClientConn {
return c.conn
}
func (c *Client) AuthorizeConn(ctx context.Context, token string) (authorized bool, user string, err error) {
func (c *client) AuthorizeConn(ctx context.Context, token string) (authorized bool, user string, err error) {
check, err := c.authorizeConnectionService.Check(ctx, &proto.CheckRequest{AuthToken: token})
if err != nil {
return false, "UNAUTHORIZED", err
@@ -375,17 +324,8 @@ func (c *Client) AuthorizeConn(ctx context.Context, token string) (authorized bo
return true, check.GetUser(), nil
}
func (c *Client) Close() error {
if c.conn != nil {
log.Printf("Closing gRPC connection to %s", c.config.Address)
c.closing = true
return c.conn.Close()
}
return nil
}
func (c *Client) CheckServerHealth(ctx context.Context) error {
healthClient := grpc_health_v1.NewHealthClient(c.GetConnection())
func (c *client) CheckServerHealth(ctx context.Context) error {
healthClient := grpc_health_v1.NewHealthClient(c.ClientConn())
resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
Service: "",
})
@@ -398,11 +338,16 @@ func (c *Client) CheckServerHealth(ctx context.Context) error {
return nil
}
func (c *Client) GetConfig() *GrpcConfig {
return c.config
func (c *client) Close() error {
if c.conn != nil {
log.Printf("Closing gRPC connection to %s", c.address)
c.closing = true
return c.conn.Close()
}
return nil
}
func (c *Client) isConnectionError(err error) bool {
func (c *client) isConnectionError(err error) bool {
if c.closing {
return false
}