diff --git a/.drone.yml b/.drone.yml index b00d04a34..2c364fc92 100644 --- a/.drone.yml +++ b/.drone.yml @@ -3,7 +3,7 @@ workspace: path: src/github.com/drone/drone pipeline: - backend: + test: image: golang:1.6 environment: - GO15VENDOREXPERIMENT=1 @@ -23,7 +23,7 @@ pipeline: event: push publish: - image: s3 + image: plugins/s3 acl: public-read bucket: downloads.drone.io source: release/**/*.* @@ -32,9 +32,9 @@ pipeline: branch: master docker: + image: plugins/docker repo: drone/drone - tag: [ "0.5.0", "0.5" ] - storage_driver: overlay + tag: [ "0.5", "0.5.0", "0.5.0-rc" ] when: branch: master event: push diff --git a/.drone.yml.sig b/.drone.yml.sig index 4f1196374..42b618e41 100644 --- a/.drone.yml.sig +++ b/.drone.yml.sig @@ -1 +1 @@ -eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICBiYWNrZW5kOgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBzMwogICAgYWNsOiBwdWJsaWMtcmVhZAogICAgYnVja2V0OiBkb3dubG9hZHMuZHJvbmUuaW8KICAgIHNvdXJjZTogcmVsZWFzZS8qKi8qLioKICAgIHdoZW46CiAgICAgIGV2ZW50OiBwdXNoCiAgICAgIGJyYW5jaDogbWFzdGVyCgogIGRvY2tlcjoKICAgIHJlcG86IGRyb25lL2Ryb25lCiAgICB0YWc6IFsgIjAuNS4wIiwgIjAuNSIgXQogICAgc3RvcmFnZV9kcml2ZXI6IG92ZXJsYXkKICAgIHdoZW46CiAgICAgIGJyYW5jaDogbWFzdGVyCiAgICAgIGV2ZW50OiBwdXNoCgpzZXJ2aWNlczoKICBwb3N0Z3JlczoKICAgIGltYWdlOiBwb3N0Z3Jlczo5LjQuNQogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gUE9TVEdSRVNfVVNFUj1wb3N0Z3JlcwogIG15c3FsOgogICAgaW1hZ2U6IG15c3FsOjUuNi4yNwogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gTVlTUUxfREFUQUJBU0U9dGVzdAogICAgICAtIE1ZU1FMX0FMTE9XX0VNUFRZX1BBU1NXT1JEPXllcwo.kQIwqIgs7PnoKIGmzJ6hlbWTbV5zK0w4HVWsux79P3s \ No newline at end of file +eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICB0ZXN0OgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBwbHVnaW5zL3MzCiAgICBhY2w6IHB1YmxpYy1yZWFkCiAgICBidWNrZXQ6IGRvd25sb2Fkcy5kcm9uZS5pbwogICAgc291cmNlOiByZWxlYXNlLyoqLyouKgogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKICAgICAgYnJhbmNoOiBtYXN0ZXIKCiAgZG9ja2VyOgogICAgaW1hZ2U6IHBsdWdpbnMvZG9ja2VyCiAgICByZXBvOiBkcm9uZS9kcm9uZQogICAgdGFnOiBbICIwLjUiLCAiMC41LjAiLCAiMC41LjAtcmMiIF0KICAgIHdoZW46CiAgICAgIGJyYW5jaDogbWFzdGVyCiAgICAgIGV2ZW50OiBwdXNoCgpzZXJ2aWNlczoKICBwb3N0Z3JlczoKICAgIGltYWdlOiBwb3N0Z3Jlczo5LjQuNQogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gUE9TVEdSRVNfVVNFUj1wb3N0Z3JlcwogIG15c3FsOgogICAgaW1hZ2U6IG15c3FsOjUuNi4yNwogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gTVlTUUxfREFUQUJBU0U9dGVzdAogICAgICAtIE1ZU1FMX0FMTE9XX0VNUFRZX1BBU1NXT1JEPXllcwo.0lD1m6yILbU8ZrSJcZv7Y1CcGEG5zIaJma1C1lUTc7o \ No newline at end of file diff --git a/Makefile b/Makefile index 03a8807b0..f40359b2f 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,8 @@ deps_backend: go get -u golang.org/x/tools/cmd/cover go get -u github.com/jteeuwen/go-bindata/... go get -u github.com/elazarl/go-bindata-assetfs/... + go get -u github.com/drone/mq/... + go get -u github.com/tidwall/redlog gen: gen_template gen_migrations diff --git a/agent/agent.go b/agent/agent.go index afe0e14df..74e364c17 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -11,7 +11,6 @@ import ( "github.com/drone/drone/build" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/version" "github.com/drone/drone/yaml" "github.com/drone/drone/yaml/expander" @@ -48,7 +47,7 @@ func (a *Agent) Poll() error { return nil } -func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error { +func (a *Agent) Run(payload *model.Work, cancel <-chan bool) error { payload.Job.Status = model.StatusRunning payload.Job.Started = time.Now().Unix() @@ -90,7 +89,7 @@ func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error { return err } -func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { +func (a *Agent) prep(w *model.Work) (*yaml.Config, error) { envs := toEnv(w) w.Yaml = expander.ExpandString(w.Yaml, envs) @@ -155,8 +154,6 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { transform.CommandTransform(conf) transform.ImagePull(conf, a.Pull) transform.ImageTag(conf) - transform.ImageName(conf) - transform.ImageNamespace(conf, a.Namespace) if err := transform.ImageEscalate(conf, a.Escalate); err != nil { return nil, err } @@ -172,7 +169,7 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { return conf, nil } -func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) error { +func (a *Agent) exec(spec *yaml.Config, payload *model.Work, cancel <-chan bool) error { conf := build.Config{ Engine: a.Engine, @@ -231,7 +228,7 @@ func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) } } -func toEnv(w *queue.Work) map[string]string { +func toEnv(w *model.Work) map[string]string { envs := map[string]string{ "CI": "drone", "DRONE": "true", diff --git a/agent/updater.go b/agent/updater.go index 15922e0e2..90fc999f7 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -1,25 +1,22 @@ package agent import ( - "encoding/json" "fmt" - "io" - "sync" - "time" "github.com/Sirupsen/logrus" "github.com/drone/drone/build" - "github.com/drone/drone/client" - "github.com/drone/drone/queue" + "github.com/drone/drone/model" + "github.com/drone/mq/logger" + "github.com/drone/mq/stomp" ) // UpdateFunc handles buid pipeline status updates. -type UpdateFunc func(*queue.Work) +type UpdateFunc func(*model.Work) // LoggerFunc handles buid pipeline logging updates. type LoggerFunc func(*build.Line) -var NoopUpdateFunc = func(*queue.Work) {} +var NoopUpdateFunc = func(*model.Work) {} var TermLoggerFunc = func(line *build.Line) { fmt.Println(line) @@ -27,65 +24,44 @@ var TermLoggerFunc = func(line *build.Line) { // NewClientUpdater returns an updater that sends updated build details // to the drone server. -func NewClientUpdater(client client.Client) UpdateFunc { - return func(w *queue.Work) { - for { - err := client.Push(w) - if err == nil { - return - } - logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s", +func NewClientUpdater(client *stomp.Client) UpdateFunc { + return func(w *model.Work) { + err := client.SendJSON("/queue/updates", w) + if err != nil { + logger.Warningf("Error updating %s/%s#%d.%d. %s", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) - logrus.Infof("Retry update in 30s") - time.Sleep(time.Second * 30) + } + if w.Job.Status != model.StatusRunning { + var dest = fmt.Sprintf("/topic/logs.%d", w.Job.ID) + var opts = []stomp.MessageOption{ + stomp.WithHeader("eof", "true"), + stomp.WithRetain("all"), + } + + if err := client.Send(dest, []byte("eof"), opts...); err != nil { + logger.Warningf("Error sending eof %s/%s#%d.%d. %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) + } } } } -func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc { - var err error - var size int64 - return func(line *build.Line) { +func NewClientLogger(client *stomp.Client, id int64, limit int64) LoggerFunc { + var size int64 + var dest = fmt.Sprintf("/topic/logs.%d", id) + var opts = []stomp.MessageOption{ + stomp.WithRetain("all"), + } + + return func(line *build.Line) { if size > limit { return } - - // TODO remove this double-serialization - linejson, _ := json.Marshal(line) - w.Write(linejson) - w.Write([]byte{'\n'}) - - if err = stream.WriteJSON(line); err != nil { + if err := client.SendJSON(dest, line, opts...); err != nil { logrus.Errorf("Error streaming build logs. %s", err) } size += int64(len(line.Out)) } } - -func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc { - var once sync.Once - var size int64 - return func(line *build.Line) { - // annoying hack to only start streaming once the first line is written - once.Do(func() { - go func() { - err := client.Stream(id, rc) - if err != nil && err != io.ErrClosedPipe { - logrus.Errorf("Error streaming build logs. %s", err) - } - }() - }) - - if size > limit { - return - } - - linejson, _ := json.Marshal(line) - wc.Write(linejson) - wc.Write([]byte{'\n'}) - - size += int64(len(line.Out)) - } -} diff --git a/build/pipeline.go b/build/pipeline.go index ddeca757b..7b83d1f46 100644 --- a/build/pipeline.go +++ b/build/pipeline.go @@ -175,7 +175,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { } p.containers = append(p.containers, name) - logrus.Debugf("wait.add(1) for %s logs", name) p.wait.Add(1) go func() { defer func() { @@ -183,7 +182,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { logrus.Errorln("recover writing build output", r) } - logrus.Debugf("wait.done() for %s logs", name) p.wait.Done() }() @@ -217,7 +215,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { return err } - logrus.Debugf("wait.add(1) for %s exit code", name) p.wait.Add(1) go func() { defer func() { @@ -225,7 +222,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { logrus.Errorln("recover writing exit code to output", r) } p.wait.Done() - logrus.Debugf("wait.done() for %s exit code", name) }() p.pipe <- &Line{ diff --git a/bus/bus.go b/bus/bus.go deleted file mode 100644 index 7bb39c534..000000000 --- a/bus/bus.go +++ /dev/null @@ -1,40 +0,0 @@ -package bus - -//go:generate mockery -name Bus -output mock -case=underscore - -import "golang.org/x/net/context" - -// Bus represents an event bus implementation that -// allows a publisher to broadcast Event notifications -// to a list of subscribers. -type Bus interface { - // Publish broadcasts an event to all subscribers. - Publish(*Event) - - // Subscribe adds the channel to the list of - // subscribers. Each subscriber in the list will - // receive broadcast events. - Subscribe(chan *Event) - - // Unsubscribe removes the channel from the list - // of subscribers. - Unsubscribe(chan *Event) -} - -// Publish broadcasts an event to all subscribers. -func Publish(c context.Context, event *Event) { - FromContext(c).Publish(event) -} - -// Subscribe adds the channel to the list of -// subscribers. Each subscriber in the list will -// receive broadcast events. -func Subscribe(c context.Context, eventc chan *Event) { - FromContext(c).Subscribe(eventc) -} - -// Unsubscribe removes the channel from the -// list of subscribers. -func Unsubscribe(c context.Context, eventc chan *Event) { - FromContext(c).Unsubscribe(eventc) -} diff --git a/bus/bus_impl.go b/bus/bus_impl.go deleted file mode 100644 index d0f0e6a64..000000000 --- a/bus/bus_impl.go +++ /dev/null @@ -1,46 +0,0 @@ -package bus - -import ( - "sync" -) - -type eventbus struct { - sync.Mutex - subs map[chan *Event]bool -} - -// New creates a simple event bus that manages a list of -// subscribers to which events are published. -func New() Bus { - return newEventbus() -} - -func newEventbus() *eventbus { - return &eventbus{ - subs: make(map[chan *Event]bool), - } -} - -func (b *eventbus) Subscribe(c chan *Event) { - b.Lock() - b.subs[c] = true - b.Unlock() -} - -func (b *eventbus) Unsubscribe(c chan *Event) { - b.Lock() - delete(b.subs, c) - b.Unlock() -} - -func (b *eventbus) Publish(event *Event) { - b.Lock() - defer b.Unlock() - - for s := range b.subs { - go func(c chan *Event) { - defer recover() - c <- event - }(s) - } -} diff --git a/bus/bus_impl_test.go b/bus/bus_impl_test.go deleted file mode 100644 index ffcb1e563..000000000 --- a/bus/bus_impl_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package bus - -import ( - "sync" - "testing" - - "github.com/drone/drone/model" - . "github.com/franela/goblin" - "github.com/gin-gonic/gin" -) - -func TestBus(t *testing.T) { - g := Goblin(t) - g.Describe("Event bus", func() { - - g.It("Should unsubscribe", func() { - c := new(gin.Context) - b := newEventbus() - ToContext(c, b) - - c1 := make(chan *Event) - c2 := make(chan *Event) - Subscribe(c, c1) - Subscribe(c, c2) - - g.Assert(len(b.subs)).Equal(2) - }) - - g.It("Should subscribe", func() { - c := new(gin.Context) - b := newEventbus() - ToContext(c, b) - - c1 := make(chan *Event) - c2 := make(chan *Event) - Subscribe(c, c1) - Subscribe(c, c2) - - g.Assert(len(b.subs)).Equal(2) - - Unsubscribe(c, c1) - Unsubscribe(c, c2) - - g.Assert(len(b.subs)).Equal(0) - }) - - g.It("Should publish", func() { - c := new(gin.Context) - b := New() - ToContext(c, b) - - e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) - e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) - c1 := make(chan *Event) - - Subscribe(c, c1) - - var wg sync.WaitGroup - wg.Add(1) - - var r1, r2 *Event - go func() { - r1 = <-c1 - r2 = <-c1 - wg.Done() - }() - Publish(c, e1) - Publish(c, e2) - wg.Wait() - }) - }) - -} diff --git a/bus/context.go b/bus/context.go deleted file mode 100644 index 4eccfa7f0..000000000 --- a/bus/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package bus - -import "golang.org/x/net/context" - -const key = "bus" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Bus associated with this context. -func FromContext(c context.Context) Bus { - return c.Value(key).(Bus) -} - -// ToContext adds the Bus to this context if it supports -// the Setter interface. -func ToContext(c Setter, b Bus) { - c.Set(key, b) -} diff --git a/client/client.go b/client/client.go index 8a423aad2..2d849cb7e 100644 --- a/client/client.go +++ b/client/client.go @@ -4,7 +4,6 @@ import ( "io" "github.com/drone/drone/model" - "github.com/drone/drone/queue" ) // Client is used to communicate with a Drone server. @@ -103,27 +102,4 @@ type Client interface { // AgentList returns a list of build agents. AgentList() ([]*model.Agent, error) - - // - // below items for Queue (internal use only) - // - - // Pull pulls work from the server queue. - Pull(os, arch string) (*queue.Work, error) - - // Push pushes an update to the server. - Push(*queue.Work) error - - // Stream streams the build logs to the server. - Stream(int64, io.ReadCloser) error - - LogStream(int64) (StreamWriter, error) - - LogPost(int64, io.ReadCloser) error - - // Wait waits for the job to the complete. - Wait(int64) *Wait - - // Ping the server - Ping() error } diff --git a/client/client_impl.go b/client/client_impl.go index ca661a88e..d877c4665 100644 --- a/client/client_impl.go +++ b/client/client_impl.go @@ -12,10 +12,6 @@ import ( "strconv" "github.com/drone/drone/model" - "github.com/drone/drone/queue" - "github.com/gorilla/websocket" - "golang.org/x/net/context" - "golang.org/x/net/context/ctxhttp" "golang.org/x/oauth2" ) @@ -323,110 +319,6 @@ func (c *client) AgentList() ([]*model.Agent, error) { return out, err } -// -// below items for Queue (internal use only) -// - -// Pull pulls work from the server queue. -func (c *client) Pull(os, arch string) (*queue.Work, error) { - out := new(queue.Work) - uri := fmt.Sprintf(pathPull, c.base, os, arch) - err := c.post(uri, nil, out) - return out, err -} - -// Push pushes an update to the server. -func (c *client) Push(p *queue.Work) error { - uri := fmt.Sprintf(pathPush, c.base, p.Job.ID) - err := c.post(uri, p, nil) - return err -} - -// Ping pings the server. -func (c *client) Ping() error { - uri := fmt.Sprintf(pathPing, c.base) - err := c.post(uri, nil, nil) - return err -} - -// Stream streams the build logs to the server. -func (c *client) Stream(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathStream, c.base, id) - err := c.post(uri, rc, nil) - - return err -} - -// LogPost sends the full build logs to the server. -func (c *client) LogPost(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathLogs, c.base, id) - return c.post(uri, rc, nil) -} - -// StreamWriter implements a special writer for streaming log entries to the -// central Drone server. The standard implementation is the gorilla.Connection. -type StreamWriter interface { - Close() error - WriteJSON(interface{}) error -} - -// LogStream streams the build logs to the server. -func (c *client) LogStream(id int64) (StreamWriter, error) { - rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token) - uri, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - if uri.Scheme == "https" { - uri.Scheme = "wss" - } else { - uri.Scheme = "ws" - } - - // TODO need TLS client configuration - - conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil) - return conn, err -} - -// Wait watches and waits for the build to cancel or finish. -func (c *client) Wait(id int64) *Wait { - ctx, cancel := context.WithCancel(context.Background()) - return &Wait{id, c, ctx, cancel} -} - -type Wait struct { - id int64 - client *client - - ctx context.Context - cancel context.CancelFunc -} - -func (w *Wait) Done() (*model.Job, error) { - uri := fmt.Sprintf(pathWait, w.client.base, w.id) - req, err := w.client.createRequest(uri, "POST", nil) - if err != nil { - return nil, err - } - - res, err := ctxhttp.Do(w.ctx, w.client.client, req) - if err != nil { - return nil, err - } - defer res.Body.Close() - job := &model.Job{} - err = json.NewDecoder(res.Body).Decode(&job) - if err != nil { - return nil, err - } - return job, nil -} - -func (w *Wait) Cancel() { - w.cancel() -} - // // http request helper functions // diff --git a/drone/agent/agent.go b/drone/agent/agent.go index 48f1657fa..c9a51eaaa 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -3,17 +3,19 @@ package agent import ( "os" "os/signal" + "strings" "sync" "syscall" "time" - "github.com/drone/drone/client" - "github.com/drone/drone/shared/token" - "github.com/samalba/dockerclient" + "github.com/drone/drone/model" + "github.com/drone/mq/logger" + "github.com/drone/mq/stomp" + "github.com/tidwall/redlog" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" - "strings" + "github.com/samalba/dockerclient" ) // AgentCmd is the exported command for starting the drone agent. @@ -57,17 +59,11 @@ var AgentCmd = cli.Command{ Usage: "docker architecture system", Value: "amd64", }, - cli.StringFlag{ - EnvVar: "DRONE_STORAGE_DRIVER", - Name: "drone-storage-driver", - Usage: "docker storage driver", - Value: "overlay", - }, cli.StringFlag{ EnvVar: "DRONE_SERVER", Name: "drone-server", Usage: "drone server address", - Value: "http://localhost:8000", + Value: "ws://localhost:8000/ws/broker", }, cli.StringFlag{ EnvVar: "DRONE_TOKEN", @@ -102,6 +98,11 @@ var AgentCmd = cli.Command{ Usage: "drone timeout due to log inactivity", Value: time.Minute * 5, }, + cli.StringFlag{ + EnvVar: "DRONE_FILTER", + Name: "filter", + Usage: "filter jobs processed by this agent", + }, cli.IntFlag{ EnvVar: "DRONE_MAX_LOGS", Name: "max-log-size", @@ -137,30 +138,31 @@ var AgentCmd = cli.Command{ func start(c *cli.Context) { + log := redlog.New(os.Stderr) + log.SetLevel(0) + logger.SetLogger(log) + // debug level if requested by user if c.Bool("debug") { logrus.SetLevel(logrus.DebugLevel) + + log.SetLevel(1) } else { logrus.SetLevel(logrus.WarnLevel) } var accessToken string if c.String("drone-secret") != "" { - secretToken := c.String("drone-secret") - accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken) + // secretToken := c.String("drone-secret") + accessToken = c.String("drone-secret") + // accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken) } else { accessToken = c.String("drone-token") } - logrus.Infof("Connecting to %s with token %s", - c.String("drone-server"), - accessToken, - ) + logger.Noticef("connecting to server%s", c.String("drone-server")) - client := client.NewClientToken( - strings.TrimRight(c.String("drone-server"), "/"), - accessToken, - ) + server := strings.TrimRight(c.String("drone-server"), "/") tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) if err == nil { @@ -171,42 +173,76 @@ func start(c *cli.Context) { logrus.Fatal(err) } - go func() { - for { - if err := client.Ping(); err != nil { - logrus.Warnf("unable to ping the server. %s", err.Error()) - } - time.Sleep(c.Duration("ping")) - } - }() + var client *stomp.Client - var wg sync.WaitGroup - for i := 0; i < c.Int("docker-max-procs"); i++ { - wg.Add(1) - go func() { - r := pipeline{ - drone: client, - docker: docker, - config: config{ - platform: c.String("docker-os") + "/" + c.String("docker-arch"), - timeout: c.Duration("timeout"), - namespace: c.String("namespace"), - privileged: c.StringSlice("privileged"), - pull: c.BoolT("pull"), - logs: int64(c.Int("max-log-size")) * 1000000, - }, - } - for { - if err := r.run(); err != nil { - dur := c.Duration("backoff") - logrus.Warnf("reconnect in %v. %s", dur, err.Error()) - time.Sleep(dur) - } - } + handler := func(m *stomp.Message) { + running.Add(1) + defer func() { + running.Done() + client.Ack(m.Ack) }() + + r := pipeline{ + drone: client, + docker: docker, + config: config{ + platform: c.String("docker-os") + "/" + c.String("docker-arch"), + timeout: c.Duration("timeout"), + namespace: c.String("namespace"), + privileged: c.StringSlice("privileged"), + pull: c.BoolT("pull"), + logs: int64(c.Int("max-log-size")) * 1000000, + }, + } + + work := new(model.Work) + m.Unmarshal(work) + r.run(work) } + handleSignals() - wg.Wait() + + backoff := c.Duration("backoff") + + for { + // dial the drone server to establish a TCP connection. + client, err = stomp.Dial(server) + if err != nil { + logger.Warningf("connection failed, retry in %v. %s", backoff, err) + <-time.After(backoff) + continue + } + opts := []stomp.MessageOption{ + stomp.WithCredentials("x-token", accessToken), + } + + // initialize the stomp session and authenticate. + if err = client.Connect(opts...); err != nil { + logger.Warningf("session failed, retry in %v", backoff, err) + <-time.After(backoff) + continue + } + + opts = []stomp.MessageOption{ + stomp.WithAck("client"), + stomp.WithPrefetch( + c.Int("docker-max-procs"), + ), + } + if filter := c.String("filter"); filter != "" { + opts = append(opts, stomp.WithSelector(filter)) + } + + // subscribe to the pending build queue. + client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) { + go handler(m) // HACK until we a channel based Subscribe implementation + }), opts...) + + logger.Noticef("connection establish, ready to process builds.") + <-client.Done() + + logger.Warningf("connection interrupted, attempting to reconnect.") + } } // tracks running builds @@ -220,10 +256,10 @@ func handleSignals() { go func() { <-c - logrus.Debugln("SIGTERM received.") - logrus.Debugln("wait for running builds to finish.") + logger.Warningf("SIGTERM received.") + logger.Warningf("wait for running builds to finish.") running.Wait() - logrus.Debugln("done.") + logger.Warningf("done.") os.Exit(0) }() } diff --git a/drone/agent/exec.go b/drone/agent/exec.go index bd5381141..7ad0b458c 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -1,14 +1,13 @@ package agent import ( - "bytes" - "io/ioutil" "time" "github.com/Sirupsen/logrus" "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" - "github.com/drone/drone/client" + "github.com/drone/drone/model" + "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" ) @@ -23,20 +22,16 @@ type config struct { } type pipeline struct { - drone client.Client + drone *stomp.Client docker dockerclient.Client config config } -func (r *pipeline) run() error { - w, err := r.drone.Pull("linux", "amd64") - if err != nil { - return err - } - running.Add(1) - defer func() { - running.Done() - }() +func (r *pipeline) run(w *model.Work) { + + // defer func() { + // // r.drone.Ack(id, opts) + // }() logrus.Infof("Starting build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) @@ -44,24 +39,9 @@ func (r *pipeline) run() error { cancel := make(chan bool, 1) engine := docker.NewClient(r.docker) - // streaming the logs - // rc, wc := io.Pipe() - // defer func() { - // wc.Close() - // rc.Close() - // }() - - var buf bytes.Buffer - - stream, err := r.drone.LogStream(w.Job.ID) - if err != nil { - return err - } - a := agent.Agent{ - Update: agent.NewClientUpdater(r.drone), - // Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs), - Logger: agent.NewStreamLogger(stream, &buf, r.config.logs), + Update: agent.NewClientUpdater(r.drone), + Logger: agent.NewClientLogger(r.drone, w.Job.ID, r.config.logs), Engine: engine, Timeout: r.config.timeout, Platform: r.config.platform, @@ -70,27 +50,34 @@ func (r *pipeline) run() error { Pull: r.config.pull, } - // signal for canceling the build. - wait := r.drone.Wait(w.Job.ID) - defer wait.Cancel() - go func() { - if _, err := wait.Done(); err == nil { + cancelFunc := func(m *stomp.Message) { + defer m.Release() + + id := m.Header.GetInt64("job-id") + if id == w.Job.ID { cancel <- true logrus.Infof("Cancel build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) } + } + + // signal for canceling the build. + sub, err := r.drone.Subscribe("/topic/cancel", stomp.HandlerFunc(cancelFunc)) + if err != nil { + logrus.Errorf("Error subscribing to /topic/cancel. %s", err) + } + defer func() { + r.drone.Unsubscribe(sub) }() a.Run(w, cancel) - if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { - logrus.Errorf("Error sending logs for %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - } - stream.Close() + // if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { + // logrus.Errorf("Error sending logs for %s/%s#%d.%d", + // w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + // } + // stream.Close() logrus.Infof("Finished build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - - return nil } diff --git a/drone/exec.go b/drone/exec.go index c766707df..f061f1bc9 100644 --- a/drone/exec.go +++ b/drone/exec.go @@ -13,7 +13,6 @@ import ( "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/yaml" "github.com/codegangsta/cli" @@ -340,7 +339,7 @@ func exec(c *cli.Context) error { Pull: c.Bool("pull"), } - payload := &queue.Work{ + payload := &model.Work{ Yaml: string(file), Verified: c.BoolT("yaml.verified"), Signed: c.BoolT("yaml.signed"), diff --git a/drone/server.go b/drone/server.go index f930ae373..465595ce3 100644 --- a/drone/server.go +++ b/drone/server.go @@ -6,10 +6,10 @@ import ( "github.com/drone/drone/router" "github.com/drone/drone/router/middleware" - "github.com/gin-gonic/contrib/ginrus" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/gin-gonic/contrib/ginrus" ) var serverCmd = cli.Command{ @@ -288,13 +288,11 @@ func server(c *cli.Context) error { ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), middleware.Version, middleware.Config(c), - middleware.Queue(c), - middleware.Stream(c), - middleware.Bus(c), middleware.Cache(c), middleware.Store(c), middleware.Remote(c), middleware.Agents(c), + middleware.Broker(c), ) // start the server with tls enabled diff --git a/bus/types.go b/model/event.go similarity index 61% rename from bus/types.go rename to model/event.go index 6be95a3d7..efee037cc 100644 --- a/bus/types.go +++ b/model/event.go @@ -1,6 +1,4 @@ -package bus - -import "github.com/drone/drone/model" +package model // EventType defines the possible types of build events. type EventType string @@ -14,15 +12,15 @@ const ( // Event represents a build event. type Event struct { - Type EventType `json:"type"` - Repo model.Repo `json:"repo"` - Build model.Build `json:"build"` - Job model.Job `json:"job"` + Type EventType `json:"type"` + Repo Repo `json:"repo"` + Build Build `json:"build"` + Job Job `json:"job"` } // NewEvent creates a new Event for the build, using copies of // the build data to avoid possible mutation or race conditions. -func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { +func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event { return &Event{ Type: t, Repo: *r, @@ -31,7 +29,7 @@ func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { } } -func NewBuildEvent(t EventType, r *model.Repo, b *model.Build) *Event { +func NewBuildEvent(t EventType, r *Repo, b *Build) *Event { return &Event{ Type: t, Repo: *r, diff --git a/model/work.go b/model/work.go new file mode 100644 index 000000000..06c69d91c --- /dev/null +++ b/model/work.go @@ -0,0 +1,19 @@ +package model + +// Work represents an item for work to be +// processed by a worker. +type Work struct { + Signed bool `json:"signed"` + Verified bool `json:"verified"` + Yaml string `json:"config"` + YamlEnc string `json:"secret"` + Repo *Repo `json:"repo"` + Build *Build `json:"build"` + BuildLast *Build `json:"build_last"` + Job *Job `json:"job"` + Netrc *Netrc `json:"netrc"` + Keys *Key `json:"keys"` + System *System `json:"system"` + Secrets []*Secret `json:"secrets"` + User *User `json:"user"` +} diff --git a/queue/context.go b/queue/context.go deleted file mode 100644 index 98a78e9ec..000000000 --- a/queue/context.go +++ /dev/null @@ -1,23 +0,0 @@ -package queue - -import ( - "golang.org/x/net/context" -) - -const key = "queue" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Queue associated with this context. -func FromContext(c context.Context) Queue { - return c.Value(key).(Queue) -} - -// ToContext adds the Queue to this context if it supports -// the Setter interface. -func ToContext(c Setter, q Queue) { - c.Set(key, q) -} diff --git a/queue/queue.go b/queue/queue.go deleted file mode 100644 index 0cdf2e6fc..000000000 --- a/queue/queue.go +++ /dev/null @@ -1,67 +0,0 @@ -package queue - -//go:generate mockery -name Queue -output mock -case=underscore - -import ( - "errors" - - "golang.org/x/net/context" -) - -// ErrNotFound indicates the requested work item does not -// exist in the queue. -var ErrNotFound = errors.New("queue item not found") - -type Queue interface { - // Publish inserts work at the tail of this queue, waiting for - // space to become available if the queue is full. - Publish(*Work) error - - // Remove removes the specified work item from this queue, - // if it is present. - Remove(*Work) error - - // PullClose retrieves and removes the head of this queue, - // waiting if necessary until work becomes available. - Pull() *Work - - // PullClose retrieves and removes the head of this queue, - // waiting if necessary until work becomes available. The - // CloseNotifier should be provided to clone the channel - // if the subscribing client terminates its connection. - PullClose(CloseNotifier) *Work -} - -// Publish inserts work at the tail of this queue, waiting for -// space to become available if the queue is full. -func Publish(c context.Context, w *Work) error { - return FromContext(c).Publish(w) -} - -// Remove removes the specified work item from this queue, -// if it is present. -func Remove(c context.Context, w *Work) error { - return FromContext(c).Remove(w) -} - -// Pull retrieves and removes the head of this queue, -// waiting if necessary until work becomes available. -func Pull(c context.Context) *Work { - return FromContext(c).Pull() -} - -// PullClose retrieves and removes the head of this queue, -// waiting if necessary until work becomes available. The -// CloseNotifier should be provided to clone the channel -// if the subscribing client terminates its connection. -func PullClose(c context.Context, cn CloseNotifier) *Work { - return FromContext(c).PullClose(cn) -} - -// CloseNotifier defines a datastructure that is capable of notifying -// a subscriber when its connection is closed. -type CloseNotifier interface { - // CloseNotify returns a channel that receives a single value - // when the client connection has gone away. - CloseNotify() <-chan bool -} diff --git a/queue/queue_impl.go b/queue/queue_impl.go deleted file mode 100644 index 8882bc24d..000000000 --- a/queue/queue_impl.go +++ /dev/null @@ -1,85 +0,0 @@ -package queue - -import "sync" - -type queue struct { - sync.Mutex - - items map[*Work]struct{} - itemc chan *Work -} - -func New() Queue { - return newQueue() -} - -func newQueue() *queue { - return &queue{ - items: make(map[*Work]struct{}), - itemc: make(chan *Work, 999), - } -} - -func (q *queue) Publish(work *Work) error { - q.Lock() - q.items[work] = struct{}{} - q.Unlock() - q.itemc <- work - return nil -} - -func (q *queue) Remove(work *Work) error { - q.Lock() - defer q.Unlock() - - _, ok := q.items[work] - if !ok { - return ErrNotFound - } - var items []*Work - - // loop through and drain all items - // from the -drain: - for { - select { - case item := <-q.itemc: - items = append(items, item) - default: - break drain - } - } - - // re-add all items to the queue except - // the item we're trying to remove - for _, item := range items { - if item == work { - delete(q.items, work) - continue - } - q.itemc <- item - } - return nil -} - -func (q *queue) Pull() *Work { - work := <-q.itemc - q.Lock() - delete(q.items, work) - q.Unlock() - return work -} - -func (q *queue) PullClose(cn CloseNotifier) *Work { - for { - select { - case <-cn.CloseNotify(): - return nil - case work := <-q.itemc: - q.Lock() - delete(q.items, work) - q.Unlock() - return work - } - } -} diff --git a/queue/queue_impl_test.go b/queue/queue_impl_test.go deleted file mode 100644 index 778576232..000000000 --- a/queue/queue_impl_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package queue - -import ( - "sync" - "testing" - - . "github.com/franela/goblin" - "github.com/gin-gonic/gin" -) - -func TestBuild(t *testing.T) { - g := Goblin(t) - g.Describe("Queue", func() { - - g.It("Should publish item", func() { - c := new(gin.Context) - q := newQueue() - ToContext(c, q) - - w1 := &Work{} - w2 := &Work{} - Publish(c, w1) - Publish(c, w2) - g.Assert(len(q.items)).Equal(2) - g.Assert(len(q.itemc)).Equal(2) - }) - - g.It("Should remove item", func() { - c := new(gin.Context) - q := newQueue() - ToContext(c, q) - - w1 := &Work{} - w2 := &Work{} - w3 := &Work{} - Publish(c, w1) - Publish(c, w2) - Publish(c, w3) - Remove(c, w2) - g.Assert(len(q.items)).Equal(2) - g.Assert(len(q.itemc)).Equal(2) - - g.Assert(Pull(c)).Equal(w1) - g.Assert(Pull(c)).Equal(w3) - g.Assert(Remove(c, w2)).Equal(ErrNotFound) - }) - - g.It("Should pull item", func() { - c := new(gin.Context) - q := New() - ToContext(c, q) - - cn := new(closeNotifier) - cn.closec = make(chan bool, 1) - w1 := &Work{} - w2 := &Work{} - - Publish(c, w1) - g.Assert(Pull(c)).Equal(w1) - - Publish(c, w2) - g.Assert(PullClose(c, cn)).Equal(w2) - }) - - g.It("Should cancel pulling item", func() { - c := new(gin.Context) - q := New() - ToContext(c, q) - - cn := new(closeNotifier) - cn.closec = make(chan bool, 1) - var wg sync.WaitGroup - go func() { - wg.Add(1) - g.Assert(PullClose(c, cn) == nil).IsTrue() - wg.Done() - }() - go func() { - cn.closec <- true - }() - wg.Wait() - - }) - }) -} - -type closeNotifier struct { - closec chan bool -} - -func (c *closeNotifier) CloseNotify() <-chan bool { - return c.closec -} diff --git a/queue/types.go b/queue/types.go deleted file mode 100644 index 48fc41942..000000000 --- a/queue/types.go +++ /dev/null @@ -1,21 +0,0 @@ -package queue - -import "github.com/drone/drone/model" - -// Work represents an item for work to be -// processed by a worker. -type Work struct { - Signed bool `json:"signed"` - Verified bool `json:"verified"` - Yaml string `json:"config"` - YamlEnc string `json:"secret"` - Repo *model.Repo `json:"repo"` - Build *model.Build `json:"build"` - BuildLast *model.Build `json:"build_last"` - Job *model.Job `json:"job"` - Netrc *model.Netrc `json:"netrc"` - Keys *model.Key `json:"keys"` - System *model.System `json:"system"` - Secrets []*model.Secret `json:"secrets"` - User *model.User `json:"user"` -} diff --git a/router/middleware/broker.go b/router/middleware/broker.go new file mode 100644 index 000000000..2222df510 --- /dev/null +++ b/router/middleware/broker.go @@ -0,0 +1,52 @@ +package middleware + +import ( + "sync" + + handlers "github.com/drone/drone/server" + + "github.com/codegangsta/cli" + "github.com/drone/mq/server" + "github.com/drone/mq/stomp" + + "github.com/Sirupsen/logrus" + "github.com/gin-gonic/gin" +) + +const ( + serverKey = "broker" + clientKey = "stomp.client" // mirrored from stomp/context +) + +// Broker is a middleware function that initializes the broker +// and adds the broker client to the request context. +func Broker(cli *cli.Context) gin.HandlerFunc { + secret := cli.String("agent-secret") + if secret == "" { + logrus.Fatalf("failed to generate token from DRONE_SECRET") + } + + broker := server.NewServer( + server.WithCredentials("x-token", secret), + ) + client := broker.Client() + + var once sync.Once + return func(c *gin.Context) { + c.Set(serverKey, broker) + c.Set(clientKey, client) + once.Do(func() { + // this is some really hacky stuff + // turns out I need to do some refactoring + // don't judge! + // will fix in 0.6 release + ctx := c.Copy() + client.Connect( + stomp.WithCredentials("x-token", secret), + ) + client.Subscribe("/queue/updates", stomp.HandlerFunc(func(m *stomp.Message) { + go handlers.HandleUpdate(ctx, m.Copy()) + })) + }) + } +} diff --git a/router/middleware/bus.go b/router/middleware/bus.go deleted file mode 100644 index 25665da1d..000000000 --- a/router/middleware/bus.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/bus" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Bus is a middleware function that initializes the Event Bus and attaches to -// the context of every http.Request. -func Bus(cli *cli.Context) gin.HandlerFunc { - v := bus.New() - return func(c *gin.Context) { - bus.ToContext(c, v) - } -} diff --git a/router/middleware/queue.go b/router/middleware/queue.go deleted file mode 100644 index d2791033e..000000000 --- a/router/middleware/queue.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/queue" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Queue is a middleware function that initializes the Queue and attaches to -// the context of every http.Request. -func Queue(cli *cli.Context) gin.HandlerFunc { - v := queue.New() - return func(c *gin.Context) { - queue.ToContext(c, v) - } -} diff --git a/router/middleware/stream.go b/router/middleware/stream.go deleted file mode 100644 index d78a119c2..000000000 --- a/router/middleware/stream.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/stream" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Stream is a middleware function that initializes the Stream and attaches to -// the context of every http.Request. -func Stream(cli *cli.Context) gin.HandlerFunc { - v := stream.New() - return func(c *gin.Context) { - stream.ToContext(c, v) - } -} diff --git a/router/router.go b/router/router.go index d3c8590e9..2bb7f6160 100644 --- a/router/router.go +++ b/router/router.go @@ -113,17 +113,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { e.POST("/hook", server.PostHook) e.POST("/api/hook", server.PostHook) - stream := e.Group("/api/stream") - { - stream.Use(session.SetRepo()) - stream.Use(session.SetPerm()) - stream.Use(session.MustPull) - - stream.GET("/:owner/:name", server.GetRepoEvents) - stream.GET("/:owner/:name/:build/:number", server.GetStream) - } ws := e.Group("/ws") { + ws.GET("/broker", server.Broker) ws.GET("/feed", server.EventStream) ws.GET("/logs/:owner/:name/:build/:number", session.SetRepo(), @@ -152,20 +144,6 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { agents.GET("", server.GetAgents) } - queue := e.Group("/api/queue") - { - queue.Use(session.AuthorizeAgent) - queue.POST("/pull", server.Pull) - queue.POST("/pull/:os/:arch", server.Pull) - queue.POST("/wait/:id", server.Wait) - queue.POST("/stream/:id", server.Stream) - queue.POST("/status/:id", server.Update) - queue.POST("/ping", server.Ping) - - queue.POST("/logs/:id", server.PostLogs) - queue.GET("/logs/:id", server.WriteLogs) - } - // DELETE THESE // gitlab := e.Group("/gitlab/:owner/:name") // { diff --git a/server/broker.go b/server/broker.go new file mode 100644 index 000000000..83a97b870 --- /dev/null +++ b/server/broker.go @@ -0,0 +1,13 @@ +package server + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +// Broker handles connections to the embedded message broker. +func Broker(c *gin.Context) { + broker := c.MustGet("broker").(http.Handler) + broker.ServeHTTP(c.Writer, c.Request) +} diff --git a/server/build.go b/server/build.go index 391688b5d..e6f71361a 100644 --- a/server/build.go +++ b/server/build.go @@ -1,22 +1,23 @@ package server import ( + "bufio" + "io" "net/http" "strconv" "time" log "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" - "github.com/drone/drone/stream" + "github.com/drone/drone/yaml" "github.com/gin-gonic/gin" "github.com/square/go-jose" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" + "github.com/drone/mq/stomp" ) func GetBuilds(c *gin.Context) { @@ -112,7 +113,7 @@ func GetBuildLogs(c *gin.Context) { } c.Header("Content-Type", "application/json") - stream.Copy(c.Writer, r) + copyLogs(c.Writer, r) } func DeleteBuild(c *gin.Context) { @@ -148,7 +149,14 @@ func DeleteBuild(c *gin.Context) { job.ExitCode = 137 store.UpdateBuildJob(c, build, job) - bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/cancel", model.Event{ + Type: model.Cancelled, + Repo: *repo, + Build: *build, + Job: *job, + }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10))) + c.String(204, "") } @@ -294,7 +302,7 @@ func PostBuild(c *gin.Context) { last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) secs, err := store.GetMergedSecretList(c, repo) if err != nil { - log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } var signed bool @@ -319,9 +327,19 @@ func PostBuild(c *gin.Context) { log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/events", model.Event{ + Type: model.Enqueued, + Repo: *repo, + Build: *build, + }, + stomp.WithHeader("repo", repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)), + ) + for _, job := range jobs { - queue.Publish(c, &queue.Work{ + broker, _ := stomp.FromContext(c) + broker.SendJSON("/queue/pending", &model.Work{ Signed: signed, Verified: verified, User: user, @@ -333,7 +351,15 @@ func PostBuild(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }) + }, + stomp.WithHeader( + "platform", + yaml.ParsePlatformDefault(raw, "linux/amd64"), + ), + stomp.WithHeaders( + yaml.ParseLabel(raw), + ), + ) } } @@ -345,3 +371,20 @@ func GetBuildQueue(c *gin.Context) { } c.JSON(200, out) } + +// copyLogs copies the stream from the source to the destination in valid JSON +// format. This converts the logs, which are per-line JSON objects, to a +// proper JSON array. +func copyLogs(dest io.Writer, src io.Reader) error { + io.WriteString(dest, "[") + + scanner := bufio.NewScanner(src) + for scanner.Scan() { + io.WriteString(dest, scanner.Text()) + io.WriteString(dest, ",\n") + } + + io.WriteString(dest, "{}]") + + return nil +} diff --git a/server/hook.go b/server/hook.go index 1c93ed867..9f87437e6 100644 --- a/server/hook.go +++ b/server/hook.go @@ -3,19 +3,19 @@ package server import ( "fmt" "regexp" + "strconv" "github.com/gin-gonic/gin" "github.com/square/go-jose" log "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/shared/token" "github.com/drone/drone/store" "github.com/drone/drone/yaml" + "github.com/drone/mq/stomp" ) var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`) @@ -208,12 +208,22 @@ func PostHook(c *gin.Context) { last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) secs, err := store.GetMergedSecretList(c, repo) if err != nil { - log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/events", model.Event{ + Type: model.Enqueued, + Repo: *repo, + Build: *build, + }, + stomp.WithHeader("repo", repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)), + ) + for _, job := range jobs { - queue.Publish(c, &queue.Work{ + broker, _ := stomp.FromContext(c) + broker.SendJSON("/queue/pending", &model.Work{ Signed: build.Signed, Verified: build.Verified, User: user, @@ -225,7 +235,15 @@ func PostHook(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }) + }, + stomp.WithHeader( + "platform", + yaml.ParsePlatformDefault(raw, "linux/amd64"), + ), + stomp.WithHeaders( + yaml.ParseLabel(raw), + ), + ) } } diff --git a/server/queue.go b/server/queue.go index 7cdaf863f..155b9c039 100644 --- a/server/queue.go +++ b/server/queue.go @@ -1,80 +1,46 @@ package server import ( + "bytes" "fmt" - "io" "net/http" "strconv" - "sync" "time" + "golang.org/x/net/context" + "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/store" - "github.com/drone/drone/stream" - "github.com/gin-gonic/gin" + "github.com/drone/mq/stomp" "github.com/gorilla/websocket" ) -// Pull is a long request that polls and attemts to pull work off the queue stack. -func Pull(c *gin.Context) { - logrus.Debugf("Agent %s connected.", c.ClientIP()) +// newline defines a newline constant to separate lines in the build output +var newline = []byte{'\n'} - w := queue.PullClose(c, c.Writer) - if w == nil { - logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) - } else { - - // setup the channel to stream logs - if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { - logrus.Errorf("Unable to create stream. %s", err) - } - - c.JSON(202, w) - - logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", - c.ClientIP(), - w.Repo.Owner, - w.Repo.Name, - w.Build.Number, - w.Job.Number, - ) - } +// upgrader defines the default behavior for upgrading the websocket. +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, } -// Wait is a long request that polls and waits for cancelled build requests. -func Wait(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - eventc := make(chan *bus.Event, 1) - - bus.Subscribe(c, eventc) - defer bus.Unsubscribe(c, eventc) - - for { - select { - case event := <-eventc: - if event.Job.ID == id && event.Type == bus.Cancelled { - c.JSON(200, event.Job) - return - } - case <-c.Writer.CloseNotify(): - return +// HandleUpdate handles build updates from the agent and persists to the database. +func HandleUpdate(c context.Context, message *stomp.Message) { + defer func() { + message.Release() + if r := recover(); r != nil { + err := r.(error) + logrus.Errorf("Panic recover: broker update handler: %s", err) } - } -} + }() -// Update handles build updates from the agent and persists to the database. -func Update(c *gin.Context) { - work := &queue.Work{} - if err := c.BindJSON(work); err != nil { + work := new(model.Work) + if err := message.Unmarshal(work); err != nil { logrus.Errorf("Invalid input. %s", err) return } @@ -85,12 +51,12 @@ func Update(c *gin.Context) { // empty values if we just saved what was coming in the http.Request body. build, err := store.GetBuild(c, work.Build.ID) if err != nil { - c.String(404, "Unable to find build. %s", err) + logrus.Errorf("Unable to find build. %s", err) return } job, err := store.GetJob(c, work.Job.ID) if err != nil { - c.String(404, "Unable to find job. %s", err) + logrus.Errorf("Unable to find job. %s", err) return } build.Started = work.Build.Started @@ -117,189 +83,81 @@ func Update(c *gin.Context) { ok, err := store.UpdateBuildJob(c, build, job) if err != nil { - c.String(500, "Unable to update job. %s", err) + logrus.Errorf("Unable to update job. %s", err) return } - if ok && build.Status != model.StatusRunning { + if ok { // get the user because we transfer the user form the server to agent // and back we lose the token which does not get serialized to json. - user, err := store.GetUser(c, work.User.ID) - if err != nil { - c.String(500, "Unable to find user. %s", err) + user, uerr := store.GetUser(c, work.User.ID) + if uerr != nil { + logrus.Errorf("Unable to find user. %s", err) return } remote.Status(c, user, work.Repo, build, fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) } - if build.Status == model.StatusRunning { - bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) - } else { - bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) - } - - c.JSON(200, work) -} - -// Stream streams the logs to disk or memory for broadcasing to listeners. Once -// the stream is closed it is moved to permanent storage in the database. -func Stream(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - key := c.Param("id") - logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) - - wc, err := stream.Writer(c, key) - if err != nil { - c.String(500, "Failed to create stream writer. %s", err) - return - } - - defer func() { - wc.Close() - stream.Delete(c, key) - }() - - io.Copy(wc, c.Request.Body) - - rc, err := stream.Reader(c, key) - if err != nil { - c.String(500, "Failed to create stream reader. %s", err) - return - } - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer recover() - store.WriteLog(c, &model.Job{ID: id}, rc) - wg.Done() - }() - - wc.Close() - wg.Wait() - c.String(200, "") - - logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) -} - -func Ping(c *gin.Context) { - agent, err := store.GetAgentAddr(c, c.ClientIP()) - if err == nil { - agent.Updated = time.Now().Unix() - err = store.UpdateAgent(c, agent) - } else { - err = store.CreateAgent(c, &model.Agent{ - Address: c.ClientIP(), - Platform: "linux/amd64", - Capacity: 2, - Created: time.Now().Unix(), - Updated: time.Now().Unix(), - }) - } - if err != nil { - logrus.Errorf("Unable to register agent. %s", err.Error()) - } - c.String(200, "PONG") -} - -// -// -// Below are alternate implementations for the Queue that use websockets. -// -// - -// PostLogs handles an http request from the agent to post build logs. These -// logs are posted at the end of the build process. -func PostLogs(c *gin.Context) { - id, _ := strconv.ParseInt(c.Param("id"), 10, 64) - job, err := store.GetJob(c, id) - if err != nil { - c.String(404, "Cannot upload logs. %s", err) - return - } - if err := store.WriteLog(c, job, c.Request.Body); err != nil { - c.String(500, "Cannot persist logs", err) - return - } - c.String(200, "") -} - -// WriteLogs handles an http request from the agent to stream build logs from -// the agent to the server to enable real time streamings to the client. -func WriteLogs(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - c.String(500, "Cannot upgrade to websocket. %s", err) - return - } - defer conn.Close() - - wc, err := stream.Writer(c, stream.ToKey(id)) - if err != nil { - c.String(500, "Cannot create stream writer. %s", err) - return - } - defer func() { - wc.Close() - stream.Delete(c, stream.ToKey(id)) - }() - - var msg []byte - for { - _, msg, err = conn.ReadMessage() - if err != nil { - break - } - wc.Write(msg) - wc.Write(newline) - } - - if err != nil && err != io.EOF { - c.String(500, "Error reading logs. %s", err) - return - } - // - // rc, err := stream.Reader(c, stream.ToKey(id)) - // if err != nil { - // c.String(500, "Failed to create stream reader. %s", err) - // return - // } - // - // wg := sync.WaitGroup{} - // wg.Add(1) - // - // go func() { - // defer recover() - // store.WriteLog(c, &model.Job{ID: id}, rc) - // wg.Done() - // }() - // - // wc.Close() - // wg.Wait() - -} - -// newline defines a newline constant to separate lines in the build output -var newline = []byte{'\n'} - -// upgrader defines the default behavior for upgrading the websocket. -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true + client := stomp.MustFromContext(c) + err = client.SendJSON("/topic/events", model.Event{ + Type: func() model.EventType { + // HACK we don't even really care about the event type. + // so we should just simplify how events are triggered. + if job.Status == model.StatusRunning { + return model.Started + } + return model.Finished + }(), + Repo: *work.Repo, + Build: *build, + Job: *job, }, + stomp.WithHeader("repo", work.Repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(work.Repo.IsPrivate)), + ) + if err != nil { + logrus.Errorf("Unable to publish to /topic/events. %s", err) + } + + if job.Status == model.StatusRunning { + return + } + + var buf bytes.Buffer + var sub []byte + + done := make(chan bool) + dest := fmt.Sprintf("/topic/logs.%d", job.ID) + sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { + defer m.Release() + if m.Header.GetBool("eof") { + done <- true + return + } + buf.Write(m.Body) + buf.WriteByte('\n') + })) + + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) + return + } + + defer func() { + client.Unsubscribe(sub) + client.Send(dest, []byte{}, stomp.WithRetain("remove")) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + logrus.Errorf("Unable to read logs from broker. Timeout. %s", err) + return + } + + if err := store.WriteLog(c, job, &buf); err != nil { + logrus.Errorf("Unable to write logs to store. %s", err) + return + } } diff --git a/server/stream.go b/server/stream.go index b80bbc73a..d4a626a48 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,121 +1,21 @@ package server import ( - "bufio" - "encoding/json" - "io" + "fmt" "strconv" "time" - "github.com/drone/drone/bus" "github.com/drone/drone/cache" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" "github.com/drone/drone/store" - "github.com/drone/drone/stream" + "github.com/drone/mq/stomp" "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" - "github.com/manucorporat/sse" ) -// GetRepoEvents will upgrade the connection to a Websocket and will stream -// event updates to the browser. -func GetRepoEvents(c *gin.Context) { - repo := session.Repo(c) - c.Writer.Header().Set("Content-Type", "text/event-stream") - - eventc := make(chan *bus.Event, 1) - bus.Subscribe(c, eventc) - defer func() { - bus.Unsubscribe(c, eventc) - close(eventc) - logrus.Infof("closed event stream") - }() - - c.Stream(func(w io.Writer) bool { - select { - case event := <-eventc: - if event == nil { - logrus.Infof("nil event received") - return false - } - - // TODO(bradrydzewski) This is a super hacky workaround until we improve - // the actual bus. Having a per-call database event is just plain stupid. - if event.Repo.FullName == repo.FullName { - - var payload = struct { - model.Build - Jobs []*model.Job `json:"jobs"` - }{} - payload.Build = event.Build - payload.Jobs, _ = store.GetJobList(c, &event.Build) - data, _ := json.Marshal(&payload) - - sse.Encode(w, sse.Event{ - Event: "message", - Data: string(data), - }) - } - case <-c.Writer.CloseNotify(): - return false - } - return true - }) -} - -func GetStream(c *gin.Context) { - - repo := session.Repo(c) - buildn, _ := strconv.Atoi(c.Param("build")) - jobn, _ := strconv.Atoi(c.Param("number")) - - c.Writer.Header().Set("Content-Type", "text/event-stream") - - build, err := store.GetBuildNumber(c, repo, buildn) - if err != nil { - logrus.Debugln("stream cannot get build number.", err) - c.AbortWithError(404, err) - return - } - job, err := store.GetJobNumber(c, build, jobn) - if err != nil { - logrus.Debugln("stream cannot get job number.", err) - c.AbortWithError(404, err) - return - } - - rc, err := stream.Reader(c, stream.ToKey(job.ID)) - if err != nil { - c.AbortWithError(404, err) - return - } - - go func() { - <-c.Writer.CloseNotify() - rc.Close() - }() - - var line int - var scanner = bufio.NewScanner(rc) - for scanner.Scan() { - line++ - var err = sse.Encode(c.Writer, sse.Event{ - Id: strconv.Itoa(line), - Event: "message", - Data: scanner.Text(), - }) - if err != nil { - break - } - c.Writer.Flush() - } - - logrus.Debugf("Closed stream %s#%d", repo.FullName, build.Number) -} - var ( // Time allowed to write the file to the client. writeWait = 5 * time.Second @@ -165,47 +65,41 @@ func LogStream(c *gin.Context) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() - rc, err := stream.Reader(c, stream.ToKey(job.ID)) + logs := make(chan []byte) + done := make(chan bool) + dest := fmt.Sprintf("/topic/logs.%d", job.ID) + client, _ := stomp.FromContext(c) + sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { + if m.Header.GetBool("eof") { + done <- true + } else { + logs <- m.Body + } + m.Release() + })) if err != nil { - c.AbortWithError(404, err) + logrus.Errorf("Unable to read logs from broker. %s", err) return } - - quitc := make(chan bool) defer func() { - quitc <- true - close(quitc) - rc.Close() - ws.Close() - logrus.Debug("Successfully closed websocket") + client.Unsubscribe(sub) + close(done) + close(logs) }() - go func() { - defer func() { - recover() - }() - for { - select { - case <-quitc: + for { + select { + case buf := <-logs: + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, buf) + case <-done: + return + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) + if err != nil { return - case <-ticker.C: - err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) - if err != nil { - return - } } } - }() - - var scanner = bufio.NewScanner(rc) - var b []byte - for scanner.Scan() { - b = scanner.Bytes() - if len(b) == 0 { - continue - } - ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteMessage(websocket.TextMessage, b) } } @@ -227,20 +121,34 @@ func EventStream(c *gin.Context) { repo, _ = cache.GetRepoMap(c, user) } - ticker := time.NewTicker(pingPeriod) + eventc := make(chan []byte, 10) quitc := make(chan bool) - eventc := make(chan *bus.Event, 10) - bus.Subscribe(c, eventc) + tick := time.NewTicker(pingPeriod) defer func() { - ticker.Stop() - bus.Unsubscribe(c, eventc) - quitc <- true - close(quitc) - close(eventc) + tick.Stop() ws.Close() logrus.Debug("Successfully closed websocket") }() + client := stomp.MustFromContext(c) + sub, err := client.Subscribe("/topic/events", stomp.HandlerFunc(func(m *stomp.Message) { + name := m.Header.GetString("repo") + priv := m.Header.GetBool("private") + if repo[name] || !priv { + eventc <- m.Body + } + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) + return + } + defer func() { + client.Unsubscribe(sub) + close(quitc) + close(eventc) + }() + go func() { defer func() { recover() @@ -249,15 +157,13 @@ func EventStream(c *gin.Context) { select { case <-quitc: return - case event := <-eventc: - if event == nil { + case event, ok := <-eventc: + if !ok { return } - if repo[event.Repo.FullName] || !event.Repo.IsPrivate { - ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteJSON(event) - } - case <-ticker.C: + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, event) + case <-tick.C: err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) if err != nil { return diff --git a/stream/context.go b/stream/context.go deleted file mode 100644 index e1202cd1b..000000000 --- a/stream/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package stream - -import "golang.org/x/net/context" - -const key = "stream" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Stream associated with this context. -func FromContext(c context.Context) Stream { - return c.Value(key).(Stream) -} - -// ToContext adds the Stream to this context if it supports the -// Setter interface. -func ToContext(c Setter, s Stream) { - c.Set(key, s) -} diff --git a/stream/reader.go b/stream/reader.go deleted file mode 100644 index 935f0f93c..000000000 --- a/stream/reader.go +++ /dev/null @@ -1,54 +0,0 @@ -package stream - -import ( - "bytes" - "io" - "sync/atomic" -) - -type reader struct { - w *writer - off int - closed uint32 -} - -// Read reads from the Buffer -func (r *reader) Read(p []byte) (n int, err error) { - r.w.RLock() - defer r.w.RUnlock() - - var m int - - for len(p) > 0 { - - m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p) - n += m - r.off += n - - if n > 0 { - break - } - - if r.w.Closed() { - err = io.EOF - break - } - if r.Closed() { - err = io.EOF - break - } - - r.w.Wait() - } - - return -} - -func (r *reader) Close() error { - atomic.StoreUint32(&r.closed, 1) - return nil -} - -func (r *reader) Closed() bool { - return atomic.LoadUint32(&r.closed) != 0 -} diff --git a/stream/reader_test.go b/stream/reader_test.go deleted file mode 100644 index e113cc4a4..000000000 --- a/stream/reader_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsReader(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement reader tests -} diff --git a/stream/stream.go b/stream/stream.go deleted file mode 100644 index 2619b53cb..000000000 --- a/stream/stream.go +++ /dev/null @@ -1,60 +0,0 @@ -package stream - -import ( - "bufio" - "io" - "strconv" - - "golang.org/x/net/context" -) - -// Stream manages the stream of build logs. -type Stream interface { - Create(string) error - Delete(string) error - Reader(string) (io.ReadCloser, error) - Writer(string) (io.WriteCloser, error) -} - -// Create creates a new stream. -func Create(c context.Context, key string) error { - return FromContext(c).Create(key) -} - -// Reader opens the stream for reading. -func Reader(c context.Context, key string) (io.ReadCloser, error) { - return FromContext(c).Reader(key) -} - -// Writer opens the stream for writing. -func Writer(c context.Context, key string) (io.WriteCloser, error) { - return FromContext(c).Writer(key) -} - -// Delete deletes the stream by key. -func Delete(c context.Context, key string) error { - return FromContext(c).Delete(key) -} - -// ToKey is a helper function that converts a unique identifier -// of type int64 into a string. -func ToKey(i int64) string { - return strconv.FormatInt(i, 10) -} - -// Copy copies the stream from the source to the destination in valid JSON -// format. This converts the logs, which are per-line JSON objects, to a -// proper JSON array. -func Copy(dest io.Writer, src io.Reader) error { - io.WriteString(dest, "[") - - scanner := bufio.NewScanner(src) - for scanner.Scan() { - io.WriteString(dest, scanner.Text()) - io.WriteString(dest, ",\n") - } - - io.WriteString(dest, "{}]") - - return nil -} diff --git a/stream/stream_impl.go b/stream/stream_impl.go deleted file mode 100644 index 8e21aaf9b..000000000 --- a/stream/stream_impl.go +++ /dev/null @@ -1,72 +0,0 @@ -package stream - -import ( - "fmt" - "io" - "sync" -) - -type stream struct { - sync.Mutex - writers map[string]*writer -} - -// New returns a new in-memory implementation of Stream. -func New() Stream { - return &stream{ - writers: map[string]*writer{}, - } -} - -// Reader returns an io.Reader for reading from to the stream. -func (s *stream) Reader(name string) (io.ReadCloser, error) { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return nil, fmt.Errorf("stream: cannot read stream %s, not found", name) - } - return s.writers[name].Reader() -} - -// Writer returns an io.WriteCloser for writing to the stream. -func (s *stream) Writer(name string) (io.WriteCloser, error) { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return nil, fmt.Errorf("stream: cannot write stream %s, not found", name) - } - return s.writers[name], nil -} - -// Create creates a new stream. -func (s *stream) Create(name string) error { - s.Lock() - defer s.Unlock() - - if s.exists(name) { - return fmt.Errorf("stream: cannot create stream %s, already exists", name) - } - - s.writers[name] = newWriter() - return nil -} - -// Delete deletes the stream by key. -func (s *stream) Delete(name string) error { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return fmt.Errorf("stream: cannot delete stream %s, not found", name) - } - w := s.writers[name] - delete(s.writers, name) - return w.Close() -} - -func (s *stream) exists(name string) bool { - _, exists := s.writers[name] - return exists -} diff --git a/stream/stream_impl_test.go b/stream/stream_impl_test.go deleted file mode 100644 index fdc29fce4..000000000 --- a/stream/stream_impl_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsStream(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement stream tests -} diff --git a/stream/writer.go b/stream/writer.go deleted file mode 100644 index 15a873604..000000000 --- a/stream/writer.go +++ /dev/null @@ -1,52 +0,0 @@ -package stream - -import ( - "bytes" - "io" - "sync" - "sync/atomic" -) - -type writer struct { - sync.RWMutex - *sync.Cond - - buffer bytes.Buffer - closed uint32 -} - -func newWriter() *writer { - var w writer - w.Cond = sync.NewCond(w.RWMutex.RLocker()) - return &w -} - -func (w *writer) Write(p []byte) (n int, err error) { - defer w.Broadcast() - w.Lock() - defer w.Unlock() - if w.Closed() { - return 0, io.EOF - } - return w.buffer.Write(p) -} - -func (w *writer) Reader() (io.ReadCloser, error) { - return &reader{w: w}, nil -} - -func (w *writer) Wait() { - if !w.Closed() { - w.Cond.Wait() - } -} - -func (w *writer) Close() error { - atomic.StoreUint32(&w.closed, 1) - w.Cond.Broadcast() - return nil -} - -func (w *writer) Closed() bool { - return atomic.LoadUint32(&w.closed) != 0 -} diff --git a/stream/writer_test.go b/stream/writer_test.go deleted file mode 100644 index c0c757e10..000000000 --- a/stream/writer_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsWriter(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement writer tests -} diff --git a/yaml/label.go b/yaml/label.go new file mode 100644 index 000000000..445a9b6b4 --- /dev/null +++ b/yaml/label.go @@ -0,0 +1,26 @@ +package yaml + +import ( + "gopkg.in/yaml.v2" + + "github.com/drone/drone/yaml/types" +) + +// ParseLabel parses the labels section of the Yaml document. +func ParseLabel(in []byte) map[string]string { + out := struct { + Labels types.MapEqualSlice `yaml:"labels"` + }{} + + yaml.Unmarshal(in, &out) + labels := out.Labels.Map() + if labels == nil { + labels = make(map[string]string) + } + return labels +} + +// ParseLabelString parses the labels section of the Yaml document. +func ParseLabelString(in string) map[string]string { + return ParseLabel([]byte(in)) +} diff --git a/yaml/label_test.go b/yaml/label_test.go new file mode 100644 index 000000000..3032f33ed --- /dev/null +++ b/yaml/label_test.go @@ -0,0 +1,32 @@ +package yaml + +import ( + "testing" + + "github.com/franela/goblin" +) + +func TestLabel(t *testing.T) { + + g := goblin.Goblin(t) + g.Describe("Label parser", func() { + + g.It("Should parse empty yaml", func() { + labels := ParseLabelString("") + g.Assert(len(labels)).Equal(0) + }) + + g.It("Should parse slice", func() { + labels := ParseLabelString("labels: [foo=bar, baz=boo]") + g.Assert(len(labels)).Equal(2) + g.Assert(labels["foo"]).Equal("bar") + g.Assert(labels["baz"]).Equal("boo") + }) + + g.It("Should parse map", func() { + labels := ParseLabelString("labels: {foo: bar, baz: boo}") + g.Assert(labels["foo"]).Equal("bar") + g.Assert(labels["baz"]).Equal("boo") + }) + }) +} diff --git a/yaml/platform.go b/yaml/platform.go new file mode 100644 index 000000000..69e1cccc2 --- /dev/null +++ b/yaml/platform.go @@ -0,0 +1,26 @@ +package yaml + +import "gopkg.in/yaml.v2" + +// ParsePlatform parses the platform section of the Yaml document. +func ParsePlatform(in []byte) string { + out := struct { + Platform string `yaml:"platform"` + }{} + + yaml.Unmarshal(in, &out) + return out.Platform +} + +// ParsePlatformString parses the platform section of the Yaml document. +func ParsePlatformString(in string) string { + return ParsePlatform([]byte(in)) +} + +// ParsePlatformDefault parses the platform section of the Yaml document. +func ParsePlatformDefault(in []byte, platform string) string { + if p := ParsePlatform([]byte(in)); p != "" { + return p + } + return platform +} diff --git a/yaml/transform/clone.go b/yaml/transform/clone.go index f47abf41f..60c1e2094 100644 --- a/yaml/transform/clone.go +++ b/yaml/transform/clone.go @@ -6,8 +6,11 @@ const clone = "clone" // Clone transforms the Yaml to include a clone step. func Clone(c *yaml.Config, plugin string) error { - if plugin == "" { - plugin = "git" + switch plugin { + case "", "git": + plugin = "plugins/git:latest" + case "hg": + plugin = "plugins/hg:latest" } for _, p := range c.Pipeline { diff --git a/yaml/transform/image.go b/yaml/transform/image.go index a13a983cc..69aee9746 100644 --- a/yaml/transform/image.go +++ b/yaml/transform/image.go @@ -34,28 +34,6 @@ func ImageTag(conf *yaml.Config) error { return nil } -// ImageName transforms the Yaml to replace underscores with dashes. -func ImageName(conf *yaml.Config) error { - for _, image := range conf.Pipeline { - image.Image = strings.Replace(image.Image, "_", "-", -1) - } - return nil -} - -// ImageNamespace transforms the Yaml to use a default namepsace for plugins. -func ImageNamespace(conf *yaml.Config, namespace string) error { - for _, image := range conf.Pipeline { - if strings.Contains(image.Image, "/") { - continue - } - if !isPlugin(image) { - continue - } - image.Image = filepath.Join(namespace, image.Image) - } - return nil -} - // ImageEscalate transforms the Yaml to automatically enable privileged mode // for a subset of white-listed plugins matching the given patterns. func ImageEscalate(conf *yaml.Config, patterns []string) error { diff --git a/yaml/transform/image_test.go b/yaml/transform/image_test.go index d5abbd6dc..26ad37dc3 100644 --- a/yaml/transform/image_test.go +++ b/yaml/transform/image_test.go @@ -128,35 +128,5 @@ func Test_normalize(t *testing.T) { g.Assert(c.Pipeline[0].Image).Equal("golang:1.5") }) }) - - g.Describe("plugins", func() { - - g.It("should prepend namespace", func() { - c := newConfig(&yaml.Container{ - Image: "slack", - }) - - ImageNamespace(c, "plugins") - g.Assert(c.Pipeline[0].Image).Equal("plugins/slack") - }) - - g.It("should not override existing namespace", func() { - c := newConfig(&yaml.Container{ - Image: "index.docker.io/drone/git", - }) - - ImageNamespace(c, "plugins") - g.Assert(c.Pipeline[0].Image).Equal("index.docker.io/drone/git") - }) - - g.It("should replace underscores with dashes", func() { - c := newConfig(&yaml.Container{ - Image: "gh_pages", - }) - - ImageName(c) - g.Assert(c.Pipeline[0].Image).Equal("gh-pages") - }) - }) }) }