Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 106 additions & 68 deletions ssh/server/channels/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package channels

import (
"bytes"
"io"
"sync"

Expand All @@ -14,20 +13,88 @@ import (
gossh "golang.org/x/crypto/ssh"
)

// pipe pipes data between client and agent, and vise versa, recoding each frame when ShellHub instance are Cloud or
// Enterprise.
func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, agent gossh.Channel) {
defer func() {
ctx.Lock()
sess.Handled = false
ctx.Unlock()
type Recorder struct {
queue chan string
channel gossh.Channel
}

func NewRecorder(channel gossh.Channel, sess *session.Session, camera *session.Camera) (io.WriteCloser, error) {
// NOTE: The queue's size is a random number.
queue := make(chan string, 100)

go func() {
for {
msg, ok := <-queue
if !ok {
log.WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("recorder queue is closed")

return
}

if err := camera.WriteFrame(&models.SessionRecorded{ //nolint:errcheck
UID: sess.UID,
Namespace: sess.Lookup["domain"],
Message: msg,
Width: int(sess.Pty.Columns),
Height: int(sess.Pty.Rows),
}); err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to send the session frame to record")

// NOTE: When a frame isn't sent correctly, we stop the writing loop, only reading from the queue,
// and discarding the messages to avoid stuck the go routine.
break
}
}

for {
// NOTE: Reads the queue and discards the data to avoid stuck the go routine.
if _, ok := <-queue; !ok {
log.WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("recorder queue is closed")

return
}
}
}()

// NOTICE: avoid multiple pipe data in same channel due to protocol limitaion.
ctx.Lock()
sess.Handled = true
ctx.Unlock()
return &Recorder{
queue: queue,
channel: channel,
}, nil
}

// record enqueues a session frame to be recorded. If the queue is closed, nothing is done.
func (c *Recorder) record(msg string) {
select {
case c.queue <- msg:
default:
log.Trace("the message couldn't sent to the record queue")
}
}

func (c *Recorder) Write(data []byte) (int, error) {
read, err := c.channel.Write(data)
if err != nil {
return read, err
}

c.record(string(data))

return read, nil
}

func (c *Recorder) Close() error {
close(c.queue)

return c.channel.CloseWrite()
}

// pipe pipes data between client and agent, and vice versa, recording each frame when ShellHub instance are Cloud or
// Enterprise.
func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, agent gossh.Channel) {
defer log.
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Trace("data pipe between client and agent has done")
Expand All @@ -40,76 +107,47 @@ func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, ag

go func() {
defer wg.Done()
defer client.CloseWrite() //nolint:errcheck

// NOTE: As the copy required to record the session seem to be inefficient, if we don't have a record URL
// defined, we use an [io.Copy] for the data piping between agent and client.
recordURL := ctx.Value("RECORD_URL").(string)
if (envs.IsEnterprise() || envs.IsCloud()) && recordURL != "" {
// NOTE: Recoding variable is used to control if the frames will be recorded. If something wrong happens in
// this process, to spare resources, we don't send frames anymore for this session.
recording := true
if envs.IsEnterprise() || envs.IsCloud() {
recordURL := ctx.Value("RECORD_URL").(string)
if recordURL == "" {
log.WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID, "record_url": recordURL}).
Warning("failed to start session's record because the record URL is empty")

goto normal
}

camera, err := sess.Record(ctx, recordURL)
if err != nil {
goto normal
}

recorder, err := NewRecorder(client, sess, camera)
if err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID, "record_url": recordURL}).
Warning("failed to connect to session record endpoint")

recording = false
goto normal
}

defer camera.Close()
defer recorder.Close() //nolint:errcheck

buffer := make([]byte, 1024)
for {
read, err := a.Read(buffer)
// The occurrence of io.EOF is expected when the connection ends.
// This indicates that we have reached the end of the input stream, and we need
// to break out of the loop to handle the termination of the connection
if err == io.EOF {
break
}
// Unlike io.EOF, when 'err' is simply not nil, it signifies an unexpected error,
// and we need to log to handle it appropriately.
if err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to read from stdout in pty client")

break
}
if _, err := io.Copy(recorder, a); err != nil && err != io.EOF {
log.WithError(err).Error("failed on coping data from client to agent")
}

if _, err = io.Copy(client, bytes.NewReader(buffer[:read])); err != nil && err != io.EOF {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to copy from stdout in pty client")
return
}

break
}
// NOTE: "normal" labels indicate the default way of copying data between clients and the agent without recording.
// Their idea was, if something goes wrong with the recording flow, the session will continue, even without the
// recording.
normal:
defer client.CloseWrite() //nolint:errcheck

if recording {
if err := camera.WriteFrame(&models.SessionRecorded{ //nolint:errcheck
UID: sess.UID,
Namespace: sess.Lookup["domain"],
Message: string(buffer[:read]),
Width: int(sess.Pty.Columns),
Height: int(sess.Pty.Rows),
}); err != nil {
log.WithError(err).
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
Warning("failed to send the session frame to record")

recording = false

continue
}
}
}
} else {
if _, err := io.Copy(client, a); err != nil && err != io.EOF {
log.WithError(err).Error("failed on coping data from client to agent")
}
if _, err := io.Copy(client, a); err != nil && err != io.EOF {
log.WithError(err).Error("failed on coping data from client to agent")
}

log.Trace("agent channel data copy done")
Expand Down
Loading