feat: add event service and slug service

This commit is contained in:
2026-01-01 21:05:46 +07:00
parent c33ea5500e
commit 02780c66b5
2 changed files with 79 additions and 9 deletions

1
go.mod
View File

@@ -19,3 +19,4 @@ require (
golang.org/x/text v0.31.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
)
replace git.fossy.my.id/bagas/tunnel-please-grpc => ../tunnel-please-grpc

View File

@@ -6,22 +6,84 @@ import (
mathrand "math/rand"
"net"
"strings"
"sync"
"time"
"git.fossy.my.id/bagas/tunnel-please-controller/db/sqlc/repository"
identifier "git.fossy.my.id/bagas/tunnel-please-grpc/gen"
proto "git.fossy.my.id/bagas/tunnel-please-grpc/gen"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/emptypb"
)
type Server struct {
Database *repository.Queries
identifier.UnimplementedIdentityServer
Database *repository.Queries
Subscriber []chan *proto.Event
mu sync.RWMutex
proto.UnimplementedIdentityServer
proto.UnimplementedEventServiceServer
proto.UnimplementedSlugServer
}
func (s *Server) Get(ctx context.Context, request *identifier.IdentifierRequest) (*identifier.IdentifierResponse, error) {
func (s *Server) ChangeSlug(ctx context.Context, request *proto.ChangeSlugRequest) (*proto.ChangeSlugResponse, error) {
s.NotifyAllSubscriber(&proto.Event{
Type: proto.EventType_SLUG_CHANGE,
TimestampUnixMs: time.Now().Unix(),
Data: &proto.Event_DataEvent{DataEvent: &proto.SlugChangeEvent{
Old: request.GetOld(),
New: request.GetNew(),
}},
})
return &proto.ChangeSlugResponse{}, nil
}
func (s *Server) Subscribe(request *emptypb.Empty, g grpc.ServerStreamingServer[proto.Event]) error {
sr := make(chan *proto.Event)
s.AddSubscriberChan(sr)
defer s.RemoveSubscriberChan(sr)
for ev := range sr {
if err := g.Send(ev); err != nil {
return err
}
}
return nil
}
func (s *Server) NotifyAllSubscriber(event *proto.Event) {
for _, subs := range s.Subscriber {
subs <- event
}
}
func (s *Server) AddSubscriberChan(event chan *proto.Event) {
s.mu.Lock()
defer s.mu.Unlock()
s.Subscriber = append(s.Subscriber, event)
}
func (s *Server) RemoveSubscriberChan(ch chan *proto.Event) {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.Subscriber) == 0 || ch == nil {
return
}
newSubs := s.Subscriber[:0]
for _, c := range s.Subscriber {
if c == ch {
continue
}
newSubs = append(newSubs, c)
}
s.Subscriber = newSubs
close(ch)
}
func (s *Server) Get(ctx context.Context, request *proto.IdentifierRequest) (*proto.IdentifierResponse, error) {
parse, err := uuid.Parse(request.GetId())
if err != nil {
return nil, err
@@ -30,18 +92,18 @@ func (s *Server) Get(ctx context.Context, request *identifier.IdentifierRequest)
if err != nil {
return nil, err
}
return &identifier.IdentifierResponse{
return &proto.IdentifierResponse{
Id: data.ID.String(),
Slug: data.Slug,
}, nil
}
func (s *Server) Create(ctx context.Context, request *emptypb.Empty) (*identifier.IdentifierResponse, error) {
func (s *Server) Create(ctx context.Context, request *emptypb.Empty) (*proto.IdentifierResponse, error) {
createIdentifier, err := s.Database.CreateIdentifier(ctx, GenerateRandomString(32))
if err != nil {
return nil, err
}
return &identifier.IdentifierResponse{
return &proto.IdentifierResponse{
Id: createIdentifier.ID.String(),
Slug: createIdentifier.Slug,
}, nil
@@ -59,7 +121,7 @@ func GenerateRandomString(length int) string {
}
func New(database *repository.Queries) *Server {
return &Server{Database: database}
return &Server{Database: database, Subscriber: make([]chan *proto.Event, 0)}
}
func (s *Server) ListenAndServe(Addr string) error {
@@ -71,7 +133,14 @@ func (s *Server) ListenAndServe(Addr string) error {
grpcServer := grpc.NewServer()
reflection.Register(grpcServer)
identifier.RegisterIdentityServer(grpcServer, s)
proto.RegisterIdentityServer(grpcServer, s)
proto.RegisterEventServiceServer(grpcServer, s)
proto.RegisterSlugServer(grpcServer, s)
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
if err := grpcServer.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}