diff --git a/go.mod b/go.mod index 342a837..3f1afef 100644 --- a/go.mod +++ b/go.mod @@ -19,3 +19,4 @@ require ( golang.org/x/text v0.31.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect ) +replace git.fossy.my.id/bagas/tunnel-please-grpc => ../tunnel-please-grpc diff --git a/server/server.go b/server/server.go index 2668ab0..d83c9df 100644 --- a/server/server.go +++ b/server/server.go @@ -6,22 +6,84 @@ import ( mathrand "math/rand" "net" "strings" + "sync" "time" "git.fossy.my.id/bagas/tunnel-please-controller/db/sqlc/repository" - identifier "git.fossy.my.id/bagas/tunnel-please-grpc/gen" + proto "git.fossy.my.id/bagas/tunnel-please-grpc/gen" "github.com/google/uuid" "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/types/known/emptypb" ) type Server struct { - Database *repository.Queries - identifier.UnimplementedIdentityServer + Database *repository.Queries + Subscriber []chan *proto.Event + mu sync.RWMutex + proto.UnimplementedIdentityServer + proto.UnimplementedEventServiceServer + proto.UnimplementedSlugServer } -func (s *Server) Get(ctx context.Context, request *identifier.IdentifierRequest) (*identifier.IdentifierResponse, error) { +func (s *Server) ChangeSlug(ctx context.Context, request *proto.ChangeSlugRequest) (*proto.ChangeSlugResponse, error) { + s.NotifyAllSubscriber(&proto.Event{ + Type: proto.EventType_SLUG_CHANGE, + TimestampUnixMs: time.Now().Unix(), + Data: &proto.Event_DataEvent{DataEvent: &proto.SlugChangeEvent{ + Old: request.GetOld(), + New: request.GetNew(), + }}, + }) + return &proto.ChangeSlugResponse{}, nil +} + +func (s *Server) Subscribe(request *emptypb.Empty, g grpc.ServerStreamingServer[proto.Event]) error { + sr := make(chan *proto.Event) + s.AddSubscriberChan(sr) + defer s.RemoveSubscriberChan(sr) + + for ev := range sr { + if err := g.Send(ev); err != nil { + return err + } + } + return nil +} + +func (s *Server) NotifyAllSubscriber(event *proto.Event) { + for _, subs := range s.Subscriber { + subs <- event + } +} + +func (s *Server) AddSubscriberChan(event chan *proto.Event) { + s.mu.Lock() + defer s.mu.Unlock() + s.Subscriber = append(s.Subscriber, event) +} + +func (s *Server) RemoveSubscriberChan(ch chan *proto.Event) { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.Subscriber) == 0 || ch == nil { + return + } + newSubs := s.Subscriber[:0] + for _, c := range s.Subscriber { + if c == ch { + continue + } + newSubs = append(newSubs, c) + } + s.Subscriber = newSubs + + close(ch) +} + +func (s *Server) Get(ctx context.Context, request *proto.IdentifierRequest) (*proto.IdentifierResponse, error) { parse, err := uuid.Parse(request.GetId()) if err != nil { return nil, err @@ -30,18 +92,18 @@ func (s *Server) Get(ctx context.Context, request *identifier.IdentifierRequest) if err != nil { return nil, err } - return &identifier.IdentifierResponse{ + return &proto.IdentifierResponse{ Id: data.ID.String(), Slug: data.Slug, }, nil } -func (s *Server) Create(ctx context.Context, request *emptypb.Empty) (*identifier.IdentifierResponse, error) { +func (s *Server) Create(ctx context.Context, request *emptypb.Empty) (*proto.IdentifierResponse, error) { createIdentifier, err := s.Database.CreateIdentifier(ctx, GenerateRandomString(32)) if err != nil { return nil, err } - return &identifier.IdentifierResponse{ + return &proto.IdentifierResponse{ Id: createIdentifier.ID.String(), Slug: createIdentifier.Slug, }, nil @@ -59,7 +121,7 @@ func GenerateRandomString(length int) string { } func New(database *repository.Queries) *Server { - return &Server{Database: database} + return &Server{Database: database, Subscriber: make([]chan *proto.Event, 0)} } func (s *Server) ListenAndServe(Addr string) error { @@ -71,7 +133,14 @@ func (s *Server) ListenAndServe(Addr string) error { grpcServer := grpc.NewServer() reflection.Register(grpcServer) - identifier.RegisterIdentityServer(grpcServer, s) + proto.RegisterIdentityServer(grpcServer, s) + proto.RegisterEventServiceServer(grpcServer, s) + proto.RegisterSlugServer(grpcServer, s) + + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) + if err := grpcServer.Serve(listener); err != nil { log.Fatalf("failed to serve: %v", err) }