mirror of
https://github.com/harness/drone.git
synced 2025-05-04 07:29:21 +08:00

* feat: [CDE-96]: Renaming reusable scanner as scanner doesn't support resetting the reader. Removing closec as return param in livelog.Tail method. Closing the handler channel when the stream is closed; this ensures the consumer knows the channel is closed. Adding a check in the render.StreamSSE to close the stream when the events channel is closed and drained. * feat: [CDE-127]: Changes to enable log stream API for gitspaces. Adding close channel to the logstream Tail method. Changes to use a default host and working dir from config for a running gitspace instance's URL. Change to append gitspaces to the bind mount of the gitspace container in the container orchestrator instead of in the config provider. * Initial commit * feat: [CDE-127]: Removing streams map from StatefulLogger. Flushing the stream every time the invoking function is closed. * feat: [CDE-127]: Removing streams map from StatefulLogger. Flushing the stream every time the invoking function is closed. * feat: [CDE-127]: Adding flush stream logic to delete gitspace in orchestrator. Linting. * feat: [CDE-127]: Addressing review comments. * feat: [CDE-127]: Adding changes to make logutil.StatefulLogger a wrapper on livelog.LogStream and adding initialisation and flush functions. * feat: [CDE-127]: Adding changes to make logutil.StatefulLogger a wrapper on livelog.LogStream and adding initialisation and flush functions. * feat: [CDE-127]: Adding changes to include logging in container orchestration's start flow.
175 lines
4.0 KiB
Go
175 lines
4.0 KiB
Go
// Copyright 2023 Harness, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package render
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/harness/gitness/app/api/usererror"
|
|
"github.com/harness/gitness/app/sse"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
func StreamSSE(
|
|
ctx context.Context,
|
|
w http.ResponseWriter,
|
|
chStop <-chan struct{},
|
|
chEvents <-chan *sse.Event,
|
|
chErr <-chan error,
|
|
) {
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
UserError(ctx, w, usererror.ErrResponseNotFlushable)
|
|
log.Ctx(ctx).Warn().Err(usererror.ErrResponseNotFlushable).Msg("failed to build SSE stream")
|
|
return
|
|
}
|
|
|
|
h := w.Header()
|
|
h.Set("Content-Type", "text/event-stream")
|
|
h.Set("Cache-Control", "no-cache")
|
|
h.Set("Connection", "keep-alive")
|
|
h.Set("X-Accel-Buffering", "no")
|
|
h.Set("Access-Control-Allow-Origin", "*")
|
|
|
|
enc := json.NewEncoder(w)
|
|
enc.SetEscapeHTML(false)
|
|
|
|
stream := sseStream{
|
|
enc: enc,
|
|
writer: w,
|
|
flusher: flusher,
|
|
}
|
|
|
|
const (
|
|
pingInterval = 30 * time.Second
|
|
tailMaxTime = 2 * time.Hour
|
|
)
|
|
|
|
ctx, ctxCancel := context.WithTimeout(ctx, tailMaxTime)
|
|
defer ctxCancel()
|
|
|
|
if err := stream.ping(); err != nil {
|
|
log.Ctx(ctx).Err(err).Msg("failed to send initial ping")
|
|
return
|
|
}
|
|
|
|
defer func() {
|
|
if err := stream.close(); err != nil {
|
|
log.Ctx(ctx).Err(err).Msg("failed to close SSE stream")
|
|
}
|
|
}()
|
|
|
|
pingTimer := time.NewTimer(pingInterval)
|
|
defer pingTimer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Ctx(ctx).Debug().Err(ctx.Err()).Msg("stream SSE request context done")
|
|
return
|
|
|
|
case <-chStop:
|
|
log.Ctx(ctx).Debug().Msg("app shutdown")
|
|
return
|
|
|
|
case err := <-chErr:
|
|
log.Ctx(ctx).Debug().Err(err).Msg("received error from SSE stream")
|
|
return
|
|
|
|
case <-pingTimer.C:
|
|
if err := stream.ping(); err != nil {
|
|
log.Ctx(ctx).Err(err).Msg("failed to send SSE ping")
|
|
return
|
|
}
|
|
|
|
case event, canProduce := <-chEvents:
|
|
if !canProduce {
|
|
log.Ctx(ctx).Debug().Msg("events channel is drained and closed.")
|
|
return
|
|
}
|
|
if err := stream.event(event); err != nil {
|
|
log.Ctx(ctx).Err(err).Msgf("failed to send SSE event: %s", event.Type)
|
|
return
|
|
}
|
|
}
|
|
|
|
pingTimer.Stop() // stop timer
|
|
|
|
select {
|
|
case <-pingTimer.C: // drain channel
|
|
default:
|
|
}
|
|
|
|
pingTimer.Reset(pingInterval) // reset timer
|
|
}
|
|
}
|
|
|
|
// sseStream bundles what is needed to write server-sent event messages
// to a client.
type sseStream struct {
	// enc JSON-encodes event payloads; StreamSSE builds it on top of the
	// same writer that is stored in the writer field.
	enc *json.Encoder
	// writer receives the raw SSE framing (event/data lines, pings, EOF).
	writer io.Writer
	// flusher is flushed after every complete message.
	flusher http.Flusher
}
|
|
|
|
func (r sseStream) event(event *sse.Event) error {
|
|
_, err := io.WriteString(r.writer, fmt.Sprintf("event: %s\n", event.Type))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send event header: %w", err)
|
|
}
|
|
|
|
_, err = io.WriteString(r.writer, "data: ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send data header: %w", err)
|
|
}
|
|
|
|
err = r.enc.Encode(event.Data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send data: %w", err)
|
|
}
|
|
|
|
// NOTE: enc.Encode is ending the data with a new line, only add one more
|
|
// Source: https://cs.opensource.google/go/go/+/refs/tags/go1.21.1:src/encoding/json/stream.go;l=220
|
|
_, err = r.writer.Write([]byte{'\n'})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send end of message: %w", err)
|
|
}
|
|
|
|
r.flusher.Flush()
|
|
return nil
|
|
}
|
|
|
|
func (r sseStream) close() error {
|
|
_, err := io.WriteString(r.writer, "event: error\ndata: eof\n\n")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send EOF: %w", err)
|
|
}
|
|
r.flusher.Flush()
|
|
return nil
|
|
}
|
|
|
|
func (r sseStream) ping() error {
|
|
_, err := io.WriteString(r.writer, ": ping\n\n")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send ping: %w", err)
|
|
}
|
|
r.flusher.Flush()
|
|
return nil
|
|
}
|