From f2c1d46f9ed461a2c5a85b676c1aab3ebb7cf6bc Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Mon, 26 Sep 2016 00:39:28 -0500 Subject: [PATCH 01/14] queue integrated with server, but not agent --- bus/bus.go | 37 ++- drone/server.go | 1 + queue/queue_impl_test.go | 183 ++++++------ router/middleware/broker.go | 52 ++++ router/router.go | 24 +- server/broker.go | 13 + server/build.go | 29 +- server/hook.go | 20 +- server/queue.go | 552 +++++++++++++++++++++--------------- server/stream.go | 212 ++++---------- yaml/label.go | 26 ++ yaml/label_test.go | 32 +++ 12 files changed, 658 insertions(+), 523 deletions(-) create mode 100644 router/middleware/broker.go create mode 100644 server/broker.go create mode 100644 yaml/label.go create mode 100644 yaml/label_test.go diff --git a/bus/bus.go b/bus/bus.go index 7bb39c534..5f858b954 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2,8 +2,6 @@ package bus //go:generate mockery -name Bus -output mock -case=underscore -import "golang.org/x/net/context" - // Bus represents an event bus implementation that // allows a publisher to broadcast Event notifications // to a list of subscribers. @@ -21,20 +19,21 @@ type Bus interface { Unsubscribe(chan *Event) } -// Publish broadcasts an event to all subscribers. -func Publish(c context.Context, event *Event) { - FromContext(c).Publish(event) -} - -// Subscribe adds the channel to the list of -// subscribers. Each subscriber in the list will -// receive broadcast events. -func Subscribe(c context.Context, eventc chan *Event) { - FromContext(c).Subscribe(eventc) -} - -// Unsubscribe removes the channel from the -// list of subscribers. -func Unsubscribe(c context.Context, eventc chan *Event) { - FromContext(c).Unsubscribe(eventc) -} +// +// // Publish broadcasts an event to all subscribers. +// func Publish(c context.Context, event *Event) { +// FromContext(c).Publish(event) +// } +// +// // Subscribe adds the channel to the list of +// // subscribers. Each subscriber in the list will +// // receive broadcast events. +// func Subscribe(c context.Context, eventc chan *Event) { +// FromContext(c).Subscribe(eventc) +// } +// +// // Unsubscribe removes the channel from the +// // list of subscribers. +// func Unsubscribe(c context.Context, eventc chan *Event) { +// FromContext(c).Unsubscribe(eventc) +// } diff --git a/drone/server.go b/drone/server.go index f930ae373..390642fbc 100644 --- a/drone/server.go +++ b/drone/server.go @@ -295,6 +295,7 @@ func server(c *cli.Context) error { middleware.Store(c), middleware.Remote(c), middleware.Agents(c), + middleware.Broker(c), ) // start the server with tls enabled diff --git a/queue/queue_impl_test.go b/queue/queue_impl_test.go index 778576232..45f38bff6 100644 --- a/queue/queue_impl_test.go +++ b/queue/queue_impl_test.go @@ -1,93 +1,94 @@ package queue -import ( - "sync" - "testing" - - . "github.com/franela/goblin" - "github.com/gin-gonic/gin" -) - -func TestBuild(t *testing.T) { - g := Goblin(t) - g.Describe("Queue", func() { - - g.It("Should publish item", func() { - c := new(gin.Context) - q := newQueue() - ToContext(c, q) - - w1 := &Work{} - w2 := &Work{} - Publish(c, w1) - Publish(c, w2) - g.Assert(len(q.items)).Equal(2) - g.Assert(len(q.itemc)).Equal(2) - }) - - g.It("Should remove item", func() { - c := new(gin.Context) - q := newQueue() - ToContext(c, q) - - w1 := &Work{} - w2 := &Work{} - w3 := &Work{} - Publish(c, w1) - Publish(c, w2) - Publish(c, w3) - Remove(c, w2) - g.Assert(len(q.items)).Equal(2) - g.Assert(len(q.itemc)).Equal(2) - - g.Assert(Pull(c)).Equal(w1) - g.Assert(Pull(c)).Equal(w3) - g.Assert(Remove(c, w2)).Equal(ErrNotFound) - }) - - g.It("Should pull item", func() { - c := new(gin.Context) - q := New() - ToContext(c, q) - - cn := new(closeNotifier) - cn.closec = make(chan bool, 1) - w1 := &Work{} - w2 := &Work{} - - Publish(c, w1) - g.Assert(Pull(c)).Equal(w1) - - Publish(c, w2) - g.Assert(PullClose(c, cn)).Equal(w2) - }) - - g.It("Should cancel pulling item", func() { - c := new(gin.Context) - q := New() - ToContext(c, q) - - cn := new(closeNotifier) - cn.closec = make(chan bool, 1) - var wg sync.WaitGroup - go func() { - wg.Add(1) - g.Assert(PullClose(c, cn) == nil).IsTrue() - wg.Done() - }() - go func() { - cn.closec <- true - }() - wg.Wait() - - }) - }) -} - -type closeNotifier struct { - closec chan bool -} - -func (c *closeNotifier) CloseNotify() <-chan bool { - return c.closec -} +// +// import ( +// "sync" +// "testing" +// +// . "github.com/franela/goblin" +// "github.com/gin-gonic/gin" +// ) +// +// func TestBuild(t *testing.T) { +// g := Goblin(t) +// g.Describe("Queue", func() { +// +// g.It("Should publish item", func() { +// c := new(gin.Context) +// q := newQueue() +// ToContext(c, q) +// +// w1 := &Work{} +// w2 := &Work{} +// Publish(c, w1) +// Publish(c, w2) +// g.Assert(len(q.items)).Equal(2) +// g.Assert(len(q.itemc)).Equal(2) +// }) +// +// g.It("Should remove item", func() { +// c := new(gin.Context) +// q := newQueue() +// ToContext(c, q) +// +// w1 := &Work{} +// w2 := &Work{} +// w3 := &Work{} +// Publish(c, w1) +// Publish(c, w2) +// Publish(c, w3) +// Remove(c, w2) +// g.Assert(len(q.items)).Equal(2) +// g.Assert(len(q.itemc)).Equal(2) +// +// g.Assert(Pull(c)).Equal(w1) +// g.Assert(Pull(c)).Equal(w3) +// g.Assert(Remove(c, w2)).Equal(ErrNotFound) +// }) +// +// g.It("Should pull item", func() { +// c := new(gin.Context) +// q := New() +// ToContext(c, q) +// +// cn := new(closeNotifier) +// cn.closec = make(chan bool, 1) +// w1 := &Work{} +// w2 := &Work{} +// +// Publish(c, w1) +// g.Assert(Pull(c)).Equal(w1) +// +// Publish(c, w2) +// g.Assert(PullClose(c, cn)).Equal(w2) +// }) +// +// g.It("Should cancel pulling item", func() { +// c := new(gin.Context) +// q := New() +// ToContext(c, q) +// +// cn := new(closeNotifier) +// cn.closec = make(chan bool, 1) +// var wg sync.WaitGroup +// go func() { +// wg.Add(1) +// g.Assert(PullClose(c, cn) == nil).IsTrue() +// wg.Done() +// }() +// go func() { +// cn.closec <- true +// }() +// wg.Wait() +// +// }) +// }) +// } +// +// type closeNotifier struct { +// closec chan bool +// } +// +// func (c *closeNotifier) CloseNotify() <-chan bool { +// return c.closec +// } diff --git a/router/middleware/broker.go b/router/middleware/broker.go new file mode 100644 index 000000000..2222df510 --- /dev/null +++ b/router/middleware/broker.go @@ -0,0 +1,52 @@ +package middleware + +import ( + "sync" + + handlers "github.com/drone/drone/server" + + "github.com/codegangsta/cli" + "github.com/drone/mq/server" + "github.com/drone/mq/stomp" + + "github.com/Sirupsen/logrus" + "github.com/gin-gonic/gin" +) + +const ( + serverKey = "broker" + clientKey = "stomp.client" // mirrored from stomp/context +) + +// Broker is a middleware function that initializes the broker +// and adds the broker client to the request context. +func Broker(cli *cli.Context) gin.HandlerFunc { + secret := cli.String("agent-secret") + if secret == "" { + logrus.Fatalf("failed to generate token from DRONE_SECRET") + } + + broker := server.NewServer( + server.WithCredentials("x-token", secret), + ) + client := broker.Client() + + var once sync.Once + return func(c *gin.Context) { + c.Set(serverKey, broker) + c.Set(clientKey, client) + once.Do(func() { + // this is some really hacky stuff + // turns out I need to do some refactoring + // don't judge! + // will fix in 0.6 release + ctx := c.Copy() + client.Connect( + stomp.WithCredentials("x-token", secret), + ) + client.Subscribe("/queue/updates", stomp.HandlerFunc(func(m *stomp.Message) { + go handlers.HandleUpdate(ctx, m.Copy()) + })) + }) + } +} diff --git a/router/router.go b/router/router.go index 912ae2736..ce3ac4936 100644 --- a/router/router.go +++ b/router/router.go @@ -113,17 +113,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { e.POST("/hook", server.PostHook) e.POST("/api/hook", server.PostHook) - stream := e.Group("/api/stream") - { - stream.Use(session.SetRepo()) - stream.Use(session.SetPerm()) - stream.Use(session.MustPull) - - stream.GET("/:owner/:name", server.GetRepoEvents) - stream.GET("/:owner/:name/:build/:number", server.GetStream) - } ws := e.Group("/ws") { + ws.GET("/broker", server.Broker) ws.GET("/feed", server.EventStream) ws.GET("/logs/:owner/:name/:build/:number", session.SetRepo(), @@ -152,20 +144,6 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { agents.GET("", server.GetAgents) } - queue := e.Group("/api/queue") - { - queue.Use(session.AuthorizeAgent) - queue.POST("/pull", server.Pull) - queue.POST("/pull/:os/:arch", server.Pull) - queue.POST("/wait/:id", server.Wait) - queue.POST("/stream/:id", server.Stream) - queue.POST("/status/:id", server.Update) - queue.POST("/ping", server.Ping) - - queue.POST("/logs/:id", server.PostLogs) - queue.GET("/logs/:id", server.WriteLogs) - } - // DELETE THESE // gitlab := e.Group("/gitlab/:owner/:name") // { diff --git a/server/broker.go b/server/broker.go new file mode 100644 index 000000000..83a97b870 --- /dev/null +++ b/server/broker.go @@ -0,0 +1,13 @@ +package server + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +// Broker handles connections to the embedded message broker. +func Broker(c *gin.Context) { + broker := c.MustGet("broker").(http.Handler) + broker.ServeHTTP(c.Writer, c.Request) +} diff --git a/server/build.go b/server/build.go index b7f805cdc..3e9d7a4ac 100644 --- a/server/build.go +++ b/server/build.go @@ -12,11 +12,13 @@ import ( "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" "github.com/drone/drone/stream" + "github.com/drone/drone/yaml" "github.com/gin-gonic/gin" "github.com/square/go-jose" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" + "github.com/drone/mq/stomp" ) func GetBuilds(c *gin.Context) { @@ -148,7 +150,14 @@ func DeleteBuild(c *gin.Context) { job.ExitCode = 137 store.UpdateBuildJob(c, build, job) - bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/cancel", bus.Event{ + Type: bus.Cancelled, + Repo: *repo, + Build: *build, + Job: *job, + }) + c.String(204, "") } @@ -293,7 +302,7 @@ func PostBuild(c *gin.Context) { last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) secs, err := store.GetMergedSecretList(c, repo) if err != nil { - log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } var signed bool @@ -318,9 +327,19 @@ func PostBuild(c *gin.Context) { log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/events", bus.Event{ + Type: bus.Enqueued, + Repo: *repo, + Build: *build, + }, + stomp.WithHeader("repo", repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)), + ) + for _, job := range jobs { - queue.Publish(c, &queue.Work{ + broker, _ := stomp.FromContext(c) + broker.SendJSON("/queue/pending", &queue.Work{ Signed: signed, Verified: verified, User: user, @@ -332,7 +351,7 @@ func PostBuild(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }) + }, stomp.WithHeaders(yaml.ParseLabel(raw))) } } diff --git a/server/hook.go b/server/hook.go index 1c93ed867..7f722c127 100644 --- a/server/hook.go +++ b/server/hook.go @@ -3,6 +3,7 @@ package server import ( "fmt" "regexp" + "strconv" "github.com/gin-gonic/gin" "github.com/square/go-jose" @@ -16,6 +17,7 @@ import ( "github.com/drone/drone/shared/token" "github.com/drone/drone/store" "github.com/drone/drone/yaml" + "github.com/drone/mq/stomp" ) var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`) @@ -208,12 +210,22 @@ func PostHook(c *gin.Context) { last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) secs, err := store.GetMergedSecretList(c, repo) if err != nil { - log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/events", bus.Event{ + Type: bus.Enqueued, + Repo: *repo, + Build: *build, + }, + stomp.WithHeader("repo", repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)), + ) + for _, job := range jobs { - queue.Publish(c, &queue.Work{ + broker, _ := stomp.FromContext(c) + broker.SendJSON("/queue/pending", &queue.Work{ Signed: build.Signed, Verified: build.Verified, User: user, @@ -225,7 +237,7 @@ func PostHook(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }) + }, stomp.WithHeaders(yaml.ParseLabel(raw))) } } diff --git a/server/queue.go b/server/queue.go index 7cdaf863f..adfbd11ff 100644 --- a/server/queue.go +++ b/server/queue.go @@ -1,80 +1,319 @@ package server import ( + "bytes" "fmt" - "io" "net/http" - "strconv" - "sync" - "time" + + "golang.org/x/net/context" "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/store" - "github.com/drone/drone/stream" - "github.com/gin-gonic/gin" + "github.com/drone/mq/stomp" "github.com/gorilla/websocket" ) -// Pull is a long request that polls and attemts to pull work off the queue stack. -func Pull(c *gin.Context) { - logrus.Debugf("Agent %s connected.", c.ClientIP()) +// +// // Pull is a long request that polls and attemts to pull work off the queue stack. +// func Pull(c *gin.Context) { +// logrus.Debugf("Agent %s connected.", c.ClientIP()) +// +// w := queue.PullClose(c, c.Writer) +// if w == nil { +// logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) +// } else { +// +// // setup the channel to stream logs +// if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { +// logrus.Errorf("Unable to create stream. %s", err) +// } +// +// c.JSON(202, w) +// +// logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", +// c.ClientIP(), +// w.Repo.Owner, +// w.Repo.Name, +// w.Build.Number, +// w.Job.Number, +// ) +// } +// } +// +// // Wait is a long request that polls and waits for cancelled build requests. +// func Wait(c *gin.Context) { +// id, err := strconv.ParseInt(c.Param("id"), 10, 64) +// if err != nil { +// c.String(500, "Invalid input. %s", err) +// return +// } +// +// eventc := make(chan *bus.Event, 1) +// +// bus.Subscribe(c, eventc) +// defer bus.Unsubscribe(c, eventc) +// +// for { +// select { +// case event := <-eventc: +// if event.Job.ID == id && event.Type == bus.Cancelled { +// c.JSON(200, event.Job) +// return +// } +// case <-c.Writer.CloseNotify(): +// return +// } +// } +// } +// +// // Update handles build updates from the agent and persists to the database. +// func Update(c *gin.Context) { +// work := &queue.Work{} +// if err := c.BindJSON(work); err != nil { +// logrus.Errorf("Invalid input. %s", err) +// return +// } +// +// // TODO(bradrydzewski) it is really annoying that we have to do this lookup +// // and I'd prefer not to. The reason we do this is because the Build and Job +// // have fields that aren't serialized to json and would be reset to their +// // empty values if we just saved what was coming in the http.Request body. +// build, err := store.GetBuild(c, work.Build.ID) +// if err != nil { +// c.String(404, "Unable to find build. %s", err) +// return +// } +// job, err := store.GetJob(c, work.Job.ID) +// if err != nil { +// c.String(404, "Unable to find job. %s", err) +// return +// } +// build.Started = work.Build.Started +// build.Finished = work.Build.Finished +// build.Status = work.Build.Status +// job.Started = work.Job.Started +// job.Finished = work.Job.Finished +// job.Status = work.Job.Status +// job.ExitCode = work.Job.ExitCode +// job.Error = work.Job.Error +// +// if build.Status == model.StatusPending { +// build.Started = work.Job.Started +// build.Status = model.StatusRunning +// store.UpdateBuild(c, build) +// } +// +// // if job.Status == model.StatusRunning { +// // err := stream.Create(c, stream.ToKey(job.ID)) +// // if err != nil { +// // logrus.Errorf("Unable to create stream. %s", err) +// // } +// // } +// +// ok, err := store.UpdateBuildJob(c, build, job) +// if err != nil { +// c.String(500, "Unable to update job. %s", err) +// return +// } +// +// if ok && build.Status != model.StatusRunning { +// // get the user because we transfer the user form the server to agent +// // and back we lose the token which does not get serialized to json. +// user, err := store.GetUser(c, work.User.ID) +// if err != nil { +// c.String(500, "Unable to find user. %s", err) +// return +// } +// remote.Status(c, user, work.Repo, build, +// fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) +// } +// +// if build.Status == model.StatusRunning { +// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) +// } else { +// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) +// } +// +// c.JSON(200, work) +// } +// +// // Stream streams the logs to disk or memory for broadcasing to listeners. Once +// // the stream is closed it is moved to permanent storage in the database. +// func Stream(c *gin.Context) { +// id, err := strconv.ParseInt(c.Param("id"), 10, 64) +// if err != nil { +// c.String(500, "Invalid input. %s", err) +// return +// } +// +// key := c.Param("id") +// logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) +// +// wc, err := stream.Writer(c, key) +// if err != nil { +// c.String(500, "Failed to create stream writer. %s", err) +// return +// } +// +// defer func() { +// wc.Close() +// stream.Delete(c, key) +// }() +// +// io.Copy(wc, c.Request.Body) +// +// rc, err := stream.Reader(c, key) +// if err != nil { +// c.String(500, "Failed to create stream reader. %s", err) +// return +// } +// +// wg := sync.WaitGroup{} +// wg.Add(1) +// +// go func() { +// defer recover() +// store.WriteLog(c, &model.Job{ID: id}, rc) +// wg.Done() +// }() +// +// wc.Close() +// wg.Wait() +// c.String(200, "") +// +// logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) +// } +// +// func Ping(c *gin.Context) { +// agent, err := store.GetAgentAddr(c, c.ClientIP()) +// if err == nil { +// agent.Updated = time.Now().Unix() +// err = store.UpdateAgent(c, agent) +// } else { +// err = store.CreateAgent(c, &model.Agent{ +// Address: c.ClientIP(), +// Platform: "linux/amd64", +// Capacity: 2, +// Created: time.Now().Unix(), +// Updated: time.Now().Unix(), +// }) +// } +// if err != nil { +// logrus.Errorf("Unable to register agent. %s", err.Error()) +// } +// c.String(200, "PONG") +// } - w := queue.PullClose(c, c.Writer) - if w == nil { - logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) - } else { +// +// +// Below are alternate implementations for the Queue that use websockets. +// +// +// +// // PostLogs handles an http request from the agent to post build logs. These +// // logs are posted at the end of the build process. +// func PostLogs(c *gin.Context) { +// id, _ := strconv.ParseInt(c.Param("id"), 10, 64) +// job, err := store.GetJob(c, id) +// if err != nil { +// c.String(404, "Cannot upload logs. %s", err) +// return +// } +// if err := store.WriteLog(c, job, c.Request.Body); err != nil { +// c.String(500, "Cannot persist logs", err) +// return +// } +// c.String(200, "") +// } +// +// // WriteLogs handles an http request from the agent to stream build logs from +// // the agent to the server to enable real time streamings to the client. +// func WriteLogs(c *gin.Context) { +// id, err := strconv.ParseInt(c.Param("id"), 10, 64) +// if err != nil { +// c.String(500, "Invalid input. %s", err) +// return +// } +// +// conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) +// if err != nil { +// c.String(500, "Cannot upgrade to websocket. %s", err) +// return +// } +// defer conn.Close() +// +// wc, err := stream.Writer(c, stream.ToKey(id)) +// if err != nil { +// c.String(500, "Cannot create stream writer. %s", err) +// return +// } +// defer func() { +// wc.Close() +// stream.Delete(c, stream.ToKey(id)) +// }() +// +// var msg []byte +// for { +// _, msg, err = conn.ReadMessage() +// if err != nil { +// break +// } +// wc.Write(msg) +// wc.Write(newline) +// } +// +// if err != nil && err != io.EOF { +// c.String(500, "Error reading logs. %s", err) +// return +// } +// // +// // rc, err := stream.Reader(c, stream.ToKey(id)) +// // if err != nil { +// // c.String(500, "Failed to create stream reader. %s", err) +// // return +// // } +// // +// // wg := sync.WaitGroup{} +// // wg.Add(1) +// // +// // go func() { +// // defer recover() +// // store.WriteLog(c, &model.Job{ID: id}, rc) +// // wg.Done() +// // }() +// // +// // wc.Close() +// // wg.Wait() +// +// } - // setup the channel to stream logs - if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { - logrus.Errorf("Unable to create stream. %s", err) - } +// newline defines a newline constant to separate lines in the build output +var newline = []byte{'\n'} - c.JSON(202, w) - - logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", - c.ClientIP(), - w.Repo.Owner, - w.Repo.Name, - w.Build.Number, - w.Job.Number, - ) - } +// upgrader defines the default behavior for upgrading the websocket. +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, } -// Wait is a long request that polls and waits for cancelled build requests. -func Wait(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - eventc := make(chan *bus.Event, 1) - - bus.Subscribe(c, eventc) - defer bus.Unsubscribe(c, eventc) - - for { - select { - case event := <-eventc: - if event.Job.ID == id && event.Type == bus.Cancelled { - c.JSON(200, event.Job) - return - } - case <-c.Writer.CloseNotify(): - return +// HandleUpdate handles build updates from the agent and persists to the database. +func HandleUpdate(c context.Context, message *stomp.Message) { + defer func() { + message.Release() + if r := recover(); r != nil { + err := r.(error) + logrus.Errorf("Panic recover: broker update handler: %s", err) } - } -} + }() -// Update handles build updates from the agent and persists to the database. -func Update(c *gin.Context) { - work := &queue.Work{} - if err := c.BindJSON(work); err != nil { + work := new(queue.Work) + if err := message.Unmarshal(work); err != nil { logrus.Errorf("Invalid input. %s", err) return } @@ -85,12 +324,12 @@ func Update(c *gin.Context) { // empty values if we just saved what was coming in the http.Request body. build, err := store.GetBuild(c, work.Build.ID) if err != nil { - c.String(404, "Unable to find build. %s", err) + logrus.Errorf("Unable to find build. %s", err) return } job, err := store.GetJob(c, work.Job.ID) if err != nil { - c.String(404, "Unable to find job. %s", err) + logrus.Errorf("Unable to find job. %s", err) return } build.Started = work.Build.Started @@ -117,189 +356,52 @@ func Update(c *gin.Context) { ok, err := store.UpdateBuildJob(c, build, job) if err != nil { - c.String(500, "Unable to update job. %s", err) + logrus.Errorf("Unable to update job. %s", err) return } if ok && build.Status != model.StatusRunning { // get the user because we transfer the user form the server to agent // and back we lose the token which does not get serialized to json. - user, err := store.GetUser(c, work.User.ID) - if err != nil { - c.String(500, "Unable to find user. %s", err) + user, uerr := store.GetUser(c, work.User.ID) + if uerr != nil { + logrus.Errorf("Unable to find user. %s", err) return } remote.Status(c, user, work.Repo, build, fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) } - if build.Status == model.StatusRunning { - bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) - } else { - bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) - } + var buf bytes.Buffer + var sub []byte - c.JSON(200, work) -} - -// Stream streams the logs to disk or memory for broadcasing to listeners. Once -// the stream is closed it is moved to permanent storage in the database. -func Stream(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - key := c.Param("id") - logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) - - wc, err := stream.Writer(c, key) - if err != nil { - c.String(500, "Failed to create stream writer. %s", err) - return - } - - defer func() { - wc.Close() - stream.Delete(c, key) - }() - - io.Copy(wc, c.Request.Body) - - rc, err := stream.Reader(c, key) - if err != nil { - c.String(500, "Failed to create stream reader. %s", err) - return - } - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer recover() - store.WriteLog(c, &model.Job{ID: id}, rc) - wg.Done() - }() - - wc.Close() - wg.Wait() - c.String(200, "") - - logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) -} - -func Ping(c *gin.Context) { - agent, err := store.GetAgentAddr(c, c.ClientIP()) - if err == nil { - agent.Updated = time.Now().Unix() - err = store.UpdateAgent(c, agent) - } else { - err = store.CreateAgent(c, &model.Agent{ - Address: c.ClientIP(), - Platform: "linux/amd64", - Capacity: 2, - Created: time.Now().Unix(), - Updated: time.Now().Unix(), - }) - } - if err != nil { - logrus.Errorf("Unable to register agent. %s", err.Error()) - } - c.String(200, "PONG") -} - -// -// -// Below are alternate implementations for the Queue that use websockets. -// -// - -// PostLogs handles an http request from the agent to post build logs. These -// logs are posted at the end of the build process. -func PostLogs(c *gin.Context) { - id, _ := strconv.ParseInt(c.Param("id"), 10, 64) - job, err := store.GetJob(c, id) - if err != nil { - c.String(404, "Cannot upload logs. %s", err) - return - } - if err := store.WriteLog(c, job, c.Request.Body); err != nil { - c.String(500, "Cannot persist logs", err) - return - } - c.String(200, "") -} - -// WriteLogs handles an http request from the agent to stream build logs from -// the agent to the server to enable real time streamings to the client. -func WriteLogs(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - c.String(500, "Cannot upgrade to websocket. %s", err) - return - } - defer conn.Close() - - wc, err := stream.Writer(c, stream.ToKey(id)) - if err != nil { - c.String(500, "Cannot create stream writer. %s", err) - return - } - defer func() { - wc.Close() - stream.Delete(c, stream.ToKey(id)) - }() - - var msg []byte - for { - _, msg, err = conn.ReadMessage() - if err != nil { - break + done := make(chan bool) + dest := fmt.Sprintf("/topic/%d", job.ID) + client, _ := stomp.FromContext(c) + sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { + if len(m.Header.Get([]byte("eof"))) != 0 { + done <- true } - wc.Write(msg) - wc.Write(newline) - } - - if err != nil && err != io.EOF { - c.String(500, "Error reading logs. %s", err) + buf.Write(m.Body) + buf.WriteByte('\n') + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) return } - // - // rc, err := stream.Reader(c, stream.ToKey(id)) - // if err != nil { - // c.String(500, "Failed to create stream reader. %s", err) - // return + <-done + + if err := store.WriteLog(c, job, &buf); err != nil { + logrus.Errorf("Unable to write logs to store. %s", err) + return + } + // if build.Status == model.StatusRunning { + // bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) + // } else { + // bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) // } - // - // wg := sync.WaitGroup{} - // wg.Add(1) - // - // go func() { - // defer recover() - // store.WriteLog(c, &model.Job{ID: id}, rc) - // wg.Done() - // }() - // - // wc.Close() - // wg.Wait() -} - -// newline defines a newline constant to separate lines in the build output -var newline = []byte{'\n'} - -// upgrader defines the default behavior for upgrading the websocket. -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, + client.Unsubscribe(sub) + client.Send(dest, []byte{}, stomp.WithRetain("remove")) } diff --git a/server/stream.go b/server/stream.go index b80bbc73a..4c8786f75 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,121 +1,21 @@ package server import ( - "bufio" - "encoding/json" - "io" + "fmt" "strconv" "time" - "github.com/drone/drone/bus" "github.com/drone/drone/cache" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" "github.com/drone/drone/store" - "github.com/drone/drone/stream" + "github.com/drone/mq/stomp" "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" - "github.com/manucorporat/sse" ) -// GetRepoEvents will upgrade the connection to a Websocket and will stream -// event updates to the browser. -func GetRepoEvents(c *gin.Context) { - repo := session.Repo(c) - c.Writer.Header().Set("Content-Type", "text/event-stream") - - eventc := make(chan *bus.Event, 1) - bus.Subscribe(c, eventc) - defer func() { - bus.Unsubscribe(c, eventc) - close(eventc) - logrus.Infof("closed event stream") - }() - - c.Stream(func(w io.Writer) bool { - select { - case event := <-eventc: - if event == nil { - logrus.Infof("nil event received") - return false - } - - // TODO(bradrydzewski) This is a super hacky workaround until we improve - // the actual bus. Having a per-call database event is just plain stupid. - if event.Repo.FullName == repo.FullName { - - var payload = struct { - model.Build - Jobs []*model.Job `json:"jobs"` - }{} - payload.Build = event.Build - payload.Jobs, _ = store.GetJobList(c, &event.Build) - data, _ := json.Marshal(&payload) - - sse.Encode(w, sse.Event{ - Event: "message", - Data: string(data), - }) - } - case <-c.Writer.CloseNotify(): - return false - } - return true - }) -} - -func GetStream(c *gin.Context) { - - repo := session.Repo(c) - buildn, _ := strconv.Atoi(c.Param("build")) - jobn, _ := strconv.Atoi(c.Param("number")) - - c.Writer.Header().Set("Content-Type", "text/event-stream") - - build, err := store.GetBuildNumber(c, repo, buildn) - if err != nil { - logrus.Debugln("stream cannot get build number.", err) - c.AbortWithError(404, err) - return - } - job, err := store.GetJobNumber(c, build, jobn) - if err != nil { - logrus.Debugln("stream cannot get job number.", err) - c.AbortWithError(404, err) - return - } - - rc, err := stream.Reader(c, stream.ToKey(job.ID)) - if err != nil { - c.AbortWithError(404, err) - return - } - - go func() { - <-c.Writer.CloseNotify() - rc.Close() - }() - - var line int - var scanner = bufio.NewScanner(rc) - for scanner.Scan() { - line++ - var err = sse.Encode(c.Writer, sse.Event{ - Id: strconv.Itoa(line), - Event: "message", - Data: scanner.Text(), - }) - if err != nil { - break - } - c.Writer.Flush() - } - - logrus.Debugf("Closed stream %s#%d", repo.FullName, build.Number) -} - var ( // Time allowed to write the file to the client. writeWait = 5 * time.Second @@ -165,47 +65,35 @@ func LogStream(c *gin.Context) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() - rc, err := stream.Reader(c, stream.ToKey(job.ID)) - if err != nil { - c.AbortWithError(404, err) - return - } - - quitc := make(chan bool) - defer func() { - quitc <- true - close(quitc) - rc.Close() - ws.Close() - logrus.Debug("Successfully closed websocket") - }() - - go func() { - defer func() { - recover() - }() - for { - select { - case <-quitc: - return - case <-ticker.C: - err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) - if err != nil { - return - } - } - } - }() - - var scanner = bufio.NewScanner(rc) - var b []byte - for scanner.Scan() { - b = scanner.Bytes() - if len(b) == 0 { - continue + done := make(chan bool) + dest := fmt.Sprintf("/topic/%d", job.ID) + client, _ := stomp.FromContext(c) + sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { + if len(m.Header.Get([]byte("eof"))) != 0 { + done <- true } ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteMessage(websocket.TextMessage, b) + ws.WriteMessage(websocket.TextMessage, m.Body) + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) + return + } + defer func() { + client.Unsubscribe(sub) + }() + + for { + select { + case <-done: + return + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) + if err != nil { + return + } + } } } @@ -227,20 +115,34 @@ func EventStream(c *gin.Context) { repo, _ = cache.GetRepoMap(c, user) } - ticker := time.NewTicker(pingPeriod) + eventc := make(chan []byte, 10) quitc := make(chan bool) - eventc := make(chan *bus.Event, 10) - bus.Subscribe(c, eventc) + tick := time.NewTicker(pingPeriod) defer func() { - ticker.Stop() - bus.Unsubscribe(c, eventc) - quitc <- true - close(quitc) - close(eventc) + tick.Stop() ws.Close() logrus.Debug("Successfully closed websocket") }() + client := stomp.MustFromContext(c) + sub, err := client.Subscribe("/topic/events", stomp.HandlerFunc(func(m *stomp.Message) { + name := m.Header.GetString("repo") + priv := m.Header.GetBool("private") + if repo[name] || !priv { + eventc <- m.Body + } + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) + return + } + defer func() { + close(quitc) + close(eventc) + client.Unsubscribe(sub) + }() + go func() { defer func() { recover() @@ -249,15 +151,13 @@ func EventStream(c *gin.Context) { select { case <-quitc: return - case event := <-eventc: - if event == nil { + case event, ok := <-eventc: + if !ok { return } - if repo[event.Repo.FullName] || !event.Repo.IsPrivate { - ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteJSON(event) - } - case <-ticker.C: + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, event) + case <-tick.C: err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) if err != nil { return diff --git a/yaml/label.go b/yaml/label.go new file mode 100644 index 000000000..445a9b6b4 --- /dev/null +++ b/yaml/label.go @@ -0,0 +1,26 @@ +package yaml + +import ( + "gopkg.in/yaml.v2" + + "github.com/drone/drone/yaml/types" +) + +// ParseLabel parses the labels section of the Yaml document. +func ParseLabel(in []byte) map[string]string { + out := struct { + Labels types.MapEqualSlice `yaml:"labels"` + }{} + + yaml.Unmarshal(in, &out) + labels := out.Labels.Map() + if labels == nil { + labels = make(map[string]string) + } + return labels +} + +// ParseLabelString parses the labels section of the Yaml document. +func ParseLabelString(in string) map[string]string { + return ParseLabel([]byte(in)) +} diff --git a/yaml/label_test.go b/yaml/label_test.go new file mode 100644 index 000000000..3032f33ed --- /dev/null +++ b/yaml/label_test.go @@ -0,0 +1,32 @@ +package yaml + +import ( + "testing" + + "github.com/franela/goblin" +) + +func TestLabel(t *testing.T) { + + g := goblin.Goblin(t) + g.Describe("Label parser", func() { + + g.It("Should parse empty yaml", func() { + labels := ParseLabelString("") + g.Assert(len(labels)).Equal(0) + }) + + g.It("Should parse slice", func() { + labels := ParseLabelString("labels: [foo=bar, baz=boo]") + g.Assert(len(labels)).Equal(2) + g.Assert(labels["foo"]).Equal("bar") + g.Assert(labels["baz"]).Equal("boo") + }) + + g.It("Should parse map", func() { + labels := ParseLabelString("labels: {foo: bar, baz: boo}") + g.Assert(labels["foo"]).Equal("bar") + g.Assert(labels["baz"]).Equal("boo") + }) + }) +} From 0b2f1c8e51bde798fba623b503c0fa5e224260eb Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Mon, 26 Sep 2016 03:29:05 -0500 Subject: [PATCH 02/14] integrates drone/mq into agent --- agent/updater.go | 78 +++++++++++------------------- client/client.go | 24 ---------- client/client_impl.go | 108 ------------------------------------------ drone/agent/agent.go | 102 ++++++++++++++++++++++----------------- drone/agent/exec.go | 71 ++++++++++++--------------- server/queue.go | 36 ++++++++++---- server/stream.go | 7 +-- 7 files changed, 145 insertions(+), 281 deletions(-) diff --git a/agent/updater.go b/agent/updater.go index 15922e0e2..ba5c1aa4d 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -1,16 +1,13 @@ package agent import ( - "encoding/json" "fmt" - "io" - "sync" - "time" "github.com/Sirupsen/logrus" "github.com/drone/drone/build" - "github.com/drone/drone/client" + "github.com/drone/drone/model" "github.com/drone/drone/queue" + "github.com/drone/mq/stomp" ) // UpdateFunc handles buid pipeline status updates. @@ -27,65 +24,44 @@ var TermLoggerFunc = func(line *build.Line) { // NewClientUpdater returns an updater that sends updated build details // to the drone server. -func NewClientUpdater(client client.Client) UpdateFunc { +func NewClientUpdater(client *stomp.Client) UpdateFunc { return func(w *queue.Work) { - for { - err := client.Push(w) - if err == nil { - return - } - logrus.Errorf("Error updating %s/%s#%d.%d. Retry in 30s. %s", + err := client.SendJSON("/queue/updates", w) + if err != nil { + logrus.Errorf("Error updating %s/%s#%d.%d. %s", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) - logrus.Infof("Retry update in 30s") - time.Sleep(time.Second * 30) + } + if w.Job.Status != model.StatusRunning { + var dest = fmt.Sprintf("/topic/logs.%d", w.Job.ID) + var opts = []stomp.MessageOption{ + stomp.WithHeader("eof", "true"), + stomp.WithRetain("all"), + } + + if err := client.Send(dest, []byte("eof"), opts...); err != nil { + logrus.Errorf("Error sending eof %s/%s#%d.%d. %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) + } } } } -func NewStreamLogger(stream client.StreamWriter, w io.Writer, limit int64) LoggerFunc { - var err error - var size int64 - return func(line *build.Line) { +func NewClientLogger(client *stomp.Client, id int64, limit int64) LoggerFunc { + var size int64 + var dest = fmt.Sprintf("/topic/logs.%d", id) + var opts = []stomp.MessageOption{ + stomp.WithRetain("all"), + } + + return func(line *build.Line) { if size > limit { return } - - // TODO remove this double-serialization - linejson, _ := json.Marshal(line) - w.Write(linejson) - w.Write([]byte{'\n'}) - - if err = stream.WriteJSON(line); err != nil { + if err := client.SendJSON(dest, line, opts...); err != nil { logrus.Errorf("Error streaming build logs. %s", err) } size += int64(len(line.Out)) } } - -func NewClientLogger(client client.Client, id int64, rc io.ReadCloser, wc io.WriteCloser, limit int64) LoggerFunc { - var once sync.Once - var size int64 - return func(line *build.Line) { - // annoying hack to only start streaming once the first line is written - once.Do(func() { - go func() { - err := client.Stream(id, rc) - if err != nil && err != io.ErrClosedPipe { - logrus.Errorf("Error streaming build logs. %s", err) - } - }() - }) - - if size > limit { - return - } - - linejson, _ := json.Marshal(line) - wc.Write(linejson) - wc.Write([]byte{'\n'}) - - size += int64(len(line.Out)) - } -} diff --git a/client/client.go b/client/client.go index 8a423aad2..2d849cb7e 100644 --- a/client/client.go +++ b/client/client.go @@ -4,7 +4,6 @@ import ( "io" "github.com/drone/drone/model" - "github.com/drone/drone/queue" ) // Client is used to communicate with a Drone server. @@ -103,27 +102,4 @@ type Client interface { // AgentList returns a list of build agents. AgentList() ([]*model.Agent, error) - - // - // below items for Queue (internal use only) - // - - // Pull pulls work from the server queue. - Pull(os, arch string) (*queue.Work, error) - - // Push pushes an update to the server. - Push(*queue.Work) error - - // Stream streams the build logs to the server. - Stream(int64, io.ReadCloser) error - - LogStream(int64) (StreamWriter, error) - - LogPost(int64, io.ReadCloser) error - - // Wait waits for the job to the complete. - Wait(int64) *Wait - - // Ping the server - Ping() error } diff --git a/client/client_impl.go b/client/client_impl.go index ca661a88e..d877c4665 100644 --- a/client/client_impl.go +++ b/client/client_impl.go @@ -12,10 +12,6 @@ import ( "strconv" "github.com/drone/drone/model" - "github.com/drone/drone/queue" - "github.com/gorilla/websocket" - "golang.org/x/net/context" - "golang.org/x/net/context/ctxhttp" "golang.org/x/oauth2" ) @@ -323,110 +319,6 @@ func (c *client) AgentList() ([]*model.Agent, error) { return out, err } -// -// below items for Queue (internal use only) -// - -// Pull pulls work from the server queue. -func (c *client) Pull(os, arch string) (*queue.Work, error) { - out := new(queue.Work) - uri := fmt.Sprintf(pathPull, c.base, os, arch) - err := c.post(uri, nil, out) - return out, err -} - -// Push pushes an update to the server. -func (c *client) Push(p *queue.Work) error { - uri := fmt.Sprintf(pathPush, c.base, p.Job.ID) - err := c.post(uri, p, nil) - return err -} - -// Ping pings the server. -func (c *client) Ping() error { - uri := fmt.Sprintf(pathPing, c.base) - err := c.post(uri, nil, nil) - return err -} - -// Stream streams the build logs to the server. -func (c *client) Stream(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathStream, c.base, id) - err := c.post(uri, rc, nil) - - return err -} - -// LogPost sends the full build logs to the server. -func (c *client) LogPost(id int64, rc io.ReadCloser) error { - uri := fmt.Sprintf(pathLogs, c.base, id) - return c.post(uri, rc, nil) -} - -// StreamWriter implements a special writer for streaming log entries to the -// central Drone server. The standard implementation is the gorilla.Connection. -type StreamWriter interface { - Close() error - WriteJSON(interface{}) error -} - -// LogStream streams the build logs to the server. -func (c *client) LogStream(id int64) (StreamWriter, error) { - rawurl := fmt.Sprintf(pathLogsAuth, c.base, id, c.token) - uri, err := url.Parse(rawurl) - if err != nil { - return nil, err - } - if uri.Scheme == "https" { - uri.Scheme = "wss" - } else { - uri.Scheme = "ws" - } - - // TODO need TLS client configuration - - conn, _, err := websocket.DefaultDialer.Dial(uri.String(), nil) - return conn, err -} - -// Wait watches and waits for the build to cancel or finish. -func (c *client) Wait(id int64) *Wait { - ctx, cancel := context.WithCancel(context.Background()) - return &Wait{id, c, ctx, cancel} -} - -type Wait struct { - id int64 - client *client - - ctx context.Context - cancel context.CancelFunc -} - -func (w *Wait) Done() (*model.Job, error) { - uri := fmt.Sprintf(pathWait, w.client.base, w.id) - req, err := w.client.createRequest(uri, "POST", nil) - if err != nil { - return nil, err - } - - res, err := ctxhttp.Do(w.ctx, w.client.client, req) - if err != nil { - return nil, err - } - defer res.Body.Close() - job := &model.Job{} - err = json.NewDecoder(res.Body).Decode(&job) - if err != nil { - return nil, err - } - return job, nil -} - -func (w *Wait) Cancel() { - w.cancel() -} - // // http request helper functions // diff --git a/drone/agent/agent.go b/drone/agent/agent.go index 48f1657fa..df0059521 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -7,13 +7,14 @@ import ( "syscall" "time" - "github.com/drone/drone/client" - "github.com/drone/drone/shared/token" + "github.com/drone/drone/queue" + "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" + "strings" + "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" - "strings" ) // AgentCmd is the exported command for starting the drone agent. @@ -67,7 +68,7 @@ var AgentCmd = cli.Command{ EnvVar: "DRONE_SERVER", Name: "drone-server", Usage: "drone server address", - Value: "http://localhost:8000", + Value: "ws://localhost:8000/ws/broker", }, cli.StringFlag{ EnvVar: "DRONE_TOKEN", @@ -146,8 +147,9 @@ func start(c *cli.Context) { var accessToken string if c.String("drone-secret") != "" { - secretToken := c.String("drone-secret") - accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken) + // secretToken := c.String("drone-secret") + accessToken = c.String("drone-secret") + // accessToken, _ = token.New(token.AgentToken, "").Sign(secretToken) } else { accessToken = c.String("drone-token") } @@ -157,10 +159,17 @@ func start(c *cli.Context) { accessToken, ) - client := client.NewClientToken( - strings.TrimRight(c.String("drone-server"), "/"), - accessToken, - ) + server := strings.TrimRight(c.String("drone-server"), "/") + client, err := stomp.Dial(server) + if err != nil { + logrus.Fatalf("Cannot connect to host %s. %s", server, err) + } + opts := []stomp.MessageOption{ + stomp.WithCredentials("x-token", accessToken), + } + if err = client.Connect(opts...); err != nil { + logrus.Fatalf("Cannot connect to host %s. %s", server, err) + } tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) if err == nil { @@ -171,42 +180,49 @@ func start(c *cli.Context) { logrus.Fatal(err) } - go func() { - for { - if err := client.Ping(); err != nil { - logrus.Warnf("unable to ping the server. %s", err.Error()) - } - time.Sleep(c.Duration("ping")) - } - }() - - var wg sync.WaitGroup - for i := 0; i < c.Int("docker-max-procs"); i++ { - wg.Add(1) - go func() { - r := pipeline{ - drone: client, - docker: docker, - config: config{ - platform: c.String("docker-os") + "/" + c.String("docker-arch"), - timeout: c.Duration("timeout"), - namespace: c.String("namespace"), - privileged: c.StringSlice("privileged"), - pull: c.BoolT("pull"), - logs: int64(c.Int("max-log-size")) * 1000000, - }, - } - for { - if err := r.run(); err != nil { - dur := c.Duration("backoff") - logrus.Warnf("reconnect in %v. %s", dur, err.Error()) - time.Sleep(dur) - } - } + handler := func(m *stomp.Message) { + running.Add(1) + defer func() { + running.Done() + client.Ack(m.Ack) }() + + r := pipeline{ + drone: client, + docker: docker, + config: config{ + platform: c.String("docker-os") + "/" + c.String("docker-arch"), + timeout: c.Duration("timeout"), + namespace: c.String("namespace"), + privileged: c.StringSlice("privileged"), + pull: c.BoolT("pull"), + logs: int64(c.Int("max-log-size")) * 1000000, + }, + } + + work := new(queue.Work) + m.Unmarshal(work) + r.run(work) + } + + _, err = client.Subscribe("/queue/pending", stomp.HandlerFunc(handler), + stomp.WithAck("client"), + stomp.WithPrefetch( + c.Int("docker-max-procs"), + ), + // stomp.WithSelector( + // fmt.Sprintf("platorm == '%s/%s'", + // c.String("drone-os"), + // c.String("drone-arch"), + // ), + // ), + ) + if err != nil { + logrus.Fatalf("Unable to connect to queue. %s", err) } handleSignals() - wg.Wait() + + <-client.Done() } // tracks running builds diff --git a/drone/agent/exec.go b/drone/agent/exec.go index bd5381141..049485256 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -1,14 +1,13 @@ package agent import ( - "bytes" - "io/ioutil" "time" "github.com/Sirupsen/logrus" "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" - "github.com/drone/drone/client" + "github.com/drone/drone/queue" + "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" ) @@ -23,20 +22,16 @@ type config struct { } type pipeline struct { - drone client.Client + drone *stomp.Client docker dockerclient.Client config config } -func (r *pipeline) run() error { - w, err := r.drone.Pull("linux", "amd64") - if err != nil { - return err - } - running.Add(1) - defer func() { - running.Done() - }() +func (r *pipeline) run(w *queue.Work) { + + // defer func() { + // // r.drone.Ack(id, opts) + // }() logrus.Infof("Starting build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) @@ -44,24 +39,9 @@ func (r *pipeline) run() error { cancel := make(chan bool, 1) engine := docker.NewClient(r.docker) - // streaming the logs - // rc, wc := io.Pipe() - // defer func() { - // wc.Close() - // rc.Close() - // }() - - var buf bytes.Buffer - - stream, err := r.drone.LogStream(w.Job.ID) - if err != nil { - return err - } - a := agent.Agent{ - Update: agent.NewClientUpdater(r.drone), - // Logger: agent.NewClientLogger(r.drone, w.Job.ID, rc, wc, r.config.logs), - Logger: agent.NewStreamLogger(stream, &buf, r.config.logs), + Update: agent.NewClientUpdater(r.drone), + Logger: agent.NewClientLogger(r.drone, w.Job.ID, r.config.logs), Engine: engine, Timeout: r.config.timeout, Platform: r.config.platform, @@ -70,27 +50,34 @@ func (r *pipeline) run() error { Pull: r.config.pull, } - // signal for canceling the build. - wait := r.drone.Wait(w.Job.ID) - defer wait.Cancel() - go func() { - if _, err := wait.Done(); err == nil { + cancelFunc := func(m *stomp.Message) { + defer m.Release() + + id := m.Header.GetInt64("job-id") + if id == w.Job.ID { cancel <- true logrus.Infof("Cancel build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) } + } + + // signal for canceling the build. + sub, err := r.drone.Subscribe("/topic/cancels", stomp.HandlerFunc(cancelFunc)) + if err != nil { + logrus.Errorf("Error subscribing to /topic/cancels. %s", err) + } + defer func() { + r.drone.Unsubscribe(sub) }() a.Run(w, cancel) - if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { - logrus.Errorf("Error sending logs for %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - } - stream.Close() + // if err := r.drone.LogPost(w.Job.ID, ioutil.NopCloser(&buf)); err != nil { + // logrus.Errorf("Error sending logs for %s/%s#%d.%d", + // w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + // } + // stream.Close() logrus.Infof("Finished build %s/%s#%d.%d", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - - return nil } diff --git a/server/queue.go b/server/queue.go index adfbd11ff..23acf8111 100644 --- a/server/queue.go +++ b/server/queue.go @@ -4,10 +4,12 @@ import ( "bytes" "fmt" "net/http" + "strconv" "golang.org/x/net/context" "github.com/Sirupsen/logrus" + "github.com/drone/drone/bus" "github.com/drone/drone/model" "github.com/drone/drone/queue" "github.com/drone/drone/remote" @@ -360,7 +362,7 @@ func HandleUpdate(c context.Context, message *stomp.Message) { return } - if ok && build.Status != model.StatusRunning { + if ok { // get the user because we transfer the user form the server to agent // and back we lose the token which does not get serialized to json. user, uerr := store.GetUser(c, work.User.ID) @@ -372,20 +374,39 @@ func HandleUpdate(c context.Context, message *stomp.Message) { fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) } + client := stomp.MustFromContext(c) + err = client.SendJSON("/topic/events", bus.Event{ + Type: bus.Started, + Repo: *work.Repo, + Build: *build, + Job: *job, + }, + stomp.WithHeader("repo", work.Repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(work.Repo.IsPrivate)), + ) + if err != nil { + logrus.Errorf("Unable to publish to /topic/events. %s", err) + } + + if job.Status == model.StatusRunning { + return + } + var buf bytes.Buffer var sub []byte done := make(chan bool) - dest := fmt.Sprintf("/topic/%d", job.ID) - client, _ := stomp.FromContext(c) + dest := fmt.Sprintf("/topic/logs.%d", job.ID) sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { - if len(m.Header.Get([]byte("eof"))) != 0 { + defer m.Release() + if m.Header.GetBool("eof") { done <- true + return } buf.Write(m.Body) buf.WriteByte('\n') - m.Release() })) + if err != nil { logrus.Errorf("Unable to read logs from broker. %s", err) return @@ -396,11 +417,6 @@ func HandleUpdate(c context.Context, message *stomp.Message) { logrus.Errorf("Unable to write logs to store. %s", err) return } - // if build.Status == model.StatusRunning { - // bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) - // } else { - // bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) - // } client.Unsubscribe(sub) client.Send(dest, []byte{}, stomp.WithRetain("remove")) diff --git a/server/stream.go b/server/stream.go index 4c8786f75..9c88ebed9 100644 --- a/server/stream.go +++ b/server/stream.go @@ -66,15 +66,16 @@ func LogStream(c *gin.Context) { defer ticker.Stop() done := make(chan bool) - dest := fmt.Sprintf("/topic/%d", job.ID) + dest := fmt.Sprintf("/topic/logs.%d", job.ID) client, _ := stomp.FromContext(c) sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { - if len(m.Header.Get([]byte("eof"))) != 0 { + defer m.Release() + if m.Header.GetBool("eof") { done <- true + return } ws.SetWriteDeadline(time.Now().Add(writeWait)) ws.WriteMessage(websocket.TextMessage, m.Body) - m.Release() })) if err != nil { logrus.Errorf("Unable to read logs from broker. %s", err) From 6f44450ef8aa626eb567cff5a9b5c36025975dd9 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 27 Sep 2016 19:33:13 -0500 Subject: [PATCH 03/14] initial sort of working mq integration --- drone/agent/agent.go | 69 +++++++++++++++++++++++++------------------- drone/agent/exec.go | 4 +-- drone/server.go | 2 +- server/build.go | 2 +- server/queue.go | 17 ++++++++--- server/stream.go | 15 ++++++---- 6 files changed, 67 insertions(+), 42 deletions(-) diff --git a/drone/agent/agent.go b/drone/agent/agent.go index df0059521..7c35808fa 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -3,18 +3,17 @@ package agent import ( "os" "os/signal" + "strings" "sync" "syscall" "time" "github.com/drone/drone/queue" "github.com/drone/mq/stomp" - "github.com/samalba/dockerclient" - - "strings" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/samalba/dockerclient" ) // AgentCmd is the exported command for starting the drone agent. @@ -160,16 +159,6 @@ func start(c *cli.Context) { ) server := strings.TrimRight(c.String("drone-server"), "/") - client, err := stomp.Dial(server) - if err != nil { - logrus.Fatalf("Cannot connect to host %s. %s", server, err) - } - opts := []stomp.MessageOption{ - stomp.WithCredentials("x-token", accessToken), - } - if err = client.Connect(opts...); err != nil { - logrus.Fatalf("Cannot connect to host %s. %s", server, err) - } tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) if err == nil { @@ -180,6 +169,8 @@ func start(c *cli.Context) { logrus.Fatal(err) } + var client *stomp.Client + handler := func(m *stomp.Message) { running.Add(1) defer func() { @@ -205,24 +196,44 @@ func start(c *cli.Context) { r.run(work) } - _, err = client.Subscribe("/queue/pending", stomp.HandlerFunc(handler), - stomp.WithAck("client"), - stomp.WithPrefetch( - c.Int("docker-max-procs"), - ), - // stomp.WithSelector( - // fmt.Sprintf("platorm == '%s/%s'", - // c.String("drone-os"), - // c.String("drone-arch"), - // ), - // ), - ) - if err != nil { - logrus.Fatalf("Unable to connect to queue. %s", err) - } handleSignals() - <-client.Done() + backoff := c.Duration("backoff") + + for { + // dial the drone server to establish a TCP connection. + client, err = stomp.Dial(server) + if err != nil { + logrus.Errorf("Failed to establish server connection, %s, retry in %v", err, backoff) + <-time.After(backoff) + continue + } + opts := []stomp.MessageOption{ + stomp.WithCredentials("x-token", accessToken), + } + + // initialize the stomp session and authenticate. + if err = client.Connect(opts...); err != nil { + logrus.Errorf("Failed to establish server session, %s, retry in %v", err, backoff) + <-time.After(backoff) + continue + } + + // subscribe to the pending build queue. + client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) { + go handler(m) // HACK until we a channel based Subscribe implementation + }), + stomp.WithAck("client"), + stomp.WithPrefetch( + c.Int("docker-max-procs"), + ), + ) + + logrus.Infof("Server connection establish, ready to process builds.") + <-client.Done() + + logrus.Warnf("Server connection interrupted, attempting to reconnect.") + } } // tracks running builds diff --git a/drone/agent/exec.go b/drone/agent/exec.go index 049485256..269a6754f 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -62,9 +62,9 @@ func (r *pipeline) run(w *queue.Work) { } // signal for canceling the build. - sub, err := r.drone.Subscribe("/topic/cancels", stomp.HandlerFunc(cancelFunc)) + sub, err := r.drone.Subscribe("/topic/cancel", stomp.HandlerFunc(cancelFunc)) if err != nil { - logrus.Errorf("Error subscribing to /topic/cancels. %s", err) + logrus.Errorf("Error subscribing to /topic/cancel. %s", err) } defer func() { r.drone.Unsubscribe(sub) diff --git a/drone/server.go b/drone/server.go index 390642fbc..94c7cfe01 100644 --- a/drone/server.go +++ b/drone/server.go @@ -6,10 +6,10 @@ import ( "github.com/drone/drone/router" "github.com/drone/drone/router/middleware" - "github.com/gin-gonic/contrib/ginrus" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/gin-gonic/contrib/ginrus" ) var serverCmd = cli.Command{ diff --git a/server/build.go b/server/build.go index 3e9d7a4ac..3fdc81960 100644 --- a/server/build.go +++ b/server/build.go @@ -156,7 +156,7 @@ func DeleteBuild(c *gin.Context) { Repo: *repo, Build: *build, Job: *job, - }) + }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10))) c.String(204, "") } diff --git a/server/queue.go b/server/queue.go index 23acf8111..d9ca743c7 100644 --- a/server/queue.go +++ b/server/queue.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strconv" + "time" "golang.org/x/net/context" @@ -411,13 +412,21 @@ func HandleUpdate(c context.Context, message *stomp.Message) { logrus.Errorf("Unable to read logs from broker. %s", err) return } - <-done + + defer func() { + client.Unsubscribe(sub) + client.Send(dest, []byte{}, stomp.WithRetain("remove")) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + logrus.Errorf("Unable to read logs from broker. Timeout. %s", err) + return + } if err := store.WriteLog(c, job, &buf); err != nil { logrus.Errorf("Unable to write logs to store. %s", err) return } - - client.Unsubscribe(sub) - client.Send(dest, []byte{}, stomp.WithRetain("remove")) } diff --git a/server/stream.go b/server/stream.go index 9c88ebed9..d4a626a48 100644 --- a/server/stream.go +++ b/server/stream.go @@ -65,17 +65,17 @@ func LogStream(c *gin.Context) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() + logs := make(chan []byte) done := make(chan bool) dest := fmt.Sprintf("/topic/logs.%d", job.ID) client, _ := stomp.FromContext(c) sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { - defer m.Release() if m.Header.GetBool("eof") { done <- true - return + } else { + logs <- m.Body } - ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteMessage(websocket.TextMessage, m.Body) + m.Release() })) if err != nil { logrus.Errorf("Unable to read logs from broker. %s", err) @@ -83,10 +83,15 @@ func LogStream(c *gin.Context) { } defer func() { client.Unsubscribe(sub) + close(done) + close(logs) }() for { select { + case buf := <-logs: + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, buf) case <-done: return case <-ticker.C: @@ -139,9 +144,9 @@ func EventStream(c *gin.Context) { return } defer func() { + client.Unsubscribe(sub) close(quitc) close(eventc) - client.Unsubscribe(sub) }() go func() { From 584ec88b076d5f662584ef54de0d89eb789ae40e Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 27 Sep 2016 20:16:46 -0500 Subject: [PATCH 04/14] add platform field and selector capabilities --- drone/agent/agent.go | 26 +++++++++++++++----------- server/build.go | 10 +++++++++- server/hook.go | 10 +++++++++- yaml/platform.go | 26 ++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 13 deletions(-) create mode 100644 yaml/platform.go diff --git a/drone/agent/agent.go b/drone/agent/agent.go index 7c35808fa..ec94eada4 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -57,12 +57,6 @@ var AgentCmd = cli.Command{ Usage: "docker architecture system", Value: "amd64", }, - cli.StringFlag{ - EnvVar: "DRONE_STORAGE_DRIVER", - Name: "drone-storage-driver", - Usage: "docker storage driver", - Value: "overlay", - }, cli.StringFlag{ EnvVar: "DRONE_SERVER", Name: "drone-server", @@ -102,6 +96,11 @@ var AgentCmd = cli.Command{ Usage: "drone timeout due to log inactivity", Value: time.Minute * 5, }, + cli.StringFlag{ + EnvVar: "DRONE_FILTER", + Name: "filter", + Usage: "filter jobs processed by this agent", + }, cli.IntFlag{ EnvVar: "DRONE_MAX_LOGS", Name: "max-log-size", @@ -219,15 +218,20 @@ func start(c *cli.Context) { continue } - // subscribe to the pending build queue. - client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) { - go handler(m) // HACK until we a channel based Subscribe implementation - }), + opts = []stomp.MessageOption{ stomp.WithAck("client"), stomp.WithPrefetch( c.Int("docker-max-procs"), ), - ) + } + if filter := c.String("filter"); filter != "" { + opts = append(opts, stomp.WithSelector(filter)) + } + + // subscribe to the pending build queue. + client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) { + go handler(m) // HACK until we a channel based Subscribe implementation + }), opts...) logrus.Infof("Server connection establish, ready to process builds.") <-client.Done() diff --git a/server/build.go b/server/build.go index 3fdc81960..5a35f126d 100644 --- a/server/build.go +++ b/server/build.go @@ -351,7 +351,15 @@ func PostBuild(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }, stomp.WithHeaders(yaml.ParseLabel(raw))) + }, + stomp.WithHeader( + "platform", + yaml.ParsePlatformDefault(raw, "linux/amd64"), + ), + stomp.WithHeaders( + yaml.ParseLabel(raw), + ), + ) } } diff --git a/server/hook.go b/server/hook.go index 7f722c127..ef167f231 100644 --- a/server/hook.go +++ b/server/hook.go @@ -237,7 +237,15 @@ func PostHook(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }, stomp.WithHeaders(yaml.ParseLabel(raw))) + }, + stomp.WithHeader( + "platform", + yaml.ParsePlatformDefault(raw, "linux/amd64"), + ), + stomp.WithHeaders( + yaml.ParseLabel(raw), + ), + ) } } diff --git a/yaml/platform.go b/yaml/platform.go new file mode 100644 index 000000000..69e1cccc2 --- /dev/null +++ b/yaml/platform.go @@ -0,0 +1,26 @@ +package yaml + +import "gopkg.in/yaml.v2" + +// ParsePlatform parses the platform section of the Yaml document. +func ParsePlatform(in []byte) string { + out := struct { + Platform string `yaml:"platform"` + }{} + + yaml.Unmarshal(in, &out) + return out.Platform +} + +// ParsePlatformString parses the platform section of the Yaml document. +func ParsePlatformString(in string) string { + return ParsePlatform([]byte(in)) +} + +// ParsePlatformDefault parses the platform section of the Yaml document. +func ParsePlatformDefault(in []byte, platform string) string { + if p := ParsePlatform([]byte(in)); p != "" { + return p + } + return platform +} From 778971eb689dfa1c64c99669e86f61a96d6d9521 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 27 Sep 2016 20:30:28 -0500 Subject: [PATCH 05/14] removed unused packages --- agent/agent.go | 9 +- agent/updater.go | 7 +- bus/bus.go | 39 ----- bus/bus_impl.go | 46 ------ bus/bus_impl_test.go | 73 --------- bus/context.go | 21 --- drone/agent/agent.go | 4 +- drone/agent/exec.go | 4 +- drone/exec.go | 3 +- drone/server.go | 3 - bus/types.go => model/event.go | 16 +- model/work.go | 19 +++ queue/context.go | 23 --- queue/queue.go | 67 -------- queue/queue_impl.go | 85 ---------- queue/queue_impl_test.go | 94 ----------- queue/types.go | 21 --- router/middleware/bus.go | 17 -- router/middleware/queue.go | 17 -- router/middleware/stream.go | 17 -- server/build.go | 34 ++-- server/hook.go | 8 +- server/queue.go | 282 +-------------------------------- stream/context.go | 21 --- stream/reader.go | 54 ------- stream/reader_test.go | 7 - stream/stream.go | 60 ------- stream/stream_impl.go | 72 --------- stream/stream_impl_test.go | 7 - stream/writer.go | 52 ------ stream/writer_test.go | 7 - 31 files changed, 69 insertions(+), 1120 deletions(-) delete mode 100644 bus/bus.go delete mode 100644 bus/bus_impl.go delete mode 100644 bus/bus_impl_test.go delete mode 100644 bus/context.go rename bus/types.go => model/event.go (61%) create mode 100644 model/work.go delete mode 100644 queue/context.go delete mode 100644 queue/queue.go delete mode 100644 queue/queue_impl.go delete mode 100644 queue/queue_impl_test.go delete mode 100644 queue/types.go delete mode 100644 router/middleware/bus.go delete mode 100644 router/middleware/queue.go delete mode 100644 router/middleware/stream.go delete mode 100644 stream/context.go delete mode 100644 stream/reader.go delete mode 100644 stream/reader_test.go delete mode 100644 stream/stream.go delete mode 100644 stream/stream_impl.go delete mode 100644 stream/stream_impl_test.go delete mode 100644 stream/writer.go delete mode 100644 stream/writer_test.go diff --git a/agent/agent.go b/agent/agent.go index afe0e14df..651a616ad 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -11,7 +11,6 @@ import ( "github.com/drone/drone/build" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/version" "github.com/drone/drone/yaml" "github.com/drone/drone/yaml/expander" @@ -48,7 +47,7 @@ func (a *Agent) Poll() error { return nil } -func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error { +func (a *Agent) Run(payload *model.Work, cancel <-chan bool) error { payload.Job.Status = model.StatusRunning payload.Job.Started = time.Now().Unix() @@ -90,7 +89,7 @@ func (a *Agent) Run(payload *queue.Work, cancel <-chan bool) error { return err } -func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { +func (a *Agent) prep(w *model.Work) (*yaml.Config, error) { envs := toEnv(w) w.Yaml = expander.ExpandString(w.Yaml, envs) @@ -172,7 +171,7 @@ func (a *Agent) prep(w *queue.Work) (*yaml.Config, error) { return conf, nil } -func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) error { +func (a *Agent) exec(spec *yaml.Config, payload *model.Work, cancel <-chan bool) error { conf := build.Config{ Engine: a.Engine, @@ -231,7 +230,7 @@ func (a *Agent) exec(spec *yaml.Config, payload *queue.Work, cancel <-chan bool) } } -func toEnv(w *queue.Work) map[string]string { +func toEnv(w *model.Work) map[string]string { envs := map[string]string{ "CI": "drone", "DRONE": "true", diff --git a/agent/updater.go b/agent/updater.go index ba5c1aa4d..e2e1f43ad 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -6,17 +6,16 @@ import ( "github.com/Sirupsen/logrus" "github.com/drone/drone/build" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/mq/stomp" ) // UpdateFunc handles buid pipeline status updates. -type UpdateFunc func(*queue.Work) +type UpdateFunc func(*model.Work) // LoggerFunc handles buid pipeline logging updates. type LoggerFunc func(*build.Line) -var NoopUpdateFunc = func(*queue.Work) {} +var NoopUpdateFunc = func(*model.Work) {} var TermLoggerFunc = func(line *build.Line) { fmt.Println(line) @@ -25,7 +24,7 @@ var TermLoggerFunc = func(line *build.Line) { // NewClientUpdater returns an updater that sends updated build details // to the drone server. func NewClientUpdater(client *stomp.Client) UpdateFunc { - return func(w *queue.Work) { + return func(w *model.Work) { err := client.SendJSON("/queue/updates", w) if err != nil { logrus.Errorf("Error updating %s/%s#%d.%d. %s", diff --git a/bus/bus.go b/bus/bus.go deleted file mode 100644 index 5f858b954..000000000 --- a/bus/bus.go +++ /dev/null @@ -1,39 +0,0 @@ -package bus - -//go:generate mockery -name Bus -output mock -case=underscore - -// Bus represents an event bus implementation that -// allows a publisher to broadcast Event notifications -// to a list of subscribers. -type Bus interface { - // Publish broadcasts an event to all subscribers. - Publish(*Event) - - // Subscribe adds the channel to the list of - // subscribers. Each subscriber in the list will - // receive broadcast events. - Subscribe(chan *Event) - - // Unsubscribe removes the channel from the list - // of subscribers. - Unsubscribe(chan *Event) -} - -// -// // Publish broadcasts an event to all subscribers. -// func Publish(c context.Context, event *Event) { -// FromContext(c).Publish(event) -// } -// -// // Subscribe adds the channel to the list of -// // subscribers. Each subscriber in the list will -// // receive broadcast events. -// func Subscribe(c context.Context, eventc chan *Event) { -// FromContext(c).Subscribe(eventc) -// } -// -// // Unsubscribe removes the channel from the -// // list of subscribers. -// func Unsubscribe(c context.Context, eventc chan *Event) { -// FromContext(c).Unsubscribe(eventc) -// } diff --git a/bus/bus_impl.go b/bus/bus_impl.go deleted file mode 100644 index d0f0e6a64..000000000 --- a/bus/bus_impl.go +++ /dev/null @@ -1,46 +0,0 @@ -package bus - -import ( - "sync" -) - -type eventbus struct { - sync.Mutex - subs map[chan *Event]bool -} - -// New creates a simple event bus that manages a list of -// subscribers to which events are published. -func New() Bus { - return newEventbus() -} - -func newEventbus() *eventbus { - return &eventbus{ - subs: make(map[chan *Event]bool), - } -} - -func (b *eventbus) Subscribe(c chan *Event) { - b.Lock() - b.subs[c] = true - b.Unlock() -} - -func (b *eventbus) Unsubscribe(c chan *Event) { - b.Lock() - delete(b.subs, c) - b.Unlock() -} - -func (b *eventbus) Publish(event *Event) { - b.Lock() - defer b.Unlock() - - for s := range b.subs { - go func(c chan *Event) { - defer recover() - c <- event - }(s) - } -} diff --git a/bus/bus_impl_test.go b/bus/bus_impl_test.go deleted file mode 100644 index ffcb1e563..000000000 --- a/bus/bus_impl_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package bus - -import ( - "sync" - "testing" - - "github.com/drone/drone/model" - . "github.com/franela/goblin" - "github.com/gin-gonic/gin" -) - -func TestBus(t *testing.T) { - g := Goblin(t) - g.Describe("Event bus", func() { - - g.It("Should unsubscribe", func() { - c := new(gin.Context) - b := newEventbus() - ToContext(c, b) - - c1 := make(chan *Event) - c2 := make(chan *Event) - Subscribe(c, c1) - Subscribe(c, c2) - - g.Assert(len(b.subs)).Equal(2) - }) - - g.It("Should subscribe", func() { - c := new(gin.Context) - b := newEventbus() - ToContext(c, b) - - c1 := make(chan *Event) - c2 := make(chan *Event) - Subscribe(c, c1) - Subscribe(c, c2) - - g.Assert(len(b.subs)).Equal(2) - - Unsubscribe(c, c1) - Unsubscribe(c, c2) - - g.Assert(len(b.subs)).Equal(0) - }) - - g.It("Should publish", func() { - c := new(gin.Context) - b := New() - ToContext(c, b) - - e1 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) - e2 := NewEvent(Started, &model.Repo{}, &model.Build{}, &model.Job{}) - c1 := make(chan *Event) - - Subscribe(c, c1) - - var wg sync.WaitGroup - wg.Add(1) - - var r1, r2 *Event - go func() { - r1 = <-c1 - r2 = <-c1 - wg.Done() - }() - Publish(c, e1) - Publish(c, e2) - wg.Wait() - }) - }) - -} diff --git a/bus/context.go b/bus/context.go deleted file mode 100644 index 4eccfa7f0..000000000 --- a/bus/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package bus - -import "golang.org/x/net/context" - -const key = "bus" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Bus associated with this context. -func FromContext(c context.Context) Bus { - return c.Value(key).(Bus) -} - -// ToContext adds the Bus to this context if it supports -// the Setter interface. -func ToContext(c Setter, b Bus) { - c.Set(key, b) -} diff --git a/drone/agent/agent.go b/drone/agent/agent.go index ec94eada4..b3037a701 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -8,7 +8,7 @@ import ( "syscall" "time" - "github.com/drone/drone/queue" + "github.com/drone/drone/model" "github.com/drone/mq/stomp" "github.com/Sirupsen/logrus" @@ -190,7 +190,7 @@ func start(c *cli.Context) { }, } - work := new(queue.Work) + work := new(model.Work) m.Unmarshal(work) r.run(work) } diff --git a/drone/agent/exec.go b/drone/agent/exec.go index 269a6754f..7ad0b458c 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -6,7 +6,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" - "github.com/drone/drone/queue" + "github.com/drone/drone/model" "github.com/drone/mq/stomp" "github.com/samalba/dockerclient" @@ -27,7 +27,7 @@ type pipeline struct { config config } -func (r *pipeline) run(w *queue.Work) { +func (r *pipeline) run(w *model.Work) { // defer func() { // // r.drone.Ack(id, opts) diff --git a/drone/exec.go b/drone/exec.go index c766707df..f061f1bc9 100644 --- a/drone/exec.go +++ b/drone/exec.go @@ -13,7 +13,6 @@ import ( "github.com/drone/drone/agent" "github.com/drone/drone/build/docker" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/yaml" "github.com/codegangsta/cli" @@ -340,7 +339,7 @@ func exec(c *cli.Context) error { Pull: c.Bool("pull"), } - payload := &queue.Work{ + payload := &model.Work{ Yaml: string(file), Verified: c.BoolT("yaml.verified"), Signed: c.BoolT("yaml.signed"), diff --git a/drone/server.go b/drone/server.go index 94c7cfe01..465595ce3 100644 --- a/drone/server.go +++ b/drone/server.go @@ -288,9 +288,6 @@ func server(c *cli.Context) error { ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), middleware.Version, middleware.Config(c), - middleware.Queue(c), - middleware.Stream(c), - middleware.Bus(c), middleware.Cache(c), middleware.Store(c), middleware.Remote(c), diff --git a/bus/types.go b/model/event.go similarity index 61% rename from bus/types.go rename to model/event.go index 6be95a3d7..efee037cc 100644 --- a/bus/types.go +++ b/model/event.go @@ -1,6 +1,4 @@ -package bus - -import "github.com/drone/drone/model" +package model // EventType defines the possible types of build events. type EventType string @@ -14,15 +12,15 @@ const ( // Event represents a build event. type Event struct { - Type EventType `json:"type"` - Repo model.Repo `json:"repo"` - Build model.Build `json:"build"` - Job model.Job `json:"job"` + Type EventType `json:"type"` + Repo Repo `json:"repo"` + Build Build `json:"build"` + Job Job `json:"job"` } // NewEvent creates a new Event for the build, using copies of // the build data to avoid possible mutation or race conditions. -func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { +func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event { return &Event{ Type: t, Repo: *r, @@ -31,7 +29,7 @@ func NewEvent(t EventType, r *model.Repo, b *model.Build, j *model.Job) *Event { } } -func NewBuildEvent(t EventType, r *model.Repo, b *model.Build) *Event { +func NewBuildEvent(t EventType, r *Repo, b *Build) *Event { return &Event{ Type: t, Repo: *r, diff --git a/model/work.go b/model/work.go new file mode 100644 index 000000000..06c69d91c --- /dev/null +++ b/model/work.go @@ -0,0 +1,19 @@ +package model + +// Work represents an item for work to be +// processed by a worker. +type Work struct { + Signed bool `json:"signed"` + Verified bool `json:"verified"` + Yaml string `json:"config"` + YamlEnc string `json:"secret"` + Repo *Repo `json:"repo"` + Build *Build `json:"build"` + BuildLast *Build `json:"build_last"` + Job *Job `json:"job"` + Netrc *Netrc `json:"netrc"` + Keys *Key `json:"keys"` + System *System `json:"system"` + Secrets []*Secret `json:"secrets"` + User *User `json:"user"` +} diff --git a/queue/context.go b/queue/context.go deleted file mode 100644 index 98a78e9ec..000000000 --- a/queue/context.go +++ /dev/null @@ -1,23 +0,0 @@ -package queue - -import ( - "golang.org/x/net/context" -) - -const key = "queue" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Queue associated with this context. -func FromContext(c context.Context) Queue { - return c.Value(key).(Queue) -} - -// ToContext adds the Queue to this context if it supports -// the Setter interface. -func ToContext(c Setter, q Queue) { - c.Set(key, q) -} diff --git a/queue/queue.go b/queue/queue.go deleted file mode 100644 index 0cdf2e6fc..000000000 --- a/queue/queue.go +++ /dev/null @@ -1,67 +0,0 @@ -package queue - -//go:generate mockery -name Queue -output mock -case=underscore - -import ( - "errors" - - "golang.org/x/net/context" -) - -// ErrNotFound indicates the requested work item does not -// exist in the queue. -var ErrNotFound = errors.New("queue item not found") - -type Queue interface { - // Publish inserts work at the tail of this queue, waiting for - // space to become available if the queue is full. - Publish(*Work) error - - // Remove removes the specified work item from this queue, - // if it is present. - Remove(*Work) error - - // PullClose retrieves and removes the head of this queue, - // waiting if necessary until work becomes available. - Pull() *Work - - // PullClose retrieves and removes the head of this queue, - // waiting if necessary until work becomes available. The - // CloseNotifier should be provided to clone the channel - // if the subscribing client terminates its connection. - PullClose(CloseNotifier) *Work -} - -// Publish inserts work at the tail of this queue, waiting for -// space to become available if the queue is full. -func Publish(c context.Context, w *Work) error { - return FromContext(c).Publish(w) -} - -// Remove removes the specified work item from this queue, -// if it is present. -func Remove(c context.Context, w *Work) error { - return FromContext(c).Remove(w) -} - -// Pull retrieves and removes the head of this queue, -// waiting if necessary until work becomes available. -func Pull(c context.Context) *Work { - return FromContext(c).Pull() -} - -// PullClose retrieves and removes the head of this queue, -// waiting if necessary until work becomes available. The -// CloseNotifier should be provided to clone the channel -// if the subscribing client terminates its connection. -func PullClose(c context.Context, cn CloseNotifier) *Work { - return FromContext(c).PullClose(cn) -} - -// CloseNotifier defines a datastructure that is capable of notifying -// a subscriber when its connection is closed. -type CloseNotifier interface { - // CloseNotify returns a channel that receives a single value - // when the client connection has gone away. - CloseNotify() <-chan bool -} diff --git a/queue/queue_impl.go b/queue/queue_impl.go deleted file mode 100644 index 8882bc24d..000000000 --- a/queue/queue_impl.go +++ /dev/null @@ -1,85 +0,0 @@ -package queue - -import "sync" - -type queue struct { - sync.Mutex - - items map[*Work]struct{} - itemc chan *Work -} - -func New() Queue { - return newQueue() -} - -func newQueue() *queue { - return &queue{ - items: make(map[*Work]struct{}), - itemc: make(chan *Work, 999), - } -} - -func (q *queue) Publish(work *Work) error { - q.Lock() - q.items[work] = struct{}{} - q.Unlock() - q.itemc <- work - return nil -} - -func (q *queue) Remove(work *Work) error { - q.Lock() - defer q.Unlock() - - _, ok := q.items[work] - if !ok { - return ErrNotFound - } - var items []*Work - - // loop through and drain all items - // from the -drain: - for { - select { - case item := <-q.itemc: - items = append(items, item) - default: - break drain - } - } - - // re-add all items to the queue except - // the item we're trying to remove - for _, item := range items { - if item == work { - delete(q.items, work) - continue - } - q.itemc <- item - } - return nil -} - -func (q *queue) Pull() *Work { - work := <-q.itemc - q.Lock() - delete(q.items, work) - q.Unlock() - return work -} - -func (q *queue) PullClose(cn CloseNotifier) *Work { - for { - select { - case <-cn.CloseNotify(): - return nil - case work := <-q.itemc: - q.Lock() - delete(q.items, work) - q.Unlock() - return work - } - } -} diff --git a/queue/queue_impl_test.go b/queue/queue_impl_test.go deleted file mode 100644 index 45f38bff6..000000000 --- a/queue/queue_impl_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package queue - -// -// import ( -// "sync" -// "testing" -// -// . "github.com/franela/goblin" -// "github.com/gin-gonic/gin" -// ) -// -// func TestBuild(t *testing.T) { -// g := Goblin(t) -// g.Describe("Queue", func() { -// -// g.It("Should publish item", func() { -// c := new(gin.Context) -// q := newQueue() -// ToContext(c, q) -// -// w1 := &Work{} -// w2 := &Work{} -// Publish(c, w1) -// Publish(c, w2) -// g.Assert(len(q.items)).Equal(2) -// g.Assert(len(q.itemc)).Equal(2) -// }) -// -// g.It("Should remove item", func() { -// c := new(gin.Context) -// q := newQueue() -// ToContext(c, q) -// -// w1 := &Work{} -// w2 := &Work{} -// w3 := &Work{} -// Publish(c, w1) -// Publish(c, w2) -// Publish(c, w3) -// Remove(c, w2) -// g.Assert(len(q.items)).Equal(2) -// g.Assert(len(q.itemc)).Equal(2) -// -// g.Assert(Pull(c)).Equal(w1) -// g.Assert(Pull(c)).Equal(w3) -// g.Assert(Remove(c, w2)).Equal(ErrNotFound) -// }) -// -// g.It("Should pull item", func() { -// c := new(gin.Context) -// q := New() -// ToContext(c, q) -// -// cn := new(closeNotifier) -// cn.closec = make(chan bool, 1) -// w1 := &Work{} -// w2 := &Work{} -// -// Publish(c, w1) -// g.Assert(Pull(c)).Equal(w1) -// -// Publish(c, w2) -// g.Assert(PullClose(c, cn)).Equal(w2) -// }) -// -// g.It("Should cancel pulling item", func() { -// c := new(gin.Context) -// q := New() -// ToContext(c, q) -// -// cn := new(closeNotifier) -// cn.closec = make(chan bool, 1) -// var wg sync.WaitGroup -// go func() { -// wg.Add(1) -// g.Assert(PullClose(c, cn) == nil).IsTrue() -// wg.Done() -// }() -// go func() { -// cn.closec <- true -// }() -// wg.Wait() -// -// }) -// }) -// } -// -// type closeNotifier struct { -// closec chan bool -// } -// -// func (c *closeNotifier) CloseNotify() <-chan bool { -// return c.closec -// } diff --git a/queue/types.go b/queue/types.go deleted file mode 100644 index 48fc41942..000000000 --- a/queue/types.go +++ /dev/null @@ -1,21 +0,0 @@ -package queue - -import "github.com/drone/drone/model" - -// Work represents an item for work to be -// processed by a worker. -type Work struct { - Signed bool `json:"signed"` - Verified bool `json:"verified"` - Yaml string `json:"config"` - YamlEnc string `json:"secret"` - Repo *model.Repo `json:"repo"` - Build *model.Build `json:"build"` - BuildLast *model.Build `json:"build_last"` - Job *model.Job `json:"job"` - Netrc *model.Netrc `json:"netrc"` - Keys *model.Key `json:"keys"` - System *model.System `json:"system"` - Secrets []*model.Secret `json:"secrets"` - User *model.User `json:"user"` -} diff --git a/router/middleware/bus.go b/router/middleware/bus.go deleted file mode 100644 index 25665da1d..000000000 --- a/router/middleware/bus.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/bus" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Bus is a middleware function that initializes the Event Bus and attaches to -// the context of every http.Request. -func Bus(cli *cli.Context) gin.HandlerFunc { - v := bus.New() - return func(c *gin.Context) { - bus.ToContext(c, v) - } -} diff --git a/router/middleware/queue.go b/router/middleware/queue.go deleted file mode 100644 index d2791033e..000000000 --- a/router/middleware/queue.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/queue" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Queue is a middleware function that initializes the Queue and attaches to -// the context of every http.Request. -func Queue(cli *cli.Context) gin.HandlerFunc { - v := queue.New() - return func(c *gin.Context) { - queue.ToContext(c, v) - } -} diff --git a/router/middleware/stream.go b/router/middleware/stream.go deleted file mode 100644 index d78a119c2..000000000 --- a/router/middleware/stream.go +++ /dev/null @@ -1,17 +0,0 @@ -package middleware - -import ( - "github.com/drone/drone/stream" - - "github.com/codegangsta/cli" - "github.com/gin-gonic/gin" -) - -// Stream is a middleware function that initializes the Stream and attaches to -// the context of every http.Request. -func Stream(cli *cli.Context) gin.HandlerFunc { - v := stream.New() - return func(c *gin.Context) { - stream.ToContext(c, v) - } -} diff --git a/server/build.go b/server/build.go index 5a35f126d..e8e883b35 100644 --- a/server/build.go +++ b/server/build.go @@ -1,17 +1,16 @@ package server import ( + "bufio" + "io" "net/http" "strconv" "time" log "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" - "github.com/drone/drone/stream" "github.com/drone/drone/yaml" "github.com/gin-gonic/gin" "github.com/square/go-jose" @@ -114,7 +113,7 @@ func GetBuildLogs(c *gin.Context) { } c.Header("Content-Type", "application/json") - stream.Copy(c.Writer, r) + copyLogs(c.Writer, r) } func DeleteBuild(c *gin.Context) { @@ -151,8 +150,8 @@ func DeleteBuild(c *gin.Context) { store.UpdateBuildJob(c, build, job) client := stomp.MustFromContext(c) - client.SendJSON("/topic/cancel", bus.Event{ - Type: bus.Cancelled, + client.SendJSON("/topic/cancel", model.Event{ + Type: model.Cancelled, Repo: *repo, Build: *build, Job: *job, @@ -328,8 +327,8 @@ func PostBuild(c *gin.Context) { log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) client := stomp.MustFromContext(c) - client.SendJSON("/topic/events", bus.Event{ - Type: bus.Enqueued, + client.SendJSON("/topic/events", model.Event{ + Type: model.Enqueued, Repo: *repo, Build: *build, }, @@ -339,7 +338,7 @@ func PostBuild(c *gin.Context) { for _, job := range jobs { broker, _ := stomp.FromContext(c) - broker.SendJSON("/queue/pending", &queue.Work{ + broker.SendJSON("/queue/pending", &model.Work{ Signed: signed, Verified: verified, User: user, @@ -371,3 +370,20 @@ func GetBuildQueue(c *gin.Context) { } c.JSON(200, out) } + +// copyLogs copies the stream from the source to the destination in valid JSON +// format. This converts the logs, which are per-line JSON objects, to a +// proper JSON array. +func copyLogs(dest io.Writer, src io.Reader) error { + io.WriteString(dest, "[") + + scanner := bufio.NewScanner(src) + for scanner.Scan() { + io.WriteString(dest, scanner.Text()) + io.WriteString(dest, ",\n") + } + + io.WriteString(dest, "{}]") + + return nil +} diff --git a/server/hook.go b/server/hook.go index ef167f231..9f87437e6 100644 --- a/server/hook.go +++ b/server/hook.go @@ -9,9 +9,7 @@ import ( "github.com/square/go-jose" log "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/shared/token" @@ -214,8 +212,8 @@ func PostHook(c *gin.Context) { } client := stomp.MustFromContext(c) - client.SendJSON("/topic/events", bus.Event{ - Type: bus.Enqueued, + client.SendJSON("/topic/events", model.Event{ + Type: model.Enqueued, Repo: *repo, Build: *build, }, @@ -225,7 +223,7 @@ func PostHook(c *gin.Context) { for _, job := range jobs { broker, _ := stomp.FromContext(c) - broker.SendJSON("/queue/pending", &queue.Work{ + broker.SendJSON("/queue/pending", &model.Work{ Signed: build.Signed, Verified: build.Verified, User: user, diff --git a/server/queue.go b/server/queue.go index d9ca743c7..25d44cb82 100644 --- a/server/queue.go +++ b/server/queue.go @@ -10,289 +10,13 @@ import ( "golang.org/x/net/context" "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" - "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/store" "github.com/drone/mq/stomp" "github.com/gorilla/websocket" ) -// -// // Pull is a long request that polls and attemts to pull work off the queue stack. -// func Pull(c *gin.Context) { -// logrus.Debugf("Agent %s connected.", c.ClientIP()) -// -// w := queue.PullClose(c, c.Writer) -// if w == nil { -// logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) -// } else { -// -// // setup the channel to stream logs -// if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { -// logrus.Errorf("Unable to create stream. %s", err) -// } -// -// c.JSON(202, w) -// -// logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", -// c.ClientIP(), -// w.Repo.Owner, -// w.Repo.Name, -// w.Build.Number, -// w.Job.Number, -// ) -// } -// } -// -// // Wait is a long request that polls and waits for cancelled build requests. -// func Wait(c *gin.Context) { -// id, err := strconv.ParseInt(c.Param("id"), 10, 64) -// if err != nil { -// c.String(500, "Invalid input. %s", err) -// return -// } -// -// eventc := make(chan *bus.Event, 1) -// -// bus.Subscribe(c, eventc) -// defer bus.Unsubscribe(c, eventc) -// -// for { -// select { -// case event := <-eventc: -// if event.Job.ID == id && event.Type == bus.Cancelled { -// c.JSON(200, event.Job) -// return -// } -// case <-c.Writer.CloseNotify(): -// return -// } -// } -// } -// -// // Update handles build updates from the agent and persists to the database. -// func Update(c *gin.Context) { -// work := &queue.Work{} -// if err := c.BindJSON(work); err != nil { -// logrus.Errorf("Invalid input. %s", err) -// return -// } -// -// // TODO(bradrydzewski) it is really annoying that we have to do this lookup -// // and I'd prefer not to. The reason we do this is because the Build and Job -// // have fields that aren't serialized to json and would be reset to their -// // empty values if we just saved what was coming in the http.Request body. -// build, err := store.GetBuild(c, work.Build.ID) -// if err != nil { -// c.String(404, "Unable to find build. %s", err) -// return -// } -// job, err := store.GetJob(c, work.Job.ID) -// if err != nil { -// c.String(404, "Unable to find job. %s", err) -// return -// } -// build.Started = work.Build.Started -// build.Finished = work.Build.Finished -// build.Status = work.Build.Status -// job.Started = work.Job.Started -// job.Finished = work.Job.Finished -// job.Status = work.Job.Status -// job.ExitCode = work.Job.ExitCode -// job.Error = work.Job.Error -// -// if build.Status == model.StatusPending { -// build.Started = work.Job.Started -// build.Status = model.StatusRunning -// store.UpdateBuild(c, build) -// } -// -// // if job.Status == model.StatusRunning { -// // err := stream.Create(c, stream.ToKey(job.ID)) -// // if err != nil { -// // logrus.Errorf("Unable to create stream. %s", err) -// // } -// // } -// -// ok, err := store.UpdateBuildJob(c, build, job) -// if err != nil { -// c.String(500, "Unable to update job. %s", err) -// return -// } -// -// if ok && build.Status != model.StatusRunning { -// // get the user because we transfer the user form the server to agent -// // and back we lose the token which does not get serialized to json. -// user, err := store.GetUser(c, work.User.ID) -// if err != nil { -// c.String(500, "Unable to find user. %s", err) -// return -// } -// remote.Status(c, user, work.Repo, build, -// fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) -// } -// -// if build.Status == model.StatusRunning { -// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) -// } else { -// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) -// } -// -// c.JSON(200, work) -// } -// -// // Stream streams the logs to disk or memory for broadcasing to listeners. Once -// // the stream is closed it is moved to permanent storage in the database. -// func Stream(c *gin.Context) { -// id, err := strconv.ParseInt(c.Param("id"), 10, 64) -// if err != nil { -// c.String(500, "Invalid input. %s", err) -// return -// } -// -// key := c.Param("id") -// logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) -// -// wc, err := stream.Writer(c, key) -// if err != nil { -// c.String(500, "Failed to create stream writer. %s", err) -// return -// } -// -// defer func() { -// wc.Close() -// stream.Delete(c, key) -// }() -// -// io.Copy(wc, c.Request.Body) -// -// rc, err := stream.Reader(c, key) -// if err != nil { -// c.String(500, "Failed to create stream reader. %s", err) -// return -// } -// -// wg := sync.WaitGroup{} -// wg.Add(1) -// -// go func() { -// defer recover() -// store.WriteLog(c, &model.Job{ID: id}, rc) -// wg.Done() -// }() -// -// wc.Close() -// wg.Wait() -// c.String(200, "") -// -// logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) -// } -// -// func Ping(c *gin.Context) { -// agent, err := store.GetAgentAddr(c, c.ClientIP()) -// if err == nil { -// agent.Updated = time.Now().Unix() -// err = store.UpdateAgent(c, agent) -// } else { -// err = store.CreateAgent(c, &model.Agent{ -// Address: c.ClientIP(), -// Platform: "linux/amd64", -// Capacity: 2, -// Created: time.Now().Unix(), -// Updated: time.Now().Unix(), -// }) -// } -// if err != nil { -// logrus.Errorf("Unable to register agent. %s", err.Error()) -// } -// c.String(200, "PONG") -// } - -// -// -// Below are alternate implementations for the Queue that use websockets. -// -// -// -// // PostLogs handles an http request from the agent to post build logs. These -// // logs are posted at the end of the build process. -// func PostLogs(c *gin.Context) { -// id, _ := strconv.ParseInt(c.Param("id"), 10, 64) -// job, err := store.GetJob(c, id) -// if err != nil { -// c.String(404, "Cannot upload logs. %s", err) -// return -// } -// if err := store.WriteLog(c, job, c.Request.Body); err != nil { -// c.String(500, "Cannot persist logs", err) -// return -// } -// c.String(200, "") -// } -// -// // WriteLogs handles an http request from the agent to stream build logs from -// // the agent to the server to enable real time streamings to the client. -// func WriteLogs(c *gin.Context) { -// id, err := strconv.ParseInt(c.Param("id"), 10, 64) -// if err != nil { -// c.String(500, "Invalid input. %s", err) -// return -// } -// -// conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) -// if err != nil { -// c.String(500, "Cannot upgrade to websocket. %s", err) -// return -// } -// defer conn.Close() -// -// wc, err := stream.Writer(c, stream.ToKey(id)) -// if err != nil { -// c.String(500, "Cannot create stream writer. %s", err) -// return -// } -// defer func() { -// wc.Close() -// stream.Delete(c, stream.ToKey(id)) -// }() -// -// var msg []byte -// for { -// _, msg, err = conn.ReadMessage() -// if err != nil { -// break -// } -// wc.Write(msg) -// wc.Write(newline) -// } -// -// if err != nil && err != io.EOF { -// c.String(500, "Error reading logs. %s", err) -// return -// } -// // -// // rc, err := stream.Reader(c, stream.ToKey(id)) -// // if err != nil { -// // c.String(500, "Failed to create stream reader. %s", err) -// // return -// // } -// // -// // wg := sync.WaitGroup{} -// // wg.Add(1) -// // -// // go func() { -// // defer recover() -// // store.WriteLog(c, &model.Job{ID: id}, rc) -// // wg.Done() -// // }() -// // -// // wc.Close() -// // wg.Wait() -// -// } - // newline defines a newline constant to separate lines in the build output var newline = []byte{'\n'} @@ -315,7 +39,7 @@ func HandleUpdate(c context.Context, message *stomp.Message) { } }() - work := new(queue.Work) + work := new(model.Work) if err := message.Unmarshal(work); err != nil { logrus.Errorf("Invalid input. %s", err) return @@ -376,8 +100,8 @@ func HandleUpdate(c context.Context, message *stomp.Message) { } client := stomp.MustFromContext(c) - err = client.SendJSON("/topic/events", bus.Event{ - Type: bus.Started, + err = client.SendJSON("/topic/events", model.Event{ + Type: model.Started, Repo: *work.Repo, Build: *build, Job: *job, diff --git a/stream/context.go b/stream/context.go deleted file mode 100644 index e1202cd1b..000000000 --- a/stream/context.go +++ /dev/null @@ -1,21 +0,0 @@ -package stream - -import "golang.org/x/net/context" - -const key = "stream" - -// Setter defines a context that enables setting values. -type Setter interface { - Set(string, interface{}) -} - -// FromContext returns the Stream associated with this context. -func FromContext(c context.Context) Stream { - return c.Value(key).(Stream) -} - -// ToContext adds the Stream to this context if it supports the -// Setter interface. -func ToContext(c Setter, s Stream) { - c.Set(key, s) -} diff --git a/stream/reader.go b/stream/reader.go deleted file mode 100644 index 935f0f93c..000000000 --- a/stream/reader.go +++ /dev/null @@ -1,54 +0,0 @@ -package stream - -import ( - "bytes" - "io" - "sync/atomic" -) - -type reader struct { - w *writer - off int - closed uint32 -} - -// Read reads from the Buffer -func (r *reader) Read(p []byte) (n int, err error) { - r.w.RLock() - defer r.w.RUnlock() - - var m int - - for len(p) > 0 { - - m, _ = bytes.NewReader(r.w.buffer.Bytes()[r.off:]).Read(p) - n += m - r.off += n - - if n > 0 { - break - } - - if r.w.Closed() { - err = io.EOF - break - } - if r.Closed() { - err = io.EOF - break - } - - r.w.Wait() - } - - return -} - -func (r *reader) Close() error { - atomic.StoreUint32(&r.closed, 1) - return nil -} - -func (r *reader) Closed() bool { - return atomic.LoadUint32(&r.closed) != 0 -} diff --git a/stream/reader_test.go b/stream/reader_test.go deleted file mode 100644 index e113cc4a4..000000000 --- a/stream/reader_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsReader(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement reader tests -} diff --git a/stream/stream.go b/stream/stream.go deleted file mode 100644 index 2619b53cb..000000000 --- a/stream/stream.go +++ /dev/null @@ -1,60 +0,0 @@ -package stream - -import ( - "bufio" - "io" - "strconv" - - "golang.org/x/net/context" -) - -// Stream manages the stream of build logs. -type Stream interface { - Create(string) error - Delete(string) error - Reader(string) (io.ReadCloser, error) - Writer(string) (io.WriteCloser, error) -} - -// Create creates a new stream. -func Create(c context.Context, key string) error { - return FromContext(c).Create(key) -} - -// Reader opens the stream for reading. -func Reader(c context.Context, key string) (io.ReadCloser, error) { - return FromContext(c).Reader(key) -} - -// Writer opens the stream for writing. -func Writer(c context.Context, key string) (io.WriteCloser, error) { - return FromContext(c).Writer(key) -} - -// Delete deletes the stream by key. -func Delete(c context.Context, key string) error { - return FromContext(c).Delete(key) -} - -// ToKey is a helper function that converts a unique identifier -// of type int64 into a string. -func ToKey(i int64) string { - return strconv.FormatInt(i, 10) -} - -// Copy copies the stream from the source to the destination in valid JSON -// format. This converts the logs, which are per-line JSON objects, to a -// proper JSON array. -func Copy(dest io.Writer, src io.Reader) error { - io.WriteString(dest, "[") - - scanner := bufio.NewScanner(src) - for scanner.Scan() { - io.WriteString(dest, scanner.Text()) - io.WriteString(dest, ",\n") - } - - io.WriteString(dest, "{}]") - - return nil -} diff --git a/stream/stream_impl.go b/stream/stream_impl.go deleted file mode 100644 index 8e21aaf9b..000000000 --- a/stream/stream_impl.go +++ /dev/null @@ -1,72 +0,0 @@ -package stream - -import ( - "fmt" - "io" - "sync" -) - -type stream struct { - sync.Mutex - writers map[string]*writer -} - -// New returns a new in-memory implementation of Stream. -func New() Stream { - return &stream{ - writers: map[string]*writer{}, - } -} - -// Reader returns an io.Reader for reading from to the stream. -func (s *stream) Reader(name string) (io.ReadCloser, error) { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return nil, fmt.Errorf("stream: cannot read stream %s, not found", name) - } - return s.writers[name].Reader() -} - -// Writer returns an io.WriteCloser for writing to the stream. -func (s *stream) Writer(name string) (io.WriteCloser, error) { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return nil, fmt.Errorf("stream: cannot write stream %s, not found", name) - } - return s.writers[name], nil -} - -// Create creates a new stream. -func (s *stream) Create(name string) error { - s.Lock() - defer s.Unlock() - - if s.exists(name) { - return fmt.Errorf("stream: cannot create stream %s, already exists", name) - } - - s.writers[name] = newWriter() - return nil -} - -// Delete deletes the stream by key. -func (s *stream) Delete(name string) error { - s.Lock() - defer s.Unlock() - - if !s.exists(name) { - return fmt.Errorf("stream: cannot delete stream %s, not found", name) - } - w := s.writers[name] - delete(s.writers, name) - return w.Close() -} - -func (s *stream) exists(name string) bool { - _, exists := s.writers[name] - return exists -} diff --git a/stream/stream_impl_test.go b/stream/stream_impl_test.go deleted file mode 100644 index fdc29fce4..000000000 --- a/stream/stream_impl_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsStream(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement stream tests -} diff --git a/stream/writer.go b/stream/writer.go deleted file mode 100644 index 15a873604..000000000 --- a/stream/writer.go +++ /dev/null @@ -1,52 +0,0 @@ -package stream - -import ( - "bytes" - "io" - "sync" - "sync/atomic" -) - -type writer struct { - sync.RWMutex - *sync.Cond - - buffer bytes.Buffer - closed uint32 -} - -func newWriter() *writer { - var w writer - w.Cond = sync.NewCond(w.RWMutex.RLocker()) - return &w -} - -func (w *writer) Write(p []byte) (n int, err error) { - defer w.Broadcast() - w.Lock() - defer w.Unlock() - if w.Closed() { - return 0, io.EOF - } - return w.buffer.Write(p) -} - -func (w *writer) Reader() (io.ReadCloser, error) { - return &reader{w: w}, nil -} - -func (w *writer) Wait() { - if !w.Closed() { - w.Cond.Wait() - } -} - -func (w *writer) Close() error { - atomic.StoreUint32(&w.closed, 1) - w.Cond.Broadcast() - return nil -} - -func (w *writer) Closed() bool { - return atomic.LoadUint32(&w.closed) != 0 -} diff --git a/stream/writer_test.go b/stream/writer_test.go deleted file mode 100644 index c0c757e10..000000000 --- a/stream/writer_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package stream - -import "testing" - -func TetsWriter(t *testing.T) { - t.Skip() //TODO(bradrydzewski) implement writer tests -} From 9f937e24251b555a0d2b75a3bcce34e2f54cd66b Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Tue, 27 Sep 2016 20:40:59 -0500 Subject: [PATCH 06/14] removed default image namespacing --- agent/agent.go | 2 -- yaml/transform/clone.go | 2 +- yaml/transform/image.go | 22 ---------------------- 3 files changed, 1 insertion(+), 25 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 651a616ad..74e364c17 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -154,8 +154,6 @@ func (a *Agent) prep(w *model.Work) (*yaml.Config, error) { transform.CommandTransform(conf) transform.ImagePull(conf, a.Pull) transform.ImageTag(conf) - transform.ImageName(conf) - transform.ImageNamespace(conf, a.Namespace) if err := transform.ImageEscalate(conf, a.Escalate); err != nil { return nil, err } diff --git a/yaml/transform/clone.go b/yaml/transform/clone.go index f47abf41f..a54406714 100644 --- a/yaml/transform/clone.go +++ b/yaml/transform/clone.go @@ -7,7 +7,7 @@ const clone = "clone" // Clone transforms the Yaml to include a clone step. func Clone(c *yaml.Config, plugin string) error { if plugin == "" { - plugin = "git" + plugin = "plugins/git" } for _, p := range c.Pipeline { diff --git a/yaml/transform/image.go b/yaml/transform/image.go index a13a983cc..69aee9746 100644 --- a/yaml/transform/image.go +++ b/yaml/transform/image.go @@ -34,28 +34,6 @@ func ImageTag(conf *yaml.Config) error { return nil } -// ImageName transforms the Yaml to replace underscores with dashes. -func ImageName(conf *yaml.Config) error { - for _, image := range conf.Pipeline { - image.Image = strings.Replace(image.Image, "_", "-", -1) - } - return nil -} - -// ImageNamespace transforms the Yaml to use a default namepsace for plugins. -func ImageNamespace(conf *yaml.Config, namespace string) error { - for _, image := range conf.Pipeline { - if strings.Contains(image.Image, "/") { - continue - } - if !isPlugin(image) { - continue - } - image.Image = filepath.Join(namespace, image.Image) - } - return nil -} - // ImageEscalate transforms the Yaml to automatically enable privileged mode // for a subset of white-listed plugins matching the given patterns. func ImageEscalate(conf *yaml.Config, patterns []string) error { From 7c19801bfa77b6f6716ac8ab84339c446594f292 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Wed, 28 Sep 2016 15:52:59 -0400 Subject: [PATCH 07/14] setup docker 0.5.0-rc with queue --- .drone.yml | 5 ++--- .drone.yml.sig | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.drone.yml b/.drone.yml index b00d04a34..13c3f85d6 100644 --- a/.drone.yml +++ b/.drone.yml @@ -33,10 +33,9 @@ pipeline: docker: repo: drone/drone - tag: [ "0.5.0", "0.5" ] - storage_driver: overlay + tag: [ "0.5.0-rc" ] when: - branch: master + branch: feature/mq event: push services: diff --git a/.drone.yml.sig b/.drone.yml.sig index 4f1196374..9519ba37a 100644 --- a/.drone.yml.sig +++ b/.drone.yml.sig @@ -1 +1 @@ -eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICBiYWNrZW5kOgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBzMwogICAgYWNsOiBwdWJsaWMtcmVhZAogICAgYnVja2V0OiBkb3dubG9hZHMuZHJvbmUuaW8KICAgIHNvdXJjZTogcmVsZWFzZS8qKi8qLioKICAgIHdoZW46CiAgICAgIGV2ZW50OiBwdXNoCiAgICAgIGJyYW5jaDogbWFzdGVyCgogIGRvY2tlcjoKICAgIHJlcG86IGRyb25lL2Ryb25lCiAgICB0YWc6IFsgIjAuNS4wIiwgIjAuNSIgXQogICAgc3RvcmFnZV9kcml2ZXI6IG92ZXJsYXkKICAgIHdoZW46CiAgICAgIGJyYW5jaDogbWFzdGVyCiAgICAgIGV2ZW50OiBwdXNoCgpzZXJ2aWNlczoKICBwb3N0Z3JlczoKICAgIGltYWdlOiBwb3N0Z3Jlczo5LjQuNQogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gUE9TVEdSRVNfVVNFUj1wb3N0Z3JlcwogIG15c3FsOgogICAgaW1hZ2U6IG15c3FsOjUuNi4yNwogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gTVlTUUxfREFUQUJBU0U9dGVzdAogICAgICAtIE1ZU1FMX0FMTE9XX0VNUFRZX1BBU1NXT1JEPXllcwo.kQIwqIgs7PnoKIGmzJ6hlbWTbV5zK0w4HVWsux79P3s \ No newline at end of file +eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICBiYWNrZW5kOgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBzMwogICAgYWNsOiBwdWJsaWMtcmVhZAogICAgYnVja2V0OiBkb3dubG9hZHMuZHJvbmUuaW8KICAgIHNvdXJjZTogcmVsZWFzZS8qKi8qLioKICAgIHdoZW46CiAgICAgIGV2ZW50OiBwdXNoCiAgICAgIGJyYW5jaDogbWFzdGVyCgogIGRvY2tlcjoKICAgIHJlcG86IGRyb25lL2Ryb25lCiAgICB0YWc6IFsgIjAuNS4wLXJjIiBdCiAgICB3aGVuOgogICAgICBicmFuY2g6IGZlYXR1cmUvbXEKICAgICAgZXZlbnQ6IHB1c2gKCnNlcnZpY2VzOgogIHBvc3RncmVzOgogICAgaW1hZ2U6IHBvc3RncmVzOjkuNC41CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBQT1NUR1JFU19VU0VSPXBvc3RncmVzCiAgbXlzcWw6CiAgICBpbWFnZTogbXlzcWw6NS42LjI3CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBNWVNRTF9EQVRBQkFTRT10ZXN0CiAgICAgIC0gTVlTUUxfQUxMT1dfRU1QVFlfUEFTU1dPUkQ9eWVzCg.UUuQ6nvpe9gn78FrfnlNZ2wEKf1Tcl3CTFqKhJKKiCE \ No newline at end of file From 2bb47ed7cb6a2f94b890b0bd5ae79780c8c23506 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Wed, 28 Sep 2016 15:56:19 -0400 Subject: [PATCH 08/14] fix Makefile to pull drone/mq --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 03a8807b0..894d7fa29 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ deps_backend: go get -u golang.org/x/tools/cmd/cover go get -u github.com/jteeuwen/go-bindata/... go get -u github.com/elazarl/go-bindata-assetfs/... + go get -u github.com/drone/mq/... gen: gen_template gen_migrations From 66aa18922f68671da49b9f470071b3c817f3b74c Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Wed, 28 Sep 2016 15:59:58 -0400 Subject: [PATCH 09/14] fixed yaml unit tests --- yaml/transform/image_test.go | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/yaml/transform/image_test.go b/yaml/transform/image_test.go index d5abbd6dc..26ad37dc3 100644 --- a/yaml/transform/image_test.go +++ b/yaml/transform/image_test.go @@ -128,35 +128,5 @@ func Test_normalize(t *testing.T) { g.Assert(c.Pipeline[0].Image).Equal("golang:1.5") }) }) - - g.Describe("plugins", func() { - - g.It("should prepend namespace", func() { - c := newConfig(&yaml.Container{ - Image: "slack", - }) - - ImageNamespace(c, "plugins") - g.Assert(c.Pipeline[0].Image).Equal("plugins/slack") - }) - - g.It("should not override existing namespace", func() { - c := newConfig(&yaml.Container{ - Image: "index.docker.io/drone/git", - }) - - ImageNamespace(c, "plugins") - g.Assert(c.Pipeline[0].Image).Equal("index.docker.io/drone/git") - }) - - g.It("should replace underscores with dashes", func() { - c := newConfig(&yaml.Container{ - Image: "gh_pages", - }) - - ImageName(c) - g.Assert(c.Pipeline[0].Image).Equal("gh-pages") - }) - }) }) } From 17a77127b6ddeddc1c3b317db975b2e8b1917fdb Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 29 Sep 2016 17:23:26 -0400 Subject: [PATCH 10/14] fixed git clone issue --- server/queue.go | 9 ++++++++- yaml/transform/clone.go | 7 +++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/server/queue.go b/server/queue.go index 25d44cb82..155b9c039 100644 --- a/server/queue.go +++ b/server/queue.go @@ -101,7 +101,14 @@ func HandleUpdate(c context.Context, message *stomp.Message) { client := stomp.MustFromContext(c) err = client.SendJSON("/topic/events", model.Event{ - Type: model.Started, + Type: func() model.EventType { + // HACK we don't even really care about the event type. + // so we should just simplify how events are triggered. + if job.Status == model.StatusRunning { + return model.Started + } + return model.Finished + }(), Repo: *work.Repo, Build: *build, Job: *job, diff --git a/yaml/transform/clone.go b/yaml/transform/clone.go index a54406714..60c1e2094 100644 --- a/yaml/transform/clone.go +++ b/yaml/transform/clone.go @@ -6,8 +6,11 @@ const clone = "clone" // Clone transforms the Yaml to include a clone step. func Clone(c *yaml.Config, plugin string) error { - if plugin == "" { - plugin = "plugins/git" + switch plugin { + case "", "git": + plugin = "plugins/git:latest" + case "hg": + plugin = "plugins/hg:latest" } for _, p := range c.Pipeline { From 76b0286b68b5c5554278ddd3a7b24ffeff3425af Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 29 Sep 2016 17:45:13 -0400 Subject: [PATCH 11/14] update agent logs --- agent/updater.go | 5 +++-- build/pipeline.go | 4 ---- drone/agent/agent.go | 27 ++++++++++++++++----------- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/agent/updater.go b/agent/updater.go index e2e1f43ad..90fc999f7 100644 --- a/agent/updater.go +++ b/agent/updater.go @@ -6,6 +6,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/drone/drone/build" "github.com/drone/drone/model" + "github.com/drone/mq/logger" "github.com/drone/mq/stomp" ) @@ -27,7 +28,7 @@ func NewClientUpdater(client *stomp.Client) UpdateFunc { return func(w *model.Work) { err := client.SendJSON("/queue/updates", w) if err != nil { - logrus.Errorf("Error updating %s/%s#%d.%d. %s", + logger.Warningf("Error updating %s/%s#%d.%d. %s", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) } if w.Job.Status != model.StatusRunning { @@ -38,7 +39,7 @@ func NewClientUpdater(client *stomp.Client) UpdateFunc { } if err := client.Send(dest, []byte("eof"), opts...); err != nil { - logrus.Errorf("Error sending eof %s/%s#%d.%d. %s", + logger.Warningf("Error sending eof %s/%s#%d.%d. %s", w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) } } diff --git a/build/pipeline.go b/build/pipeline.go index ddeca757b..7b83d1f46 100644 --- a/build/pipeline.go +++ b/build/pipeline.go @@ -175,7 +175,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { } p.containers = append(p.containers, name) - logrus.Debugf("wait.add(1) for %s logs", name) p.wait.Add(1) go func() { defer func() { @@ -183,7 +182,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { logrus.Errorln("recover writing build output", r) } - logrus.Debugf("wait.done() for %s logs", name) p.wait.Done() }() @@ -217,7 +215,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { return err } - logrus.Debugf("wait.add(1) for %s exit code", name) p.wait.Add(1) go func() { defer func() { @@ -225,7 +222,6 @@ func (p *Pipeline) exec(c *yaml.Container) error { logrus.Errorln("recover writing exit code to output", r) } p.wait.Done() - logrus.Debugf("wait.done() for %s exit code", name) }() p.pipe <- &Line{ diff --git a/drone/agent/agent.go b/drone/agent/agent.go index b3037a701..c9a51eaaa 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -9,7 +9,9 @@ import ( "time" "github.com/drone/drone/model" + "github.com/drone/mq/logger" "github.com/drone/mq/stomp" + "github.com/tidwall/redlog" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -136,9 +138,15 @@ var AgentCmd = cli.Command{ func start(c *cli.Context) { + log := redlog.New(os.Stderr) + log.SetLevel(0) + logger.SetLogger(log) + // debug level if requested by user if c.Bool("debug") { logrus.SetLevel(logrus.DebugLevel) + + log.SetLevel(1) } else { logrus.SetLevel(logrus.WarnLevel) } @@ -152,10 +160,7 @@ func start(c *cli.Context) { accessToken = c.String("drone-token") } - logrus.Infof("Connecting to %s with token %s", - c.String("drone-server"), - accessToken, - ) + logger.Noticef("connecting to server%s", c.String("drone-server")) server := strings.TrimRight(c.String("drone-server"), "/") @@ -203,7 +208,7 @@ func start(c *cli.Context) { // dial the drone server to establish a TCP connection. client, err = stomp.Dial(server) if err != nil { - logrus.Errorf("Failed to establish server connection, %s, retry in %v", err, backoff) + logger.Warningf("connection failed, retry in %v. %s", backoff, err) <-time.After(backoff) continue } @@ -213,7 +218,7 @@ func start(c *cli.Context) { // initialize the stomp session and authenticate. if err = client.Connect(opts...); err != nil { - logrus.Errorf("Failed to establish server session, %s, retry in %v", err, backoff) + logger.Warningf("session failed, retry in %v", backoff, err) <-time.After(backoff) continue } @@ -233,10 +238,10 @@ func start(c *cli.Context) { go handler(m) // HACK until we a channel based Subscribe implementation }), opts...) - logrus.Infof("Server connection establish, ready to process builds.") + logger.Noticef("connection establish, ready to process builds.") <-client.Done() - logrus.Warnf("Server connection interrupted, attempting to reconnect.") + logger.Warningf("connection interrupted, attempting to reconnect.") } } @@ -251,10 +256,10 @@ func handleSignals() { go func() { <-c - logrus.Debugln("SIGTERM received.") - logrus.Debugln("wait for running builds to finish.") + logger.Warningf("SIGTERM received.") + logger.Warningf("wait for running builds to finish.") running.Wait() - logrus.Debugln("done.") + logger.Warningf("done.") os.Exit(0) }() } From 3631c4d4cd4c42bf8b5fcc90d32a44d2db373f56 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Thu, 29 Sep 2016 17:48:17 -0400 Subject: [PATCH 12/14] added log dep --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 894d7fa29..f40359b2f 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,7 @@ deps_backend: go get -u github.com/jteeuwen/go-bindata/... go get -u github.com/elazarl/go-bindata-assetfs/... go get -u github.com/drone/mq/... + go get -u github.com/tidwall/redlog gen: gen_template gen_migrations From 0c38443418c9f10512f9179aacba5dcd653b60a5 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sun, 23 Oct 2016 23:00:55 +0200 Subject: [PATCH 13/14] updated yaml for release --- .drone.yml | 9 +++++---- .drone.yml.sig | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.drone.yml b/.drone.yml index 13c3f85d6..5e81c9d6d 100644 --- a/.drone.yml +++ b/.drone.yml @@ -3,7 +3,7 @@ workspace: path: src/github.com/drone/drone pipeline: - backend: + test: image: golang:1.6 environment: - GO15VENDOREXPERIMENT=1 @@ -23,7 +23,7 @@ pipeline: event: push publish: - image: s3 + image: plugins/s3 acl: public-read bucket: downloads.drone.io source: release/**/*.* @@ -32,10 +32,11 @@ pipeline: branch: master docker: + image: plugins/docker repo: drone/drone - tag: [ "0.5.0-rc" ] + tag: [ "0.5", "0.5.0" "0.5.0-rc" ] when: - branch: feature/mq + branch: master event: push services: diff --git a/.drone.yml.sig b/.drone.yml.sig index 9519ba37a..38ce14128 100644 --- a/.drone.yml.sig +++ b/.drone.yml.sig @@ -1 +1 @@ -eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICBiYWNrZW5kOgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBzMwogICAgYWNsOiBwdWJsaWMtcmVhZAogICAgYnVja2V0OiBkb3dubG9hZHMuZHJvbmUuaW8KICAgIHNvdXJjZTogcmVsZWFzZS8qKi8qLioKICAgIHdoZW46CiAgICAgIGV2ZW50OiBwdXNoCiAgICAgIGJyYW5jaDogbWFzdGVyCgogIGRvY2tlcjoKICAgIHJlcG86IGRyb25lL2Ryb25lCiAgICB0YWc6IFsgIjAuNS4wLXJjIiBdCiAgICB3aGVuOgogICAgICBicmFuY2g6IGZlYXR1cmUvbXEKICAgICAgZXZlbnQ6IHB1c2gKCnNlcnZpY2VzOgogIHBvc3RncmVzOgogICAgaW1hZ2U6IHBvc3RncmVzOjkuNC41CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBQT1NUR1JFU19VU0VSPXBvc3RncmVzCiAgbXlzcWw6CiAgICBpbWFnZTogbXlzcWw6NS42LjI3CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBNWVNRTF9EQVRBQkFTRT10ZXN0CiAgICAgIC0gTVlTUUxfQUxMT1dfRU1QVFlfUEFTU1dPUkQ9eWVzCg.UUuQ6nvpe9gn78FrfnlNZ2wEKf1Tcl3CTFqKhJKKiCE \ No newline at end of file +eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICB0ZXN0OgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBwbHVnaW5zL3MzCiAgICBhY2w6IHB1YmxpYy1yZWFkCiAgICBidWNrZXQ6IGRvd25sb2Fkcy5kcm9uZS5pbwogICAgc291cmNlOiByZWxlYXNlLyoqLyouKgogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKICAgICAgYnJhbmNoOiBtYXN0ZXIKCiAgZG9ja2VyOgogICAgaW1hZ2U6IHBsdWdpbnMvZG9ja2VyCiAgICByZXBvOiBkcm9uZS9kcm9uZQogICAgdGFnOiBbICIwLjUiLCAiMC41LjAiICIwLjUuMC1yYyIgXQogICAgd2hlbjoKICAgICAgYnJhbmNoOiBtYXN0ZXIKICAgICAgZXZlbnQ6IHB1c2gKCnNlcnZpY2VzOgogIHBvc3RncmVzOgogICAgaW1hZ2U6IHBvc3RncmVzOjkuNC41CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBQT1NUR1JFU19VU0VSPXBvc3RncmVzCiAgbXlzcWw6CiAgICBpbWFnZTogbXlzcWw6NS42LjI3CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBNWVNRTF9EQVRBQkFTRT10ZXN0CiAgICAgIC0gTVlTUUxfQUxMT1dfRU1QVFlfUEFTU1dPUkQ9eWVzCg.90S0zjA99wZGnLxd17e20YqMTLRy8qSNdpz5qwdarmA \ No newline at end of file From 6e9c2f4237c016da75f41c39d8d57b60d51b4e78 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sun, 23 Oct 2016 23:05:44 +0200 Subject: [PATCH 14/14] fixed yaml file --- .drone.yml | 2 +- .drone.yml.sig | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.drone.yml b/.drone.yml index 5e81c9d6d..2c364fc92 100644 --- a/.drone.yml +++ b/.drone.yml @@ -34,7 +34,7 @@ pipeline: docker: image: plugins/docker repo: drone/drone - tag: [ "0.5", "0.5.0" "0.5.0-rc" ] + tag: [ "0.5", "0.5.0", "0.5.0-rc" ] when: branch: master event: push diff --git a/.drone.yml.sig b/.drone.yml.sig index 38ce14128..42b618e41 100644 --- a/.drone.yml.sig +++ b/.drone.yml.sig @@ -1 +1 @@ -eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICB0ZXN0OgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBwbHVnaW5zL3MzCiAgICBhY2w6IHB1YmxpYy1yZWFkCiAgICBidWNrZXQ6IGRvd25sb2Fkcy5kcm9uZS5pbwogICAgc291cmNlOiByZWxlYXNlLyoqLyouKgogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKICAgICAgYnJhbmNoOiBtYXN0ZXIKCiAgZG9ja2VyOgogICAgaW1hZ2U6IHBsdWdpbnMvZG9ja2VyCiAgICByZXBvOiBkcm9uZS9kcm9uZQogICAgdGFnOiBbICIwLjUiLCAiMC41LjAiICIwLjUuMC1yYyIgXQogICAgd2hlbjoKICAgICAgYnJhbmNoOiBtYXN0ZXIKICAgICAgZXZlbnQ6IHB1c2gKCnNlcnZpY2VzOgogIHBvc3RncmVzOgogICAgaW1hZ2U6IHBvc3RncmVzOjkuNC41CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBQT1NUR1JFU19VU0VSPXBvc3RncmVzCiAgbXlzcWw6CiAgICBpbWFnZTogbXlzcWw6NS42LjI3CiAgICBlbnZpcm9ubWVudDoKICAgICAgLSBNWVNRTF9EQVRBQkFTRT10ZXN0CiAgICAgIC0gTVlTUUxfQUxMT1dfRU1QVFlfUEFTU1dPUkQ9eWVzCg.90S0zjA99wZGnLxd17e20YqMTLRy8qSNdpz5qwdarmA \ No newline at end of file +eyJhbGciOiJIUzI1NiJ9.d29ya3NwYWNlOgogIGJhc2U6IC9nbwogIHBhdGg6IHNyYy9naXRodWIuY29tL2Ryb25lL2Ryb25lCgpwaXBlbGluZToKICB0ZXN0OgogICAgaW1hZ2U6IGdvbGFuZzoxLjYKICAgIGVudmlyb25tZW50OgogICAgICAtIEdPMTVWRU5ET1JFWFBFUklNRU5UPTEKICAgIGNvbW1hbmRzOgogICAgICAtIG1ha2UgZGVwcyBnZW4KICAgICAgLSBtYWtlIHRlc3QgdGVzdF9wb3N0Z3JlcyB0ZXN0X215c3FsCgogIGNvbXBpbGU6CiAgICBpbWFnZTogZ29sYW5nOjEuNgogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gR08xNVZFTkRPUkVYUEVSSU1FTlQ9MQogICAgICAtIEdPUEFUSD0vZ28KICAgIGNvbW1hbmRzOgogICAgICAtIGV4cG9ydCBQQVRIPSRQQVRIOiRHT1BBVEgvYmluCiAgICAgIC0gbWFrZSBidWlsZAogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKCiAgcHVibGlzaDoKICAgIGltYWdlOiBwbHVnaW5zL3MzCiAgICBhY2w6IHB1YmxpYy1yZWFkCiAgICBidWNrZXQ6IGRvd25sb2Fkcy5kcm9uZS5pbwogICAgc291cmNlOiByZWxlYXNlLyoqLyouKgogICAgd2hlbjoKICAgICAgZXZlbnQ6IHB1c2gKICAgICAgYnJhbmNoOiBtYXN0ZXIKCiAgZG9ja2VyOgogICAgaW1hZ2U6IHBsdWdpbnMvZG9ja2VyCiAgICByZXBvOiBkcm9uZS9kcm9uZQogICAgdGFnOiBbICIwLjUiLCAiMC41LjAiLCAiMC41LjAtcmMiIF0KICAgIHdoZW46CiAgICAgIGJyYW5jaDogbWFzdGVyCiAgICAgIGV2ZW50OiBwdXNoCgpzZXJ2aWNlczoKICBwb3N0Z3JlczoKICAgIGltYWdlOiBwb3N0Z3Jlczo5LjQuNQogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gUE9TVEdSRVNfVVNFUj1wb3N0Z3JlcwogIG15c3FsOgogICAgaW1hZ2U6IG15c3FsOjUuNi4yNwogICAgZW52aXJvbm1lbnQ6CiAgICAgIC0gTVlTUUxfREFUQUJBU0U9dGVzdAogICAgICAtIE1ZU1FMX0FMTE9XX0VNUFRZX1BBU1NXT1JEPXllcwo.0lD1m6yILbU8ZrSJcZv7Y1CcGEG5zIaJma1C1lUTc7o \ No newline at end of file