test(stream): add unit tests for stream behavior
- Fix duplicating EOF error when closing SSH connection - Add new SessionStatusCLOSED type
This commit is contained in:
@@ -2,6 +2,9 @@ package lifecycle
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
portUtil "tunnel_pls/internal/port"
|
||||
@@ -22,7 +25,9 @@ type SessionRegistry interface {
|
||||
}
|
||||
|
||||
type lifecycle struct {
|
||||
mu sync.Mutex
|
||||
status types.SessionStatus
|
||||
closeErr error
|
||||
conn ssh.Conn
|
||||
channel ssh.Channel
|
||||
forwarder Forwarder
|
||||
@@ -73,29 +78,41 @@ func (l *lifecycle) Connection() ssh.Conn {
|
||||
return l.conn
|
||||
}
|
||||
func (l *lifecycle) SetStatus(status types.SessionStatus) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.status = status
|
||||
if status == types.SessionStatusRUNNING && l.startedAt.IsZero() {
|
||||
l.startedAt = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func closeIfNotNil(c interface{ Close() error }) error {
|
||||
if c != nil {
|
||||
return c.Close()
|
||||
}
|
||||
return nil
|
||||
func (l *lifecycle) IsActive() bool {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return l.status == types.SessionStatusRUNNING
|
||||
}
|
||||
|
||||
func (l *lifecycle) Close() error {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if l.status == types.SessionStatusCLOSED {
|
||||
return l.closeErr
|
||||
}
|
||||
l.status = types.SessionStatusCLOSED
|
||||
|
||||
var errs []error
|
||||
tunnelType := l.forwarder.TunnelType()
|
||||
|
||||
if err := closeIfNotNil(l.channel); err != nil {
|
||||
errs = append(errs, err)
|
||||
if l.channel != nil {
|
||||
if err := l.channel.Close(); err != nil && !isClosedError(err) {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := closeIfNotNil(l.conn); err != nil {
|
||||
errs = append(errs, err)
|
||||
if l.conn != nil {
|
||||
if err := l.conn.Close(); err != nil && !isClosedError(err) {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
clientSlug := l.slug.String()
|
||||
@@ -114,11 +131,15 @@ func (l *lifecycle) Close() error {
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Join(errs...)
|
||||
l.closeErr = errors.Join(errs...)
|
||||
return l.closeErr
|
||||
}
|
||||
|
||||
func (l *lifecycle) IsActive() bool {
|
||||
return l.status == types.SessionStatusRUNNING
|
||||
func isClosedError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) || err.Error() == "EOF"
|
||||
}
|
||||
|
||||
func (l *lifecycle) StartedAt() time.Time {
|
||||
|
||||
Reference in New Issue
Block a user