From 26408533e11f1b260b455c75ad7fbb636cc7c580 Mon Sep 17 00:00:00 2001 From: Ansuman Satapathy Date: Tue, 3 Dec 2024 13:06:59 +0000 Subject: [PATCH] feat: [CDE-472]: handle stream closing gracefully, and simplify sync cmd execution (#3102) * feat: [CDE-472]: handle stream closing gracefully, and simplify sync cmd execution * feat: [CDE-472]: handle stream closing gracefully, and simplify sync cmd execution --- .../orchestrator/devcontainer/exec.go | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/app/gitspace/orchestrator/devcontainer/exec.go b/app/gitspace/orchestrator/devcontainer/exec.go index d9fe5579a..efac44e36 100644 --- a/app/gitspace/orchestrator/devcontainer/exec.go +++ b/app/gitspace/orchestrator/devcontainer/exec.go @@ -60,39 +60,34 @@ func (e *Exec) ExecuteCommand( ) (string, error) { containerExecCreate, err := e.createExecution(ctx, command, root, workingDir, false) if err != nil { - return "", err + return "", fmt.Errorf("failed to create exec instance: %w", err) } - // Attach and inspect exec session to get the output - inspectExec, err := e.attachAndInspectExec(ctx, containerExecCreate.ID, false) - if err != nil { - return "", fmt.Errorf("failed to start docker exec for container %s: %w", e.ContainerName, err) - } - var stdoutBuf bytes.Buffer - var stderrBuf bytes.Buffer - stdoutData, err := io.ReadAll(inspectExec.StdOut) + resp, err := e.DockerClient.ContainerExecAttach( + ctx, containerExecCreate.ID, container.ExecStartOptions{Detach: false}) if err != nil { - return "", fmt.Errorf("error reading stdout: %w", err) + return "", fmt.Errorf("failed to attach to exec session: %w", err) } - stdoutBuf.Write(stdoutData) - stderrData, err := io.ReadAll(inspectExec.StdErr) + defer resp.Close() + + // Prepare buffers for stdout and stderr + var stdoutBuf, stderrBuf bytes.Buffer + + // Use stdcopy to demultiplex output + _, err = stdcopy.StdCopy(&stdoutBuf, &stderrBuf, resp.Reader) if err != nil { - return "", fmt.Errorf("error reading stderr: %w", err) + return "", fmt.Errorf("error during stdcopy: %w", err) } - stderrBuf.Write(stderrData) inspect, err := e.DockerClient.ContainerExecInspect(ctx, containerExecCreate.ID) if err != nil { return "", fmt.Errorf("failed to inspect exec session: %w", err) } - - // If the exit code is non-zero, return both stdout and stderr + // Handle non-zero exit codes if inspect.ExitCode != 0 { - // Combine stdout and stderr return fmt.Sprintf( - "STDOUT:\n%s\nSTDERR:\n%s", stdoutBuf.String(), stderrBuf.String()), - fmt.Errorf("command exited with non-zero status: %d", inspect.ExitCode) + "STDOUT:\n%s\nSTDERR:\n%s", stdoutBuf.String(), stderrBuf.String(), + ), fmt.Errorf("command exited with non-zero status: %d", inspect.ExitCode) } - // If the exit code is zero, only return stdout return stdoutBuf.String(), nil } @@ -179,7 +174,7 @@ func (e *Exec) attachAndInspectExec(ctx context.Context, id string, detach bool) stdoutPipe, stdoutWriter := io.Pipe() stderrPipe, stderrWriter := io.Pipe() - go e.copyOutput(resp.Reader, stdoutWriter, stderrWriter) + go e.copyOutput(resp, stdoutWriter, stderrWriter) // Return the output streams and the response return &execResult{ @@ -224,12 +219,17 @@ func (e *Exec) streamResponse(resp *execResult, outputCh chan []byte) { }() } -func (e *Exec) copyOutput(reader io.Reader, stdoutWriter, stderrWriter io.WriteCloser) { +func (e *Exec) copyOutput(response dockerTypes.HijackedResponse, stdoutWriter, stderrWriter io.WriteCloser) { defer func() { - stdoutWriter.Close() - stderrWriter.Close() + if err := stdoutWriter.Close(); err != nil { + log.Error().Err(err).Msg("Error closing stdoutWriter") + } + if err := stderrWriter.Close(); err != nil { + log.Error().Err(err).Msg("Error closing stderrWriter") + } + response.Close() }() - _, err := stdcopy.StdCopy(stdoutWriter, stderrWriter, reader) + _, err := stdcopy.StdCopy(stdoutWriter, stderrWriter, response.Reader) if err != nil { log.Error().Err(err).Msg("Error in stdcopy.StdCopy " + err.Error()) }