12 Commits

Author SHA1 Message Date
6213ff8a30 feat: implement forwarder session termination
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Successful in 3m36s
2026-01-06 18:32:48 +07:00
4ffaec9d9a refactor: inject SessionRegistry interface instead of individual functions
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Successful in 4m16s
2026-01-05 16:49:17 +07:00
6de0a618ee update: proto file to v1.3.0
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Successful in 4m0s
2026-01-05 00:55:51 +07:00
8cc70fa45e feat(session): use session key for registry 2026-01-05 00:50:42 +07:00
d666ae5545 fix: use correct environment variable key
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Successful in 4m1s
2026-01-04 18:21:34 +07:00
5edb3c8086 fix: startup order
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Successful in 3m51s
2026-01-04 15:19:03 +07:00
5b603d8317 feat: implement sessions request from grpc server
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Successful in 4m7s
2026-01-03 21:17:01 +07:00
8fd9f8b567 feat: implement sessions request from grpc server
Docker Build and Push / build-and-push-branches (push) Has been skipped
Docker Build and Push / build-and-push-tags (push) Has been cancelled
2026-01-03 20:06:14 +07:00
30e84ac3b7 feat: implement get sessions by user 2026-01-02 22:58:54 +07:00
fd6ffc2500 feat(grpc): integrate slug edit handling 2026-01-02 18:27:48 +07:00
e1cd4ed981 WIP: gRPC integration, initial implementation 2026-01-01 21:03:17 +07:00
96d2b88f95 WIP: gRPC integration, initial implementation 2026-01-01 21:01:15 +07:00
16 changed files with 603 additions and 560 deletions
+21
View File
@@ -0,0 +1,21 @@
name: renovate
on:
schedule:
- cron: "0 0 * * *"
push:
branches:
- staging
jobs:
renovate:
runs-on: ubuntu-latest
container: git.fossy.my.id/renovate-clanker/renovate:latest
steps:
- uses: actions/checkout@v6
- run: renovate
env:
RENOVATE_CONFIG_FILE: ${{ gitea.workspace }}/renovate-config.js
LOG_LEVEL: "debug"
RENOVATE_TOKEN: ${{ secrets.RENOVATE_TOKEN }}
GITHUB_COM_TOKEN: ${{ secrets.COM_TOKEN }}
+2 -2
View File
@@ -4,7 +4,7 @@ go 1.25.5
require (
git.fossy.my.id/bagas/tunnel-please-grpc v1.5.0
github.com/caddyserver/certmagic v0.25.1
github.com/caddyserver/certmagic v0.25.0
github.com/charmbracelet/bubbles v0.21.0
github.com/charmbracelet/bubbletea v1.3.10
github.com/charmbracelet/lipgloss v1.1.0
@@ -19,7 +19,7 @@ require (
require (
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/caddyserver/zerossl v0.1.4 // indirect
github.com/caddyserver/zerossl v0.1.3 // indirect
github.com/charmbracelet/colorprofile v0.4.1 // indirect
github.com/charmbracelet/x/ansi v0.11.3 // indirect
github.com/charmbracelet/x/cellbuf v0.0.14 // indirect
-4
View File
@@ -12,12 +12,8 @@ github.com/aymanbagabas/go-udiff v0.2.0 h1:TK0fH4MteXUDspT88n8CKzvK0X9O2xu9yQjWp
github.com/aymanbagabas/go-udiff v0.2.0/go.mod h1:RE4Ex0qsGkTAJoQdQQCA0uG+nAzJO/pI/QwceO5fgrA=
github.com/caddyserver/certmagic v0.25.0 h1:VMleO/XA48gEWes5l+Fh6tRWo9bHkhwAEhx63i+F5ic=
github.com/caddyserver/certmagic v0.25.0/go.mod h1:m9yB7Mud24OQbPHOiipAoyKPn9pKHhpSJxXR1jydBxA=
github.com/caddyserver/certmagic v0.25.1 h1:4sIKKbOt5pg6+sL7tEwymE1x2bj6CHr80da1CRRIPbY=
github.com/caddyserver/certmagic v0.25.1/go.mod h1:VhyvndxtVton/Fo/wKhRoC46Rbw1fmjvQ3GjHYSQTEY=
github.com/caddyserver/zerossl v0.1.3 h1:onS+pxp3M8HnHpN5MMbOMyNjmTheJyWRaZYwn+YTAyA=
github.com/caddyserver/zerossl v0.1.3/go.mod h1:CxA0acn7oEGO6//4rtrRjYgEoa4MFw/XofZnrYwGqG4=
github.com/caddyserver/zerossl v0.1.4 h1:CVJOE3MZeFisCERZjkxIcsqIH4fnFdlYWnPYeFtBHRw=
github.com/caddyserver/zerossl v0.1.4/go.mod h1:CxA0acn7oEGO6//4rtrRjYgEoa4MFw/XofZnrYwGqG4=
github.com/charmbracelet/bubbles v0.21.0 h1:9TdC97SdRVg/1aaXNVWfFH3nnLAwOXr8Fn6u6mfQdFs=
github.com/charmbracelet/bubbles v0.21.0/go.mod h1:HF+v6QUR4HkEpz62dx7ym2xc71/KBHg+zKwJtMw+qtg=
github.com/charmbracelet/bubbletea v1.3.10 h1:otUDHWMMzQSB0Pkc87rm691KZ3SWa4KUlvF9nRvCICw=
+191 -138
View File
@@ -210,152 +210,205 @@ func (c *Client) SubscribeEvents(ctx context.Context, identity, authToken string
}
func (c *Client) processEventStream(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events]) error {
handlers := c.eventHandlers(subscribe)
for {
recv, err := subscribe.Recv()
if err != nil {
return err
}
switch recv.GetType() {
case proto.EventType_SLUG_CHANGE:
user := recv.GetSlugEvent().GetUser()
oldSlug := recv.GetSlugEvent().GetOld()
newSlug := recv.GetSlugEvent().GetNew()
var userSession *session.SSHSession
userSession, err = c.sessionRegistry.Get(types.SessionKey{
Id: oldSlug,
Type: types.HTTP,
})
if err != nil {
errSend := subscribe.Send(&proto.Node{
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
Payload: &proto.Node_SlugEventResponse{
SlugEventResponse: &proto.SlugChangeEventResponse{
Success: false,
Message: err.Error(),
},
},
})
if errSend != nil {
if c.isConnectionError(errSend) {
return errSend
}
log.Printf("non-connection send error for slug change failure: %v", errSend)
}
continue
}
err = c.sessionRegistry.Update(user, types.SessionKey{
Id: oldSlug,
Type: types.HTTP,
}, types.SessionKey{
Id: newSlug,
Type: types.HTTP,
})
if err != nil {
errSend := subscribe.Send(&proto.Node{
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
Payload: &proto.Node_SlugEventResponse{
SlugEventResponse: &proto.SlugChangeEventResponse{
Success: false,
Message: err.Error(),
},
},
})
if errSend != nil {
if c.isConnectionError(errSend) {
return errSend
}
log.Printf("non-connection send error for slug change failure: %v", errSend)
}
continue
}
userSession.GetInteraction().Redraw()
err = subscribe.Send(&proto.Node{
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
Payload: &proto.Node_SlugEventResponse{
SlugEventResponse: &proto.SlugChangeEventResponse{
Success: true,
Message: "",
},
},
})
if err != nil {
if c.isConnectionError(err) {
log.Printf("connection error sending slug change success: %v", err)
return err
}
log.Printf("non-connection send error for slug change success: %v", err)
continue
}
case proto.EventType_GET_SESSIONS:
sessions := c.sessionRegistry.GetAllSessionFromUser(recv.GetGetSessionsEvent().GetIdentity())
var details []*proto.Detail
for _, ses := range sessions {
detail := ses.Detail()
details = append(details, &proto.Detail{
Node: config.Getenv("DOMAIN", "localhost"),
ForwardingType: detail.ForwardingType,
Slug: detail.Slug,
UserId: detail.UserID,
Active: detail.Active,
StartedAt: timestamppb.New(detail.StartedAt),
})
}
err = subscribe.Send(&proto.Node{
Type: proto.EventType_GET_SESSIONS,
Payload: &proto.Node_GetSessionsEvent{
GetSessionsEvent: &proto.GetSessionsResponse{
Details: details,
},
},
})
if err != nil {
if c.isConnectionError(err) {
log.Printf("connection error sending sessions success: %v", err)
return err
}
log.Printf("non-connection send error for sessions success: %v", err)
continue
}
case proto.EventType_TERMINATE_SESSION:
user := recv.GetTerminateSessionEvent().GetUser()
tunnelTypeRaw := recv.GetTerminateSessionEvent().GetTunnelType()
slug := recv.GetTerminateSessionEvent().GetSlug()
handler, ok := handlers[recv.GetType()]
if !ok {
var userSession *session.SSHSession
var tunnelType types.TunnelType
if tunnelTypeRaw == proto.TunnelType_HTTP {
tunnelType = types.HTTP
} else if tunnelTypeRaw == proto.TunnelType_TCP {
tunnelType = types.TCP
} else {
err = subscribe.Send(&proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{
Success: false,
Message: "unknown tunnel type recived",
},
},
})
if err != nil {
if c.isConnectionError(err) {
log.Printf("connection error sending sessions success: %v", err)
return err
}
log.Printf("non-connection send error for sessions success: %v", err)
}
continue
}
userSession, err = c.sessionRegistry.GetWithUser(user, types.SessionKey{
Id: slug,
Type: tunnelType,
})
if err != nil {
err = subscribe.Send(&proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{
Success: false,
Message: err.Error(),
},
},
})
if err != nil {
if c.isConnectionError(err) {
log.Printf("connection error sending sessions success: %v", err)
return err
}
log.Printf("non-connection send error for sessions success: %v", err)
}
continue
}
err = userSession.GetLifecycle().Close()
if err != nil {
err = subscribe.Send(&proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{
Success: false,
Message: err.Error(),
},
},
})
if err != nil {
if c.isConnectionError(err) {
log.Printf("connection error sending sessions success: %v", err)
return err
}
log.Printf("non-connection send error for sessions success: %v", err)
}
continue
}
err = subscribe.Send(&proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{
Success: true,
Message: "",
},
},
})
if err != nil {
if c.isConnectionError(err) {
log.Printf("connection error sending sessions success: %v", err)
return err
}
log.Printf("non-connection send error for sessions success: %v", err)
continue
}
default:
log.Printf("Unknown event type received: %v", recv.GetType())
continue
}
if err = handler(recv); err != nil {
return err
}
}
}
func (c *Client) eventHandlers(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events]) map[proto.EventType]func(*proto.Events) error {
return map[proto.EventType]func(*proto.Events) error{
proto.EventType_SLUG_CHANGE: func(evt *proto.Events) error { return c.handleSlugChange(subscribe, evt) },
proto.EventType_GET_SESSIONS: func(evt *proto.Events) error { return c.handleGetSessions(subscribe, evt) },
proto.EventType_TERMINATE_SESSION: func(evt *proto.Events) error { return c.handleTerminateSession(subscribe, evt) },
}
}
func (c *Client) handleSlugChange(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
slugEvent := evt.GetSlugEvent()
user := slugEvent.GetUser()
oldSlug := slugEvent.GetOld()
newSlug := slugEvent.GetNew()
userSession, err := c.sessionRegistry.Get(types.SessionKey{Id: oldSlug, Type: types.HTTP})
if err != nil {
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
Payload: &proto.Node_SlugEventResponse{
SlugEventResponse: &proto.SlugChangeEventResponse{Success: false, Message: err.Error()},
},
}, "slug change failure response")
}
if err = c.sessionRegistry.Update(user, types.SessionKey{Id: oldSlug, Type: types.HTTP}, types.SessionKey{Id: newSlug, Type: types.HTTP}); err != nil {
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
Payload: &proto.Node_SlugEventResponse{
SlugEventResponse: &proto.SlugChangeEventResponse{Success: false, Message: err.Error()},
},
}, "slug change failure response")
}
userSession.Interaction().Redraw()
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_SLUG_CHANGE_RESPONSE,
Payload: &proto.Node_SlugEventResponse{
SlugEventResponse: &proto.SlugChangeEventResponse{Success: true, Message: ""},
},
}, "slug change success response")
}
func (c *Client) handleGetSessions(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
sessions := c.sessionRegistry.GetAllSessionFromUser(evt.GetGetSessionsEvent().GetIdentity())
var details []*proto.Detail
for _, ses := range sessions {
detail := ses.Detail()
details = append(details, &proto.Detail{
Node: config.Getenv("DOMAIN", "localhost"),
ForwardingType: detail.ForwardingType,
Slug: detail.Slug,
UserId: detail.UserID,
Active: detail.Active,
StartedAt: timestamppb.New(detail.StartedAt),
})
}
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_GET_SESSIONS,
Payload: &proto.Node_GetSessionsEvent{
GetSessionsEvent: &proto.GetSessionsResponse{Details: details},
},
}, "send get sessions response")
}
func (c *Client) handleTerminateSession(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], evt *proto.Events) error {
terminate := evt.GetTerminateSessionEvent()
user := terminate.GetUser()
slug := terminate.GetSlug()
tunnelType, err := c.protoToTunnelType(terminate.GetTunnelType())
if err != nil {
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()},
},
}, "terminate session invalid tunnel type")
}
userSession, err := c.sessionRegistry.GetWithUser(user, types.SessionKey{Id: slug, Type: tunnelType})
if err != nil {
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()},
},
}, "terminate session fetch failed")
}
if err = userSession.Lifecycle().Close(); err != nil {
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: false, Message: err.Error()},
},
}, "terminate session close failed")
}
return c.sendNode(subscribe, &proto.Node{
Type: proto.EventType_TERMINATE_SESSION,
Payload: &proto.Node_TerminateSessionEventResponse{
TerminateSessionEventResponse: &proto.TerminateSessionEventResponse{Success: true, Message: ""},
},
}, "terminate session success response")
}
func (c *Client) sendNode(subscribe grpc.BidiStreamingClient[proto.Node, proto.Events], node *proto.Node, context string) error {
if err := subscribe.Send(node); err != nil {
if c.isConnectionError(err) {
return err
}
log.Printf("%s: %v", context, err)
}
return nil
}
func (c *Client) protoToTunnelType(t proto.TunnelType) (types.TunnelType, error) {
switch t {
case proto.TunnelType_HTTP:
return types.HTTP, nil
case proto.TunnelType_TCP:
return types.TCP, nil
default:
return types.UNKNOWN, fmt.Errorf("unknown tunnel type received")
}
}
+6 -16
View File
@@ -13,7 +13,7 @@ type Manager interface {
AddPortRange(startPort, endPort uint16) error
GetUnassignedPort() (uint16, bool)
SetPortStatus(port uint16, assigned bool) error
ClaimPort(port uint16) (claimed bool)
GetPortStatus(port uint16) (bool, bool)
}
type manager struct {
@@ -74,6 +74,7 @@ func (pm *manager) GetUnassignedPort() (uint16, bool) {
for _, port := range pm.sortedPorts {
if !pm.ports[port] {
pm.ports[port] = true
return port, true
}
}
@@ -88,21 +89,10 @@ func (pm *manager) SetPortStatus(port uint16, assigned bool) error {
return nil
}
func (pm *manager) ClaimPort(port uint16) (claimed bool) {
pm.mu.Lock()
defer pm.mu.Unlock()
func (pm *manager) GetPortStatus(port uint16) (bool, bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
status, exists := pm.ports[port]
if exists && status {
return false
}
if !exists {
pm.ports[port] = true
return true
}
pm.ports[port] = true
return true
return status, exists
}
+8
View File
@@ -0,0 +1,8 @@
module.exports = {
"endpoint": "https://git.fossy.my.id/api/v1",
"gitAuthor": "Renovate-Clanker <renovate-bot@fossy.my.id>",
"platform": "gitea",
"onboardingConfigFileName": "renovate.json",
"autodiscover": true,
"optimizeForDisabled": true,
};
+6 -6
View File
@@ -335,8 +335,8 @@ func (hs *httpServer) handler(conn net.Conn) {
return
}
func forwardRequest(cw HTTPWriter, initialRequest RequestHeaderManager, sshSession session.Session) {
payload := sshSession.Forwarder().CreateForwardedTCPIPPayload(cw.GetRemoteAddr())
func forwardRequest(cw HTTPWriter, initialRequest RequestHeaderManager, sshSession *session.SSHSession) {
payload := sshSession.GetForwarder().CreateForwardedTCPIPPayload(cw.GetRemoteAddr())
type channelResult struct {
channel ssh.Channel
@@ -346,7 +346,7 @@ func forwardRequest(cw HTTPWriter, initialRequest RequestHeaderManager, sshSessi
resultChan := make(chan channelResult, 1)
go func() {
channel, reqs, err := sshSession.Lifecycle().Connection().OpenChannel("forwarded-tcpip", payload)
channel, reqs, err := sshSession.GetLifecycle().GetConnection().OpenChannel("forwarded-tcpip", payload)
resultChan <- channelResult{channel, reqs, err}
}()
@@ -357,14 +357,14 @@ func forwardRequest(cw HTTPWriter, initialRequest RequestHeaderManager, sshSessi
case result := <-resultChan:
if result.err != nil {
log.Printf("Failed to open forwarded-tcpip channel: %v", result.err)
sshSession.Forwarder().WriteBadGatewayResponse(cw.GetWriter())
sshSession.GetForwarder().WriteBadGatewayResponse(cw.GetWriter())
return
}
channel = result.channel
reqs = result.reqs
case <-time.After(5 * time.Second):
log.Printf("Timeout opening forwarded-tcpip channel")
sshSession.Forwarder().WriteBadGatewayResponse(cw.GetWriter())
sshSession.GetForwarder().WriteBadGatewayResponse(cw.GetWriter())
return
}
@@ -390,6 +390,6 @@ func forwardRequest(cw HTTPWriter, initialRequest RequestHeaderManager, sshSessi
return
}
sshSession.Forwarder().HandleConnection(cw, channel, cw.GetRemoteAddr())
sshSession.GetForwarder().HandleConnection(cw, channel, cw.GetRemoteAddr())
return
}
+10 -13
View File
@@ -2,11 +2,9 @@ package server
import (
"context"
"errors"
"fmt"
"log"
"net"
"time"
"tunnel_pls/internal/config"
"tunnel_pls/internal/grpc/client"
"tunnel_pls/session"
@@ -66,31 +64,30 @@ func (s *Server) Start() {
func (s *Server) handleConnection(conn net.Conn) {
sshConn, chans, forwardingReqs, err := ssh.NewServerConn(conn, s.config)
defer func(sshConn *ssh.ServerConn) {
err = sshConn.Close()
if err != nil {
log.Printf("failed to close SSH server: %v", err)
}
}(sshConn)
if err != nil {
log.Printf("failed to establish SSH connection: %v", err)
err = conn.Close()
err := conn.Close()
if err != nil {
log.Printf("failed to close SSH connection: %v", err)
return
}
return
}
defer func(sshConn *ssh.ServerConn) {
err = sshConn.Close()
if err != nil && !errors.Is(err, net.ErrClosed) {
log.Printf("failed to close SSH server: %v", err)
}
}(sshConn)
ctx := context.Background()
log.Println("SSH connection established:", sshConn.User())
user := "UNAUTHORIZED"
if s.grpcClient != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
_, u, _ := s.grpcClient.AuthorizeConn(ctx, sshConn.User())
user = u
cancel()
}
log.Println("SSH connection established:", sshConn.User())
sshSession := session.New(sshConn, forwardingReqs, chans, s.sessionRegistry, user)
err = sshSession.Start()
if err != nil {
+30 -30
View File
@@ -30,50 +30,50 @@ func copyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
return io.CopyBuffer(dst, src, buf)
}
type forwarder struct {
type Forwarder struct {
listener net.Listener
tunnelType types.TunnelType
forwardedPort uint16
slug slug.Slug
slugManager slug.Manager
lifecycle Lifecycle
}
func New(slug slug.Slug) Forwarder {
return &forwarder{
func NewForwarder(slugManager slug.Manager) *Forwarder {
return &Forwarder{
listener: nil,
tunnelType: types.UNKNOWN,
tunnelType: "",
forwardedPort: 0,
slug: slug,
slugManager: slugManager,
lifecycle: nil,
}
}
type Lifecycle interface {
Connection() ssh.Conn
GetConnection() ssh.Conn
}
type Forwarder interface {
type ForwardingController interface {
AcceptTCPConnections()
SetType(tunnelType types.TunnelType)
SetLifecycle(lifecycle Lifecycle)
GetTunnelType() types.TunnelType
GetForwardedPort() uint16
SetForwardedPort(port uint16)
SetListener(listener net.Listener)
Listener() net.Listener
TunnelType() types.TunnelType
ForwardedPort() uint16
GetListener() net.Listener
Close() error
HandleConnection(dst io.ReadWriter, src ssh.Channel, remoteAddr net.Addr)
SetLifecycle(lifecycle Lifecycle)
CreateForwardedTCPIPPayload(origin net.Addr) []byte
WriteBadGatewayResponse(dst io.Writer)
AcceptTCPConnections()
Close() error
}
func (f *forwarder) SetLifecycle(lifecycle Lifecycle) {
func (f *Forwarder) SetLifecycle(lifecycle Lifecycle) {
f.lifecycle = lifecycle
}
func (f *forwarder) AcceptTCPConnections() {
func (f *Forwarder) AcceptTCPConnections() {
for {
conn, err := f.Listener().Accept()
conn, err := f.GetListener().Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
@@ -100,7 +100,7 @@ func (f *forwarder) AcceptTCPConnections() {
resultChan := make(chan channelResult, 1)
go func() {
channel, reqs, err := f.lifecycle.Connection().OpenChannel("forwarded-tcpip", payload)
channel, reqs, err := f.lifecycle.GetConnection().OpenChannel("forwarded-tcpip", payload)
resultChan <- channelResult{channel, reqs, err}
}()
@@ -130,7 +130,7 @@ func (f *forwarder) AcceptTCPConnections() {
}
}
func (f *forwarder) HandleConnection(dst io.ReadWriter, src ssh.Channel, remoteAddr net.Addr) {
func (f *Forwarder) HandleConnection(dst io.ReadWriter, src ssh.Channel, remoteAddr net.Addr) {
defer func() {
_, err := io.Copy(io.Discard, src)
if err != nil {
@@ -174,31 +174,31 @@ func (f *forwarder) HandleConnection(dst io.ReadWriter, src ssh.Channel, remoteA
wg.Wait()
}
func (f *forwarder) SetType(tunnelType types.TunnelType) {
func (f *Forwarder) SetType(tunnelType types.TunnelType) {
f.tunnelType = tunnelType
}
func (f *forwarder) TunnelType() types.TunnelType {
func (f *Forwarder) GetTunnelType() types.TunnelType {
return f.tunnelType
}
func (f *forwarder) ForwardedPort() uint16 {
func (f *Forwarder) GetForwardedPort() uint16 {
return f.forwardedPort
}
func (f *forwarder) SetForwardedPort(port uint16) {
func (f *Forwarder) SetForwardedPort(port uint16) {
f.forwardedPort = port
}
func (f *forwarder) SetListener(listener net.Listener) {
func (f *Forwarder) SetListener(listener net.Listener) {
f.listener = listener
}
func (f *forwarder) Listener() net.Listener {
func (f *Forwarder) GetListener() net.Listener {
return f.listener
}
func (f *forwarder) WriteBadGatewayResponse(dst io.Writer) {
func (f *Forwarder) WriteBadGatewayResponse(dst io.Writer) {
_, err := dst.Write(types.BadGatewayResponse)
if err != nil {
log.Printf("failed to write Bad Gateway response: %v", err)
@@ -206,20 +206,20 @@ func (f *forwarder) WriteBadGatewayResponse(dst io.Writer) {
}
}
func (f *forwarder) Close() error {
if f.Listener() != nil {
func (f *Forwarder) Close() error {
if f.GetListener() != nil {
return f.listener.Close()
}
return nil
}
func (f *forwarder) CreateForwardedTCPIPPayload(origin net.Addr) []byte {
func (f *Forwarder) CreateForwardedTCPIPPayload(origin net.Addr) []byte {
var buf bytes.Buffer
host, originPort := parseAddr(origin.String())
writeSSHString(&buf, "localhost")
err := binary.Write(&buf, binary.BigEndian, uint32(f.ForwardedPort()))
err := binary.Write(&buf, binary.BigEndian, uint32(f.GetForwardedPort()))
if err != nil {
log.Printf("Failed to write string to buffer: %v", err)
return nil
+150 -89
View File
@@ -15,7 +15,7 @@ import (
var blockedReservedPorts = []uint16{1080, 1433, 1521, 1900, 2049, 3306, 3389, 5432, 5900, 6379, 8080, 8443, 9000, 9200, 27017}
func (s *session) HandleGlobalRequest(GlobalRequest <-chan *ssh.Request) {
func (s *SSHSession) HandleGlobalRequest(GlobalRequest <-chan *ssh.Request) {
for req := range GlobalRequest {
switch req.Type {
case "shell", "pty-req":
@@ -56,172 +56,233 @@ func (s *session) HandleGlobalRequest(GlobalRequest <-chan *ssh.Request) {
}
}
func (s *session) HandleTCPIPForward(req *ssh.Request) {
func (s *SSHSession) HandleTCPIPForward(req *ssh.Request) {
log.Println("Port forwarding request detected")
fail := func(msg string) {
log.Println(msg)
if err := req.Reply(false, nil); err != nil {
log.Println("Failed to reply to request:", err)
return
}
if err := s.lifecycle.Close(); err != nil {
log.Printf("failed to close session: %v", err)
}
}
reader := bytes.NewReader(req.Payload)
addr, err := readSSHString(reader)
if err != nil {
fail(fmt.Sprintf("Failed to read address from payload: %v", err))
log.Println("Failed to read address from payload:", err)
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
var rawPortToBind uint32
if err = binary.Read(reader, binary.BigEndian, &rawPortToBind); err != nil {
fail(fmt.Sprintf("Failed to read port from payload: %v", err))
if err := binary.Read(reader, binary.BigEndian, &rawPortToBind); err != nil {
log.Println("Failed to read port from payload:", err)
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
if rawPortToBind > 65535 {
fail(fmt.Sprintf("Port %d is larger than allowed port of 65535", rawPortToBind))
log.Printf("Port %d is larger than allowed port of 65535", rawPortToBind)
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
portToBind := uint16(rawPortToBind)
if isBlockedPort(portToBind) {
fail(fmt.Sprintf("Port %d is blocked or restricted", portToBind))
log.Printf("Port %d is blocked or restricted", portToBind)
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
switch portToBind {
case 80, 443:
if portToBind == 80 || portToBind == 443 {
s.HandleHTTPForward(req, portToBind)
default:
s.HandleTCPForward(req, addr, portToBind)
return
}
if portToBind == 0 {
unassign, success := portUtil.Default.GetUnassignedPort()
portToBind = unassign
if !success {
log.Println("No available port")
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
} else if isUse, isExist := portUtil.Default.GetPortStatus(portToBind); isExist && isUse {
log.Printf("Port %d is already in use or restricted", portToBind)
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
err = portUtil.Default.SetPortStatus(portToBind, true)
if err != nil {
log.Println("Failed to set port status:", err)
return
}
s.HandleTCPForward(req, addr, portToBind)
}
func (s *session) HandleHTTPForward(req *ssh.Request, portToBind uint16) {
fail := func(msg string, key *types.SessionKey) {
log.Println(msg)
if key != nil {
s.registry.Remove(*key)
}
if err := req.Reply(false, nil); err != nil {
log.Println("Failed to reply to request:", err)
}
}
func (s *SSHSession) HandleHTTPForward(req *ssh.Request, portToBind uint16) {
slug := random.GenerateRandomString(20)
key := types.SessionKey{Id: slug, Type: types.HTTP}
if !s.registry.Register(key, s) {
fail(fmt.Sprintf("Failed to register client with slug: %s", slug), nil)
log.Printf("Failed to register client with slug: %s", slug)
err := req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
}
return
}
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.BigEndian, uint32(portToBind))
if err != nil {
fail(fmt.Sprintf("Failed to write port to buffer: %v", err), &key)
log.Println("Failed to write port to buffer:", err)
s.registry.Remove(key)
err = req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
}
return
}
log.Printf("HTTP forwarding approved on port: %d", portToBind)
err = req.Reply(true, buf.Bytes())
if err != nil {
fail(fmt.Sprintf("Failed to reply to request: %v", err), &key)
log.Println("Failed to reply to request:", err)
s.registry.Remove(key)
err = req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
}
return
}
s.forwarder.SetType(types.HTTP)
s.forwarder.SetForwardedPort(portToBind)
s.slug.Set(slug)
s.slugManager.Set(slug)
s.lifecycle.SetStatus(types.RUNNING)
s.interaction.Start()
}
func (s *session) HandleTCPForward(req *ssh.Request, addr string, portToBind uint16) {
fail := func(msg string) {
log.Println(msg)
if err := req.Reply(false, nil); err != nil {
log.Println("Failed to reply to request:", err)
return
}
if err := s.lifecycle.Close(); err != nil {
log.Printf("failed to close session: %v", err)
}
}
cleanup := func(msg string, port uint16, listener net.Listener, key *types.SessionKey) {
log.Println(msg)
if key != nil {
s.registry.Remove(*key)
}
if port != 0 {
if setErr := portUtil.Default.SetPortStatus(port, false); setErr != nil {
log.Printf("Failed to reset port status: %v", setErr)
}
}
if listener != nil {
if closeErr := listener.Close(); closeErr != nil {
log.Printf("Failed to close listener: %v", closeErr)
}
}
if err := req.Reply(false, nil); err != nil {
log.Println("Failed to reply to request:", err)
}
_ = s.lifecycle.Close()
}
if portToBind == 0 {
unassigned, ok := portUtil.Default.GetUnassignedPort()
if !ok {
fail("No available port")
return
}
portToBind = unassigned
}
if claimed := portUtil.Default.ClaimPort(portToBind); !claimed {
fail(fmt.Sprintf("Port %d is already in use or restricted", portToBind))
return
}
func (s *SSHSession) HandleTCPForward(req *ssh.Request, addr string, portToBind uint16) {
log.Printf("Requested forwarding on %s:%d", addr, portToBind)
listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", portToBind))
if err != nil {
cleanup(fmt.Sprintf("Port %d is already in use or restricted", portToBind), portToBind, nil, nil)
log.Printf("Port %d is already in use or restricted", portToBind)
if setErr := portUtil.Default.SetPortStatus(portToBind, false); setErr != nil {
log.Printf("Failed to reset port status: %v", setErr)
}
err = req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
return
}
err = s.lifecycle.Close()
if err != nil {
log.Printf("failed to close session: %v", err)
}
return
}
key := types.SessionKey{Id: fmt.Sprintf("%d", portToBind), Type: types.TCP}
if !s.registry.Register(key, s) {
cleanup(fmt.Sprintf("Failed to register TCP client with id: %s", key.Id), portToBind, listener, nil)
log.Printf("Failed to register TCP client with id: %s", key.Id)
if setErr := portUtil.Default.SetPortStatus(portToBind, false); setErr != nil {
log.Printf("Failed to reset port status: %v", setErr)
}
if closeErr := listener.Close(); closeErr != nil {
log.Printf("Failed to close listener: %s", closeErr)
}
err = req.Reply(false, nil)
if err != nil {
log.Println("Failed to reply to request:", err)
}
_ = s.lifecycle.Close()
return
}
buf := new(bytes.Buffer)
err = binary.Write(buf, binary.BigEndian, uint32(portToBind))
if err != nil {
cleanup(fmt.Sprintf("Failed to write port to buffer: %v", err), portToBind, listener, &key)
log.Println("Failed to write port to buffer:", err)
s.registry.Remove(key)
if setErr := portUtil.Default.SetPortStatus(portToBind, false); setErr != nil {
log.Printf("Failed to reset port status: %v", setErr)
}
err = listener.Close()
if err != nil {
log.Printf("Failed to close listener: %s", err)
return
}
return
}
log.Printf("TCP forwarding approved on port: %d", portToBind)
err = req.Reply(true, buf.Bytes())
if err != nil {
cleanup(fmt.Sprintf("Failed to reply to request: %v", err), portToBind, listener, &key)
log.Println("Failed to reply to request:", err)
s.registry.Remove(key)
if setErr := portUtil.Default.SetPortStatus(portToBind, false); setErr != nil {
log.Printf("Failed to reset port status: %v", setErr)
}
err = listener.Close()
if err != nil {
log.Printf("Failed to close listener: %s", err)
return
}
return
}
s.forwarder.SetType(types.TCP)
s.forwarder.SetListener(listener)
s.forwarder.SetForwardedPort(portToBind)
s.slug.Set(key.Id)
s.slugManager.Set(key.Id)
s.lifecycle.SetStatus(types.RUNNING)
go s.forwarder.AcceptTCPConnections()
s.interaction.Start()
}
func readSSHString(reader *bytes.Reader) (string, error) {
+36 -68
View File
@@ -23,59 +23,40 @@ import (
type Lifecycle interface {
Close() error
User() string
GetUser() string
}
type SessionRegistry interface {
Update(user string, oldKey, newKey types.SessionKey) error
}
type Interaction interface {
Mode() types.Mode
type Controller interface {
SetChannel(channel ssh.Channel)
SetLifecycle(lifecycle Lifecycle)
SetSessionRegistry(registry SessionRegistry)
SetMode(m types.Mode)
SetWH(w, h int)
Start()
SetWH(w, h int)
Redraw()
Send(message string) error
SetSessionRegistry(registry SessionRegistry)
}
type Forwarder interface {
Close() error
TunnelType() types.TunnelType
ForwardedPort() uint16
GetTunnelType() types.TunnelType
GetForwardedPort() uint16
}
type interaction struct {
type Interaction struct {
channel ssh.Channel
slug slug.Slug
slugManager slug.Manager
forwarder Forwarder
lifecycle Lifecycle
sessionRegistry SessionRegistry
program *tea.Program
ctx context.Context
cancel context.CancelFunc
mode types.Mode
}
func (i *interaction) SetMode(m types.Mode) {
i.mode = m
}
func (i *interaction) Mode() types.Mode {
return i.mode
}
func (i *interaction) Send(message string) error {
if i.channel != nil {
_, err := i.channel.Write([]byte(message))
return err
}
return nil
}
func (i *interaction) SetWH(w, h int) {
func (i *Interaction) SetWH(w, h int) {
if i.program != nil {
i.program.Send(tea.WindowSizeMsg{
Width: w,
@@ -103,14 +84,14 @@ type model struct {
commandList list.Model
slugInput textinput.Model
slugError string
interaction *interaction
interaction *Interaction
width int
height int
}
func (m *model) getTunnelURL() string {
if m.tunnelType == types.HTTP {
return buildURL(m.protocol, m.interaction.slug.String(), m.domain)
return buildURL(m.protocol, m.interaction.slugManager.Get(), m.domain)
}
return fmt.Sprintf("tcp://%s:%d", m.domain, m.port)
}
@@ -123,11 +104,11 @@ type keymap struct {
type tickMsg time.Time
func New(slug slug.Slug, forwarder Forwarder) Interaction {
func NewInteraction(slugManager slug.Manager, forwarder Forwarder) *Interaction {
ctx, cancel := context.WithCancel(context.Background())
return &interaction{
return &Interaction{
channel: nil,
slug: slug,
slugManager: slugManager,
forwarder: forwarder,
lifecycle: nil,
sessionRegistry: nil,
@@ -137,19 +118,19 @@ func New(slug slug.Slug, forwarder Forwarder) Interaction {
}
}
func (i *interaction) SetSessionRegistry(registry SessionRegistry) {
func (i *Interaction) SetSessionRegistry(registry SessionRegistry) {
i.sessionRegistry = registry
}
func (i *interaction) SetLifecycle(lifecycle Lifecycle) {
func (i *Interaction) SetLifecycle(lifecycle Lifecycle) {
i.lifecycle = lifecycle
}
func (i *interaction) SetChannel(channel ssh.Channel) {
func (i *Interaction) SetChannel(channel ssh.Channel) {
i.channel = channel
}
func (i *interaction) Stop() {
func (i *Interaction) Stop() {
if i.cancel != nil {
i.cancel()
}
@@ -242,8 +223,8 @@ func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
return m, tea.Batch(tea.ClearScreen, textinput.Blink)
case "enter":
inputValue := m.slugInput.Value()
if err := m.interaction.sessionRegistry.Update(m.interaction.lifecycle.User(), types.SessionKey{
Id: m.interaction.slug.String(),
if err := m.interaction.sessionRegistry.Update(m.interaction.lifecycle.GetUser(), types.SessionKey{
Id: m.interaction.slugManager.Get(),
Type: types.HTTP,
}, types.SessionKey{
Id: inputValue,
@@ -285,7 +266,7 @@ func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
if item.name == "slug" {
m.showingCommands = false
m.editingSlug = true
m.slugInput.SetValue(m.interaction.slug.String())
m.slugInput.SetValue(m.interaction.slugManager.Get())
m.slugInput.Focus()
return m, tea.Batch(tea.ClearScreen, textinput.Blink)
} else if item.name == "tunnel-type" {
@@ -317,7 +298,7 @@ func (m *model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
return m, nil
}
func (i *interaction) Redraw() {
func (i *Interaction) Redraw() {
if i.program != nil {
i.program.Send(tea.ClearScreen())
}
@@ -691,32 +672,22 @@ func (m *model) View() string {
MarginBottom(boxMargin).
Width(boxMaxWidth)
authenticatedUser := m.interaction.lifecycle.User()
userInfoStyle := lipgloss.NewStyle().
Foreground(lipgloss.Color("#FAFAFA")).
Bold(true)
sectionHeaderStyle := lipgloss.NewStyle().
Foreground(lipgloss.Color("#888888")).
Bold(true)
addressStyle := lipgloss.NewStyle().
Foreground(lipgloss.Color("#FAFAFA"))
urlDisplay := m.getTunnelURL()
if shouldUseCompactLayout(m.width, 80) && len(urlDisplay) > m.width-20 {
maxLen := m.width - 25
if maxLen > 10 {
urlDisplay = truncateString(urlDisplay, maxLen)
}
}
var infoContent string
if shouldUseCompactLayout(m.width, 70) {
infoContent = fmt.Sprintf("👤 %s\n\n%s\n%s",
userInfoStyle.Render(authenticatedUser),
sectionHeaderStyle.Render("🌐 FORWARDING ADDRESS:"),
addressStyle.Render(fmt.Sprintf(" %s", urlBoxStyle.Render(m.getTunnelURL()))))
infoContent = fmt.Sprintf("🌐 %s", urlBoxStyle.Render(urlDisplay))
} else if isCompact {
infoContent = fmt.Sprintf("🌐 Forwarding to:\n\n %s", urlBoxStyle.Render(urlDisplay))
} else {
infoContent = fmt.Sprintf("👤 Authenticated as: %s\n\n%s\n %s",
userInfoStyle.Render(authenticatedUser),
sectionHeaderStyle.Render("🌐 FORWARDING ADDRESS:"),
addressStyle.Render(urlBoxStyle.Render(m.getTunnelURL())))
infoContent = fmt.Sprintf("🌐 F O R W A R D I N G T O:\n\n %s", urlBoxStyle.Render(urlDisplay))
}
b.WriteString(responsiveInfoBox.Render(infoContent))
b.WriteString("\n")
@@ -767,10 +738,7 @@ func (m *model) View() string {
return b.String()
}
func (i *interaction) Start() {
if i.mode == types.HEADLESS {
return
}
func (i *Interaction) Start() {
lipgloss.SetColorProfile(termenv.TrueColor)
domain := config.Getenv("DOMAIN", "localhost")
@@ -779,8 +747,8 @@ func (i *interaction) Start() {
protocol = "https"
}
tunnelType := i.forwarder.TunnelType()
port := i.forwarder.ForwardedPort()
tunnelType := i.forwarder.GetTunnelType()
port := i.forwarder.GetForwardedPort()
items := []list.Item{
commandItem{name: "slug", desc: "Set custom subdomain"},
+40 -44
View File
@@ -15,119 +15,115 @@ import (
type Forwarder interface {
Close() error
TunnelType() types.TunnelType
ForwardedPort() uint16
GetTunnelType() types.TunnelType
GetForwardedPort() uint16
}
type SessionRegistry interface {
Remove(key types.SessionKey)
}
type lifecycle struct {
type Lifecycle struct {
status types.Status
conn ssh.Conn
channel ssh.Channel
forwarder Forwarder
sessionRegistry SessionRegistry
slug slug.Slug
slugManager slug.Manager
startedAt time.Time
user string
}
func New(conn ssh.Conn, forwarder Forwarder, slugManager slug.Slug, user string) Lifecycle {
return &lifecycle{
func NewLifecycle(conn ssh.Conn, forwarder Forwarder, slugManager slug.Manager, user string) *Lifecycle {
return &Lifecycle{
status: types.INITIALIZING,
conn: conn,
channel: nil,
forwarder: forwarder,
slug: slugManager,
slugManager: slugManager,
sessionRegistry: nil,
startedAt: time.Now(),
user: user,
}
}
func (l *lifecycle) SetSessionRegistry(registry SessionRegistry) {
func (l *Lifecycle) SetSessionRegistry(registry SessionRegistry) {
l.sessionRegistry = registry
}
type Lifecycle interface {
Connection() ssh.Conn
Channel() ssh.Channel
User() string
type SessionLifecycle interface {
Close() error
SetStatus(status types.Status)
GetConnection() ssh.Conn
GetChannel() ssh.Channel
GetUser() string
SetChannel(channel ssh.Channel)
SetSessionRegistry(registry SessionRegistry)
SetStatus(status types.Status)
IsActive() bool
StartedAt() time.Time
Close() error
}
func (l *lifecycle) User() string {
func (l *Lifecycle) GetUser() string {
return l.user
}
func (l *lifecycle) Channel() ssh.Channel {
func (l *Lifecycle) GetChannel() ssh.Channel {
return l.channel
}
func (l *lifecycle) SetChannel(channel ssh.Channel) {
func (l *Lifecycle) SetChannel(channel ssh.Channel) {
l.channel = channel
}
func (l *lifecycle) Connection() ssh.Conn {
func (l *Lifecycle) GetConnection() ssh.Conn {
return l.conn
}
func (l *lifecycle) SetStatus(status types.Status) {
func (l *Lifecycle) SetStatus(status types.Status) {
l.status = status
if status == types.RUNNING && l.startedAt.IsZero() {
l.startedAt = time.Now()
}
}
func (l *lifecycle) Close() error {
var firstErr error
tunnelType := l.forwarder.TunnelType()
if err := l.forwarder.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
firstErr = err
func (l *Lifecycle) Close() error {
err := l.forwarder.Close()
if err != nil && !errors.Is(err, net.ErrClosed) {
return err
}
if l.channel != nil {
if err := l.channel.Close(); err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, net.ErrClosed) {
if firstErr == nil {
firstErr = err
}
err := l.channel.Close()
if err != nil && !errors.Is(err, io.EOF) {
return err
}
}
if l.conn != nil {
if err := l.conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
if firstErr == nil {
firstErr = err
}
err := l.conn.Close()
if err != nil && !errors.Is(err, net.ErrClosed) {
return err
}
}
clientSlug := l.slug.String()
key := types.SessionKey{
Id: clientSlug,
Type: tunnelType,
clientSlug := l.slugManager.Get()
if clientSlug != "" && l.sessionRegistry.Remove != nil {
key := types.SessionKey{Id: clientSlug, Type: l.forwarder.GetTunnelType()}
l.sessionRegistry.Remove(key)
}
l.sessionRegistry.Remove(key)
if tunnelType == types.TCP {
if err := portUtil.Default.SetPortStatus(l.forwarder.ForwardedPort(), false); err != nil && firstErr == nil {
firstErr = err
if l.forwarder.GetTunnelType() == types.TCP {
err = portUtil.Default.SetPortStatus(l.forwarder.GetForwardedPort(), false)
if err != nil {
return err
}
}
return firstErr
return nil
}
func (l *lifecycle) IsActive() bool {
func (l *Lifecycle) IsActive() bool {
return l.status == types.RUNNING
}
func (l *lifecycle) StartedAt() time.Time {
func (l *Lifecycle) StartedAt() time.Time {
return l.startedAt
}
+16 -16
View File
@@ -9,27 +9,27 @@ import (
type Key = types.SessionKey
type Registry interface {
Get(key Key) (session Session, err error)
GetWithUser(user string, key Key) (session Session, err error)
Get(key Key) (session *SSHSession, err error)
GetWithUser(user string, key Key) (session *SSHSession, err error)
Update(user string, oldKey, newKey Key) error
Register(key Key, session Session) (success bool)
Register(key Key, session *SSHSession) (success bool)
Remove(key Key)
GetAllSessionFromUser(user string) []Session
GetAllSessionFromUser(user string) []*SSHSession
}
type registry struct {
mu sync.RWMutex
byUser map[string]map[Key]Session
byUser map[string]map[Key]*SSHSession
slugIndex map[Key]string
}
func NewRegistry() Registry {
return &registry{
byUser: make(map[string]map[Key]Session),
byUser: make(map[string]map[Key]*SSHSession),
slugIndex: make(map[Key]string),
}
}
func (r *registry) Get(key Key) (session Session, err error) {
func (r *registry) Get(key Key) (session *SSHSession, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
@@ -45,7 +45,7 @@ func (r *registry) Get(key Key) (session Session, err error) {
return client, nil
}
func (r *registry) GetWithUser(user string, key Key) (session Session, err error) {
func (r *registry) GetWithUser(user string, key Key) (session *SSHSession, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
@@ -87,17 +87,17 @@ func (r *registry) Update(user string, oldKey, newKey Key) error {
delete(r.byUser[user], oldKey)
delete(r.slugIndex, oldKey)
client.Slug().Set(newKey.Id)
client.slugManager.Set(newKey.Id)
r.slugIndex[newKey] = user
if r.byUser[user] == nil {
r.byUser[user] = make(map[Key]Session)
r.byUser[user] = make(map[Key]*SSHSession)
}
r.byUser[user][newKey] = client
return nil
}
func (r *registry) Register(key Key, session Session) (success bool) {
func (r *registry) Register(key Key, session *SSHSession) (success bool) {
r.mu.Lock()
defer r.mu.Unlock()
@@ -105,9 +105,9 @@ func (r *registry) Register(key Key, session Session) (success bool) {
return false
}
userID := session.Lifecycle().User()
userID := session.lifecycle.GetUser()
if r.byUser[userID] == nil {
r.byUser[userID] = make(map[Key]Session)
r.byUser[userID] = make(map[Key]*SSHSession)
}
r.byUser[userID][key] = session
@@ -115,16 +115,16 @@ func (r *registry) Register(key Key, session Session) (success bool) {
return true
}
func (r *registry) GetAllSessionFromUser(user string) []Session {
func (r *registry) GetAllSessionFromUser(user string) []*SSHSession {
r.mu.RLock()
defer r.mu.RUnlock()
m := r.byUser[user]
if len(m) == 0 {
return []Session{}
return []*SSHSession{}
}
sessions := make([]Session, 0, len(m))
sessions := make([]*SSHSession, 0, len(m))
for _, s := range m {
sessions = append(sessions, s)
}
+72 -112
View File
@@ -9,11 +9,65 @@ import (
"tunnel_pls/session/interaction"
"tunnel_pls/session/lifecycle"
"tunnel_pls/session/slug"
"tunnel_pls/types"
"golang.org/x/crypto/ssh"
)
type Session interface {
HandleGlobalRequest(ch <-chan *ssh.Request)
HandleTCPIPForward(req *ssh.Request)
HandleHTTPForward(req *ssh.Request, port uint16)
HandleTCPForward(req *ssh.Request, addr string, port uint16)
}
type SSHSession struct {
initialReq <-chan *ssh.Request
sshReqChannel <-chan ssh.NewChannel
lifecycle lifecycle.SessionLifecycle
interaction interaction.Controller
forwarder forwarder.ForwardingController
slugManager slug.Manager
registry Registry
}
func (s *SSHSession) GetLifecycle() lifecycle.SessionLifecycle {
return s.lifecycle
}
func (s *SSHSession) GetInteraction() interaction.Controller {
return s.interaction
}
func (s *SSHSession) GetForwarder() forwarder.ForwardingController {
return s.forwarder
}
func (s *SSHSession) GetSlugManager() slug.Manager {
return s.slugManager
}
func New(conn *ssh.ServerConn, forwardingReq <-chan *ssh.Request, sshChan <-chan ssh.NewChannel, sessionRegistry Registry, user string) *SSHSession {
slugManager := slug.NewManager()
forwarderManager := forwarder.NewForwarder(slugManager)
interactionManager := interaction.NewInteraction(slugManager, forwarderManager)
lifecycleManager := lifecycle.NewLifecycle(conn, forwarderManager, slugManager, user)
interactionManager.SetLifecycle(lifecycleManager)
forwarderManager.SetLifecycle(lifecycleManager)
interactionManager.SetSessionRegistry(sessionRegistry)
lifecycleManager.SetSessionRegistry(sessionRegistry)
return &SSHSession{
initialReq: forwardingReq,
sshReqChannel: sshChan,
lifecycle: lifecycleManager,
interaction: interactionManager,
forwarder: forwarderManager,
slugManager: slugManager,
registry: sessionRegistry,
}
}
type Detail struct {
ForwardingType string `json:"forwarding_type,omitempty"`
Slug string `json:"slug,omitempty"`
@@ -22,136 +76,42 @@ type Detail struct {
StartedAt time.Time `json:"started_at,omitempty"`
}
type Session interface {
HandleGlobalRequest(ch <-chan *ssh.Request)
HandleTCPIPForward(req *ssh.Request)
HandleHTTPForward(req *ssh.Request, port uint16)
HandleTCPForward(req *ssh.Request, addr string, port uint16)
Lifecycle() lifecycle.Lifecycle
Interaction() interaction.Interaction
Forwarder() forwarder.Forwarder
Slug() slug.Slug
Detail() *Detail
Start() error
}
type session struct {
initialReq <-chan *ssh.Request
sshChan <-chan ssh.NewChannel
lifecycle lifecycle.Lifecycle
interaction interaction.Interaction
forwarder forwarder.Forwarder
slug slug.Slug
registry Registry
}
func New(conn *ssh.ServerConn, initialReq <-chan *ssh.Request, sshChan <-chan ssh.NewChannel, sessionRegistry Registry, user string) Session {
slugManager := slug.New()
forwarderManager := forwarder.New(slugManager)
interactionManager := interaction.New(slugManager, forwarderManager)
lifecycleManager := lifecycle.New(conn, forwarderManager, slugManager, user)
interactionManager.SetLifecycle(lifecycleManager)
forwarderManager.SetLifecycle(lifecycleManager)
interactionManager.SetSessionRegistry(sessionRegistry)
lifecycleManager.SetSessionRegistry(sessionRegistry)
return &session{
initialReq: initialReq,
sshChan: sshChan,
lifecycle: lifecycleManager,
interaction: interactionManager,
forwarder: forwarderManager,
slug: slugManager,
registry: sessionRegistry,
}
}
func (s *session) Lifecycle() lifecycle.Lifecycle {
return s.lifecycle
}
func (s *session) Interaction() interaction.Interaction {
return s.interaction
}
func (s *session) Forwarder() forwarder.Forwarder {
return s.forwarder
}
func (s *session) Slug() slug.Slug {
return s.slug
}
func (s *session) Detail() *Detail {
var tunnelType string
if s.forwarder.TunnelType() == types.HTTP {
tunnelType = "HTTP"
} else if s.forwarder.TunnelType() == types.TCP {
tunnelType = "TCP"
} else {
tunnelType = "UNKNOWN"
}
return &Detail{
ForwardingType: tunnelType,
Slug: s.slug.String(),
UserID: s.lifecycle.User(),
func (s *SSHSession) Detail() Detail {
return Detail{
ForwardingType: string(s.forwarder.GetTunnelType()),
Slug: s.slugManager.Get(),
UserID: s.lifecycle.GetUser(),
Active: s.lifecycle.IsActive(),
StartedAt: s.lifecycle.StartedAt(),
}
}
func (s *session) Start() error {
var channel ssh.NewChannel
var ok bool
select {
case channel, ok = <-s.sshChan:
if !ok {
log.Println("Forwarding request channel closed")
return nil
}
ch, reqs, err := channel.Accept()
if err != nil {
log.Printf("failed to accept channel: %v", err)
return err
}
go s.HandleGlobalRequest(reqs)
s.lifecycle.SetChannel(ch)
s.interaction.SetChannel(ch)
s.interaction.SetMode(types.INTERACTIVE)
case <-time.After(500 * time.Millisecond):
s.interaction.SetMode(types.HEADLESS)
func (s *SSHSession) Start() error {
channel := <-s.sshReqChannel
ch, reqs, err := channel.Accept()
if err != nil {
log.Printf("failed to accept channel: %v", err)
return err
}
go s.HandleGlobalRequest(reqs)
tcpipReq := s.waitForTCPIPForward()
if tcpipReq == nil {
err := s.interaction.Send(fmt.Sprintf("Port forwarding request not received. Ensure you ran the correct command with -R flag. Example: ssh %s -p %s -R 80:localhost:3000", config.Getenv("DOMAIN", "localhost"), config.Getenv("PORT", "2200")))
_, err := ch.Write([]byte(fmt.Sprintf("Port forwarding request not received. Ensure you ran the correct command with -R flag. Example: ssh %s -p %s -R 80:localhost:3000", config.Getenv("DOMAIN", "localhost"), config.Getenv("PORT", "2200"))))
if err != nil {
return err
}
if err = s.lifecycle.Close(); err != nil {
if err := s.lifecycle.Close(); err != nil {
log.Printf("failed to close session: %v", err)
}
return fmt.Errorf("no forwarding Request")
}
if (s.interaction.Mode() == types.HEADLESS && config.Getenv("MODE", "standalone") == "standalone") && s.lifecycle.User() == "UNAUTHORIZED" {
if err := tcpipReq.Reply(false, nil); err != nil {
log.Printf("cannot reply to tcpip req: %s\n", err)
return err
}
if err := s.lifecycle.Close(); err != nil {
log.Printf("failed to close session: %v", err)
return err
}
return nil
}
s.lifecycle.SetChannel(ch)
s.interaction.SetChannel(ch)
s.HandleTCPIPForward(tcpipReq)
s.interaction.Start()
s.lifecycle.Connection().Wait()
if err := s.lifecycle.Close(); err != nil {
log.Printf("failed to close session: %v", err)
return err
@@ -159,7 +119,7 @@ func (s *session) Start() error {
return nil
}
func (s *session) waitForTCPIPForward() *ssh.Request {
func (s *SSHSession) waitForTCPIPForward() *ssh.Request {
select {
case req, ok := <-s.initialReq:
if !ok {
+7 -7
View File
@@ -1,24 +1,24 @@
package slug
type Slug interface {
String() string
type Manager interface {
Get() string
Set(slug string)
}
type slug struct {
type manager struct {
slug string
}
func New() Slug {
return &slug{
func NewManager() Manager {
return &manager{
slug: "",
}
}
func (s *slug) String() string {
func (s *manager) Get() string {
return s.slug
}
func (s *slug) Set(slug string) {
func (s *manager) Set(slug string) {
s.slug = slug
}
+7 -14
View File
@@ -1,25 +1,18 @@
package types
type Status int
type Status string
const (
INITIALIZING Status = iota
RUNNING
INITIALIZING Status = "INITIALIZING"
RUNNING Status = "RUNNING"
SETUP Status = "SETUP"
)
type Mode int
type TunnelType string
const (
INTERACTIVE Mode = iota
HEADLESS
)
type TunnelType int
const (
UNKNOWN TunnelType = iota
HTTP
TCP
HTTP TunnelType = "HTTP"
TCP TunnelType = "TCP"
)
type SessionKey struct {