feat: implement get session by user

This commit is contained in:
2026-01-02 23:00:13 +07:00
parent f7a1044343
commit d864027c31
8 changed files with 277 additions and 198 deletions

View File

@@ -26,21 +26,96 @@ type Subscriber struct {
closeOnce sync.Once
}
type Server struct {
Database *repository.Queries
Subscribers map[string]*Subscriber
mu *sync.RWMutex
authToken string
Database *repository.Queries
Subscribers map[string]*Subscriber
mu *sync.RWMutex
authToken string
broadcastChan chan *proto.Controller
broadcastResultChan chan []SubscriberResult
notifyAllCancel context.CancelFunc
proto.UnimplementedEventServiceServer
proto.UnimplementedSlugChangeServer
proto.UnimplementedUserServiceServer
proto.UnimplementedUserSessionsServer
}
func New(database *repository.Queries, authToken string) *Server {
return &Server{
Database: database,
Subscribers: make(map[string]*Subscriber),
mu: new(sync.RWMutex),
authToken: authToken,
broadcastChan := make(chan *proto.Controller, 10)
broadcastResultChan := make(chan []SubscriberResult, 10)
ctx, cancel := context.WithCancel(context.Background())
srv := &Server{
Database: database,
Subscribers: make(map[string]*Subscriber),
mu: new(sync.RWMutex),
authToken: authToken,
broadcastChan: broadcastChan,
broadcastResultChan: broadcastResultChan,
notifyAllCancel: cancel,
}
go srv.notifyAllSubscriber(ctx, broadcastChan, broadcastResultChan)
return srv
}
func (s *Server) GetSession(ctx context.Context, req *proto.GetSessionRequest) (*proto.GetSessionsResponse, error) {
if req == nil {
return nil, status.Error(codes.InvalidArgument, "request is nil")
}
controllerReq := &proto.Controller{
Type: proto.EventType_GET_SESSIONS,
Payload: &proto.Controller_GetSessionsEvent{
GetSessionsEvent: &proto.GetSessionsEvent{
Identity: req.GetIdentity(),
},
},
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case s.broadcastChan <- controllerReq:
}
var results []SubscriberResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case results = <-s.broadcastResultChan:
}
responses := make([]*proto.Detail, 0, len(results))
for _, result := range results {
for _, detail := range result.Response.Payload.(*proto.Client_GetSessionsEvent).GetSessionsEvent.Details {
responses = append(responses, detail)
}
responses = append(responses)
}
if len(responses) == 0 {
return nil, status.Error(codes.NotFound, "no subscriber responded")
}
return &proto.GetSessionsResponse{Details: responses}, nil
}
func (s *Server) Check(ctx context.Context, request *proto.CheckRequest) (*proto.CheckResponse, error) {
exist, err := s.Database.UserExistsByIdentifier(ctx, request.GetAuthToken())
if err != nil {
return nil, err
}
if exist {
return &proto.CheckResponse{
Response: proto.AuthorizationResponse_MESSAGE_TYPE_AUTHORIZED,
}, nil
}
return &proto.CheckResponse{
Response: proto.AuthorizationResponse_MESSAGE_TYPE_UNAUTHORIZED,
}, nil
}
func (s *Server) Subscribe(event grpc.BidiStreamingServer[proto.Client, proto.Controller]) error {
@@ -124,6 +199,21 @@ func processEventStream(ctx context.Context, requestChan *Subscriber, event grpc
case requestChan.client <- recv:
}
log.Printf("Received slug change event: %v", recv)
case proto.EventType_GET_SESSIONS:
log.Printf("Processing session event")
if err := event.Send(request); err != nil {
return err
}
recv, err := event.Recv()
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case requestChan.client <- recv:
}
log.Printf("Received SESSIONS event: %v", recv)
default:
log.Printf("Unknown event type: %v", request.GetType())
}
@@ -220,6 +310,66 @@ func (s *Server) RequestChangeSlug(ctx context.Context, request *proto.ChangeSlu
return (*proto.ChangeSlugResponse)(response.SlugEventResponse), nil
}
type SubscriberResult struct {
Identity string
Response *proto.Client
Err error
}
func (s *Server) notifyAllSubscriber(ctx context.Context, recvChan <-chan *proto.Controller, resultChan chan<- []SubscriberResult) {
for {
select {
case <-ctx.Done():
return
case controllerReq, ok := <-recvChan:
if !ok {
return
}
if controllerReq == nil {
continue
}
s.mu.RLock()
subs := make(map[string]*Subscriber, len(s.Subscribers))
for id, sub := range s.Subscribers {
subs[id] = sub
}
s.mu.RUnlock()
results := make([]SubscriberResult, 0, len(subs))
for id, sub := range subs {
select {
case <-ctx.Done():
return
case <-sub.done:
results = append(results, SubscriberResult{Identity: id, Err: status.Error(codes.Canceled, "subscriber removed")})
continue
case sub.controller <- controllerReq:
default:
results = append(results, SubscriberResult{Identity: id, Err: status.Error(codes.Unavailable, "controller channel blocked")})
continue
}
select {
case <-ctx.Done():
return
case <-sub.done:
results = append(results, SubscriberResult{Identity: id, Err: status.Error(codes.Canceled, "subscriber removed")})
case resp, ok := <-sub.client:
if !ok {
results = append(results, SubscriberResult{Identity: id, Err: status.Error(codes.FailedPrecondition, "client channel closed")})
continue
}
results = append(results, SubscriberResult{Identity: id, Response: resp})
}
}
select {
case <-ctx.Done():
return
case resultChan <- results:
}
}
}
}
func (s *Server) ListenAndServe(Addr string) error {
listener, err := net.Listen("tcp", Addr)
if err != nil {
@@ -246,6 +396,8 @@ func (s *Server) ListenAndServe(Addr string) error {
proto.RegisterSlugChangeServer(grpcServer, s)
proto.RegisterEventServiceServer(grpcServer, s)
proto.RegisterUserServiceServer(grpcServer, s)
proto.RegisterUserSessionsServer(grpcServer, s)
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)