WIP: gRPC integration, initial implementation

This commit is contained in:
2026-01-01 21:03:17 +07:00
parent 96d2b88f95
commit e1cd4ed981
6 changed files with 226 additions and 112 deletions

View File

@@ -1,4 +1,4 @@
package grpc
package client
import (
"context"
@@ -6,15 +6,20 @@ import (
"fmt"
"log"
"time"
"tunnel_pls/session"
"git.fossy.my.id/bagas/tunnel-please-grpc/gen"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
type ClientConfig struct {
type GrpcConfig struct {
Address string
UseTLS bool
InsecureSkipVerify bool
@@ -25,12 +30,14 @@ type ClientConfig struct {
type Client struct {
conn *grpc.ClientConn
config *ClientConfig
config *GrpcConfig
sessionRegistry session.Registry
IdentityService gen.IdentityClient
eventService gen.EventServiceClient
}
func DefaultConfig() *ClientConfig {
return &ClientConfig{
func DefaultConfig() *GrpcConfig {
return &GrpcConfig{
Address: "localhost:50051",
UseTLS: false,
InsecureSkipVerify: false,
@@ -40,7 +47,7 @@ func DefaultConfig() *ClientConfig {
}
}
func NewClient(config *ClientConfig) (*Client, error) {
func New(config *GrpcConfig, sessionRegistry session.Registry) (*Client, error) {
if config == nil {
config = DefaultConfig()
}
@@ -61,15 +68,15 @@ func NewClient(config *ClientConfig) (*Client, error) {
kaParams := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
PermitWithoutStream: false,
}
opts = append(opts, grpc.WithKeepaliveParams(kaParams))
}
opts = append(opts,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(4*1024*1024), // 4MB
grpc.MaxCallSendMsgSize(4*1024*1024), // 4MB
grpc.MaxCallRecvMsgSize(4*1024*1024),
grpc.MaxCallSendMsgSize(4*1024*1024),
),
)
@@ -78,15 +85,98 @@ func NewClient(config *ClientConfig) (*Client, error) {
return nil, fmt.Errorf("failed to connect to gRPC server at %s: %w", config.Address, err)
}
log.Printf("Successfully connected to gRPC server at %s", config.Address)
identityService := gen.NewIdentityClient(conn)
eventService := gen.NewEventServiceClient(conn)
return &Client{
conn: conn,
config: config,
IdentityService: identityService,
eventService: eventService,
sessionRegistry: sessionRegistry,
}, nil
}
func (c *Client) SubscribeEvents(ctx context.Context) error {
for {
if ctx.Err() != nil {
log.Println("Context cancelled, stopping event subscription")
return ctx.Err()
}
log.Println("Subscribing to events...")
stream, err := c.eventService.Subscribe(ctx, &empty.Empty{})
if err != nil {
log.Printf("Failed to subscribe: %v. Retrying in 10 seconds...", err)
select {
case <-time.After(10 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
continue
}
if err := c.processEventStream(ctx, stream); err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
log.Printf("Stream error: %v. Reconnecting in 10 seconds...", err)
select {
case <-time.After(10 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
}
}
func (c *Client) processEventStream(ctx context.Context, stream gen.EventService_SubscribeClient) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
event, err := stream.Recv()
if err != nil {
st, ok := status.FromError(err)
if !ok {
return fmt.Errorf("non-gRPC error: %w", err)
}
switch st.Code() {
case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
return fmt.Errorf("stream closed [%s]: %s", st.Code(), st.Message())
default:
return fmt.Errorf("gRPC error [%s]: %s", st.Code(), st.Message())
}
}
if event != nil {
dataEvent := event.GetDataEvent()
if dataEvent != nil {
oldSlug := dataEvent.GetOld()
newSlug := dataEvent.GetNew()
userSession, exist := c.sessionRegistry.Get(oldSlug)
if !exist {
log.Printf("Session with slug '%s' not found, ignoring event", oldSlug)
continue
}
success := c.sessionRegistry.Update(oldSlug, newSlug)
if success {
log.Printf("Successfully updated session slug from '%s' to '%s'", oldSlug, newSlug)
userSession.GetInteraction().Redraw()
} else {
log.Printf("Failed to update session slug from '%s' to '%s'", oldSlug, newSlug)
}
}
}
}
}
func (c *Client) GetConnection() *grpc.ClientConn {
return c.conn
}
@@ -99,88 +189,23 @@ func (c *Client) Close() error {
return nil
}
func (c *Client) IsConnected() bool {
if c.conn == nil {
return false
}
state := c.conn.GetState()
return state.String() == "READY" || state.String() == "IDLE"
}
func (c *Client) CheckServerHealth(ctx context.Context) error {
healthClient := grpc_health_v1.NewHealthClient(c.GetConnection())
resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
Service: "",
})
func (c *Client) Reconnect() error {
if err := c.Close(); err != nil {
log.Printf("Warning: error closing existing connection: %v", err)
}
var opts []grpc.DialOption
if c.config.UseTLS {
tlsConfig := &tls.Config{
InsecureSkipVerify: c.config.InsecureSkipVerify,
}
creds := credentials.NewTLS(tlsConfig)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
if c.config.KeepAlive {
kaParams := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}
opts = append(opts, grpc.WithKeepaliveParams(kaParams))
}
opts = append(opts,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(4*1024*1024),
grpc.MaxCallSendMsgSize(4*1024*1024),
),
)
conn, err := grpc.NewClient(c.config.Address, opts...)
if err != nil {
return fmt.Errorf("failed to reconnect to gRPC server at %s: %w", c.config.Address, err)
return fmt.Errorf("health check failed: %w", err)
}
c.conn = conn
log.Printf("Successfully reconnected to gRPC server at %s", c.config.Address)
return nil
}
func (c *Client) WaitForReady(ctx context.Context) error {
if c.conn == nil {
return fmt.Errorf("connection is nil")
}
_, ok := ctx.Deadline()
if !ok {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.config.Timeout)
defer cancel()
}
currentState := c.conn.GetState()
for currentState.String() != "READY" {
if !c.conn.WaitForStateChange(ctx, currentState) {
return fmt.Errorf("timeout waiting for connection to be ready")
}
currentState = c.conn.GetState()
if currentState.String() == "READY" || currentState.String() == "IDLE" {
return nil
}
if currentState.String() == "SHUTDOWN" || currentState.String() == "TRANSIENT_FAILURE" {
return fmt.Errorf("connection is in %s state", currentState.String())
}
if resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
return fmt.Errorf("server not serving: %v", resp.Status)
}
return nil
}
func (c *Client) GetConfig() *ClientConfig {
func (c *Client) GetConfig() *GrpcConfig {
return c.config
}