diff --git a/.drone.yml b/.drone.yml index 8465f6ee6..6c5921563 100644 --- a/.drone.yml +++ b/.drone.yml @@ -2,13 +2,22 @@ clone: path: github.com/drone/drone build: - image: drone/golang:1.5 - environment: - - GO15VENDOREXPERIMENT=1 - commands: - - make deps gen - - make test test_postgres test_mysql - - make build build_static deb docs + test: + image: drone/golang:1.5 + environment: + - GO15VENDOREXPERIMENT=1 + commands: + - make deps gen + - make test test_postgres test_mysql + - make build + dist: + image: drone/golang:1.5 + environment: + - GO15VENDOREXPERIMENT=1 + commands: + - make build + when: + event: push compose: postgres: @@ -27,10 +36,7 @@ publish: password: $$DOCKER_PASS email: $$DOCKER_EMAIL repo: drone/drone - tag: - - "latest" - - "0.4.1" - - "0.4" + tag: [ "latest" ] when: repo: drone/drone branch: master diff --git a/.gitignore b/.gitignore index 72de66e21..01abb6461 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,4 @@ drone/drone -drone_* *.sqlite *_gen.go *.html diff --git a/Dockerfile b/Dockerfile index ea484565d..de1310eca 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,11 +19,12 @@ ADD contrib/docker/etc/nsswitch.conf /etc/ ENV DATABASE_DRIVER=sqlite3 ENV DATABASE_CONFIG=/var/lib/drone/drone.sqlite -ADD drone_static /drone_static +ADD drone/drone /drone # Alpine Linux doesn't use pam, which means that there is no /etc/nsswitch.conf, # but Go and CGO rely on /etc/nsswitch.conf to check the order of DNS resolving. # To fix this we just create /etc/nsswitch.conf and add the following line: #RUN echo 'hosts: files mdns4_minimal [NOTFOUND=return] dns mdns4' >> /etc/nsswitch.conf -ENTRYPOINT ["/drone_static"] +ENTRYPOINT ["/drone"] +CMD ["serve"] diff --git a/Makefile b/Makefile index 632c8bfe9..b5d68b778 100644 --- a/Makefile +++ b/Makefile @@ -24,11 +24,10 @@ gen_template: gen_migrations: go generate github.com/drone/drone/store/datastore/ddl -build: - go build +build: build_static build_static: - go build --ldflags '-extldflags "-static" -X github.com/drone/drone/version.VersionDev=$(CI_BUILD_NUMBER)' -o drone_static + cd drone && go build --ldflags '-extldflags "-static" -X github.com/drone/drone/version.VersionDev=$(CI_BUILD_NUMBER)' -o drone test: go test -cover $(PACKAGES) diff --git a/api/queue.go b/api/queue.go new file mode 100644 index 000000000..e7176ed10 --- /dev/null +++ b/api/queue.go @@ -0,0 +1,155 @@ +package api + +import ( + "fmt" + "io" + "strconv" + + "github.com/Sirupsen/logrus" + "github.com/drone/drone/bus" + "github.com/drone/drone/model" + "github.com/drone/drone/queue" + "github.com/drone/drone/remote" + "github.com/drone/drone/store" + "github.com/drone/drone/stream" + "github.com/gin-gonic/gin" +) + +// Pull is a long request that polls and attemts to pull work off the queue stack. +func Pull(c *gin.Context) { + logrus.Debugf("Agent %s connected.", c.ClientIP()) + + w := queue.PullClose(c, c.Writer) + if w == nil { + logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) + } else { + c.JSON(202, w) + + logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", + c.ClientIP(), + w.Repo.Owner, + w.Repo.Name, + w.Build.Number, + w.Job.Number, + ) + } +} + +// Wait is a long request that polls and waits for cancelled build requests. +func Wait(c *gin.Context) { + id, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + c.String(500, "Invalid input. %s", err) + return + } + + eventc := make(chan *bus.Event, 1) + + bus.Subscribe(c, eventc) + defer bus.Unsubscribe(c, eventc) + + for { + select { + case event := <-eventc: + if event.Job.ID == id && + event.Job.Status != model.StatusPending && + event.Job.Status != model.StatusRunning { + c.JSON(200, event.Job) + return + } + case <-c.Writer.CloseNotify(): + return + } + } +} + +// Update handles build updates from the agent and persists to the database. +func Update(c *gin.Context) { + work := &queue.Work{} + if err := c.BindJSON(work); err != nil { + logrus.Errorf("Invalid input. %s", err) + return + } + + // TODO(bradrydzewski) it is really annoying that we have to do this lookup + // and I'd prefer not to. The reason we do this is because the Build and Job + // have fields that aren't serialized to json and would be reset to their + // empty values if we just saved what was coming in the http.Request body. + build, err := store.GetBuild(c, work.Build.ID) + if err != nil { + c.String(404, "Unable to find build. %s", err) + return + } + job, err := store.GetJob(c, work.Job.ID) + if err != nil { + c.String(404, "Unable to find job. %s", err) + return + } + build.Started = work.Build.Started + build.Finished = work.Build.Finished + build.Status = work.Build.Status + job.Started = work.Job.Started + job.Finished = work.Job.Finished + job.Status = work.Job.Status + job.ExitCode = work.Job.ExitCode + + ok, err := store.UpdateBuildJob(c, build, job) + if err != nil { + c.String(500, "Unable to update job. %s", err) + return + } + + if ok { + // get the user because we transfer the user form the server to agent + // and back we lose the token which does not get serialized to json. + user, err := store.GetUser(c, work.User.ID) + if err != nil { + c.String(500, "Unable to find user. %s", err) + return + } + bus.Publish(c, &bus.Event{}) + remote.Status(c, user, work.Repo, build, + fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) + } + c.JSON(200, work) +} + +// Stream streams the logs to disk or memory for broadcasing to listeners. Once +// the stream is closed it is moved to permanent storage in the database. +func Stream(c *gin.Context) { + id, err := strconv.ParseInt(c.Param("id"), 10, 64) + if err != nil { + c.String(500, "Invalid input. %s", err) + return + } + + key := stream.ToKey(id) + rc, wc, err := stream.Create(c, key) + if err != nil { + logrus.Errorf("Agent %s failed to create stream. %s.", c.ClientIP(), err) + return + } + + defer func() { + wc.Close() + rc.Close() + stream.Remove(c, key) + }() + + io.Copy(wc, c.Request.Body) + wc.Close() + + rcc, _, err := stream.Open(c, key) + if err != nil { + logrus.Errorf("Agent %s failed to read cache. %s.", c.ClientIP(), err) + return + } + defer func() { + rcc.Close() + }() + + store.WriteLog(c, &model.Job{ID: id}, rcc) + c.String(200, "") + + logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 000000000..db029df99 --- /dev/null +++ b/client/client.go @@ -0,0 +1,235 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strconv" + + "github.com/drone/drone/model" + "github.com/drone/drone/queue" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" + "golang.org/x/oauth2" +) + +const ( + pathPull = "%s/api/queue/pull" + pathWait = "%s/api/queue/wait/%d" + pathStream = "%s/api/queue/stream/%d" + pathPush = "%s/api/queue/status/%d" +) + +type client struct { + client *http.Client + base string // base url +} + +// NewClient returns a client at the specified url. +func NewClient(uri string) Client { + return &client{http.DefaultClient, uri} +} + +// NewClientToken returns a client at the specified url that +// authenticates all outbound requests with the given token. +func NewClientToken(uri, token string) Client { + config := new(oauth2.Config) + auther := config.Client(oauth2.NoContext, &oauth2.Token{AccessToken: token}) + return &client{auther, uri} +} + +// Pull pulls work from the server queue. +func (c *client) Pull() (*queue.Work, error) { + out := new(queue.Work) + uri := fmt.Sprintf(pathPull, c.base) + err := c.post(uri, nil, out) + return out, err +} + +// Push pushes an update to the server. +func (c *client) Push(p *queue.Work) error { + uri := fmt.Sprintf(pathPush, c.base, p.Job.ID) + err := c.post(uri, p, nil) + return err +} + +// Stream streams the build logs to the server. +func (c *client) Stream(id int64, rc io.ReadCloser) error { + uri := fmt.Sprintf(pathStream, c.base, id) + err := c.post(uri, rc, nil) + return err +} + +// Wait watches and waits for the build to cancel or finish. +func (c *client) Wait(id int64) *Wait { + ctx, cancel := context.WithCancel(context.Background()) + return &Wait{id, c, ctx, cancel} +} + +//////// + +type CancelNotifier interface { + Canecel() + CancelNotify() bool + IsCancelled() bool +} + +//////// + +type Wait struct { + id int64 + client *client + + ctx context.Context + cancel context.CancelFunc +} + +func (w *Wait) Done() (*model.Job, error) { + uri := fmt.Sprintf(pathWait, w.client.base, w.id) + req, err := w.client.createRequest(uri, "POST", nil) + if err != nil { + return nil, err + } + + res, err := ctxhttp.Do(w.ctx, w.client.client, req) + if err != nil { + return nil, err + } + defer res.Body.Close() + job := &model.Job{} + err = json.NewDecoder(res.Body).Decode(&job) + if err != nil { + return nil, err + } + return job, nil +} + +func (w *Wait) Cancel() { + w.cancel() +} + +// +// http request helper functions +// + +// helper function for making an http GET request. +func (c *client) get(rawurl string, out interface{}) error { + return c.do(rawurl, "GET", nil, out) +} + +// helper function for making an http POST request. +func (c *client) post(rawurl string, in, out interface{}) error { + return c.do(rawurl, "POST", in, out) +} + +// helper function for making an http PUT request. +func (c *client) put(rawurl string, in, out interface{}) error { + return c.do(rawurl, "PUT", in, out) +} + +// helper function for making an http PATCH request. +func (c *client) patch(rawurl string, in, out interface{}) error { + return c.do(rawurl, "PATCH", in, out) +} + +// helper function for making an http DELETE request. +func (c *client) delete(rawurl string) error { + return c.do(rawurl, "DELETE", nil, nil) +} + +// helper function to make an http request +func (c *client) do(rawurl, method string, in, out interface{}) error { + // executes the http request and returns the body as + // and io.ReadCloser + body, err := c.open(rawurl, method, in, out) + if err != nil { + return err + } + defer body.Close() + + // if a json response is expected, parse and return + // the json response. + if out != nil { + return json.NewDecoder(body).Decode(out) + } + return nil +} + +// helper function to open an http request +func (c *client) open(rawurl, method string, in, out interface{}) (io.ReadCloser, error) { + uri, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + // creates a new http request to bitbucket. + req, err := http.NewRequest(method, uri.String(), nil) + if err != nil { + return nil, err + } + + // if we are posting or putting data, we need to + // write it to the body of the request. + if in != nil { + rc, ok := in.(io.ReadCloser) + if ok { + req.Body = rc + req.Header.Set("Content-Type", "plain/text") + } else { + inJson, err := json.Marshal(in) + if err != nil { + return nil, err + } + + buf := bytes.NewBuffer(inJson) + req.Body = ioutil.NopCloser(buf) + + req.ContentLength = int64(len(inJson)) + req.Header.Set("Content-Length", strconv.Itoa(len(inJson))) + req.Header.Set("Content-Type", "application/json") + } + } + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode > http.StatusPartialContent { + defer resp.Body.Close() + out, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf(string(out)) + } + return resp.Body, nil +} + +// createRequest is a helper function that builds an http.Request. +func (c *client) createRequest(rawurl, method string, in interface{}) (*http.Request, error) { + uri, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + // if we are posting or putting data, we need to + // write it to the body of the request. + var buf io.ReadWriter + if in != nil { + buf = new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(in) + if err != nil { + return nil, err + } + } + + // creates a new http request to bitbucket. + req, err := http.NewRequest(method, uri.String(), buf) + if err != nil { + return nil, err + } + if in != nil { + req.Header.Set("Content-Type", "application/json") + } + return req, nil +} diff --git a/client/interface.go b/client/interface.go new file mode 100644 index 000000000..f307d3146 --- /dev/null +++ b/client/interface.go @@ -0,0 +1,22 @@ +package client + +import ( + "io" + + "github.com/drone/drone/queue" +) + +// Client is used to communicate with a Drone server. +type Client interface { + // Pull pulls work from the server queue. + Pull() (*queue.Work, error) + + // Push pushes an update to the server. + Push(*queue.Work) error + + // Stream streams the build logs to the server. + Stream(int64, io.ReadCloser) error + + // Wait waits for the job to the complete. + Wait(int64) *Wait +} diff --git a/drone/agent/agent.go b/drone/agent/agent.go new file mode 100644 index 000000000..17d5e3690 --- /dev/null +++ b/drone/agent/agent.go @@ -0,0 +1,110 @@ +package agent + +import ( + "sync" + "time" + + "github.com/drone/drone/client" + "github.com/samalba/dockerclient" + + "github.com/Sirupsen/logrus" + "github.com/codegangsta/cli" +) + +// AgentCmd is the exported command for starting the drone agent. +var AgentCmd = cli.Command{ + Name: "agent", + Usage: "starts the drone agent", + Action: start, + Flags: []cli.Flag{ + cli.StringFlag{ + EnvVar: "DOCKER_HOST", + Name: "docker-host", + Usage: "docker deamon address", + Value: "unix:///var/run/docker.sock", + }, + cli.BoolFlag{ + EnvVar: "DOCKER_TLS_VERIFY", + Name: "docker-tls-verify", + Usage: "docker daemon supports tlsverify", + }, + cli.StringFlag{ + EnvVar: "DOCKER_CERT_PATH", + Name: "docker-cert-path", + Usage: "docker certificate directory", + Value: "", + }, + cli.IntFlag{ + EnvVar: "DOCKER_MAX_PROCS", + Name: "docker-max-procs", + Usage: "limit number of running docker processes", + Value: 2, + }, + cli.StringFlag{ + EnvVar: "DRONE_SERVER", + Name: "drone-server", + Usage: "drone server address", + Value: "http://localhost:8000", + }, + cli.StringFlag{ + EnvVar: "DRONE_TOKEN", + Name: "drone-token", + Usage: "drone authorization token", + }, + cli.DurationFlag{ + EnvVar: "BACKOFF", + Name: "drone-backoff", + Usage: "drone server backoff interval", + Value: time.Second * 15, + }, + cli.BoolFlag{ + EnvVar: "DEBUG", + Name: "debug", + Usage: "start the agent in debug mode", + }, + cli.BoolFlag{ + EnvVar: "EXPERIMENTAL", + Name: "experimental", + Usage: "start the agent with experimental features", + }, + }, +} + +func start(c *cli.Context) { + + // debug level if requested by user + if c.Bool("debug") { + logrus.SetLevel(logrus.DebugLevel) + } else { + logrus.SetLevel(logrus.WarnLevel) + } + + client := client.NewClientToken( + c.String("drone-server"), + c.String("drone-token"), + ) + + tls, _ := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) + if c.Bool("docker-host") { + tls.InsecureSkipVerify = true + } + docker, err := dockerclient.NewDockerClient(c.String("docker-host"), tls) + if err != nil { + logrus.Fatal(err) + } + + var wg sync.WaitGroup + for i := 0; i < c.Int("docker-max-procs"); i++ { + wg.Add(1) + go func() { + for { + if err := recoverExec(client, docker); err != nil { + dur := c.Duration("drone-backoff") + logrus.Debugf("Attempting to reconnect in %v", dur) + time.Sleep(dur) + } + } + }() + } + wg.Wait() +} diff --git a/drone/agent/exec.go b/drone/agent/exec.go new file mode 100644 index 000000000..0db150f3b --- /dev/null +++ b/drone/agent/exec.go @@ -0,0 +1,186 @@ +package agent + +import ( + "encoding/json" + "fmt" + "io" + "time" + + "github.com/Sirupsen/logrus" + "github.com/dchest/uniuri" + "github.com/drone/drone/client" + "github.com/drone/drone/engine/compiler" + "github.com/drone/drone/engine/compiler/builtin" + "github.com/drone/drone/engine/runner" + engine "github.com/drone/drone/engine/runner/docker" + "github.com/drone/drone/model" + + "github.com/samalba/dockerclient" + "golang.org/x/net/context" +) + +func recoverExec(client client.Client, docker dockerclient.Client) error { + defer func() { + recover() + }() + return exec(client, docker) +} + +func exec(client client.Client, docker dockerclient.Client) error { + w, err := client.Pull() + if err != nil { + return err + } + + logrus.Infof("Starting build %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + + w.Job.Status = model.StatusRunning + w.Job.Started = time.Now().Unix() + + prefix := fmt.Sprintf("drone_%s", uniuri.New()) + + trans := []compiler.Transform{ + builtin.NewCloneOp("plugins/git:latest", true), + builtin.NewCacheOp( + "plugins/cache:latest", + "/var/lib/drone/cache/"+w.Repo.FullName, + false, + ), + builtin.NewNormalizeOp("plugins"), + builtin.NewWorkspaceOp("/drone", "drone/src/github.com/"+w.Repo.FullName), + builtin.NewEnvOp(map[string]string{ + "CI": "drone", + "CI_REPO": w.Repo.FullName, + "CI_REPO_OWNER": w.Repo.Owner, + "CI_REPO_NAME": w.Repo.Name, + "CI_REPO_LINK": w.Repo.Link, + "CI_REPO_AVATAR": w.Repo.Avatar, + "CI_REPO_BRANCH": w.Repo.Branch, + "CI_REPO_PRIVATE": fmt.Sprintf("%v", w.Repo.IsPrivate), + "CI_REMOTE_URL": w.Repo.Clone, + "CI_COMMIT_SHA": w.Build.Commit, + "CI_COMMIT_REF": w.Build.Ref, + "CI_COMMIT_BRANCH": w.Build.Branch, + "CI_COMMIT_LINK": w.Build.Link, + "CI_COMMIT_MESSAGE": w.Build.Message, + "CI_AUTHOR": w.Build.Author, + "CI_AUTHOR_EMAIL": w.Build.Email, + "CI_AUTHOR_AVATAR": w.Build.Avatar, + "CI_BUILD_NUMBER": fmt.Sprintf("%v", w.Build.Number), + "CI_BUILD_EVENT": w.Build.Event, + // "CI_NETRC_USERNAME": w.Netrc.Login, + // "CI_NETRC_PASSWORD": w.Netrc.Password, + // "CI_NETRC_MACHINE": w.Netrc.Machine, + // "CI_PREV_BUILD_STATUS": w.BuildLast.Status, + // "CI_PREV_BUILD_NUMBER": fmt.Sprintf("%v", w.BuildLast.Number), + // "CI_PREV_COMMIT_SHA": w.BuildLast.Commit, + }), + builtin.NewValidateOp( + w.Repo.IsTrusted, + []string{"plugins/*"}, + ), + builtin.NewShellOp(builtin.Linux_adm64), + builtin.NewArgsOp(), + builtin.NewPodOp(prefix), + builtin.NewAliasOp(prefix), + builtin.NewPullOp(false), + builtin.NewFilterOp( + model.StatusSuccess, // w.BuildLast.Status, + w.Build.Branch, + w.Build.Event, + w.Build.Deploy, + map[string]string{}, + ), + } + + compile := compiler.New() + compile.Transforms(trans) + spec, err := compile.CompileString(w.Yaml) + if err != nil { + // TODO handle error + logrus.Infof("Error compiling Yaml %s/%s#%d %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error()) + return err + } + + if err := client.Push(w); err != nil { + logrus.Errorf("Error persisting update %s/%s#%d.%d. %s", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) + return err + } + + conf := runner.Config{ + Engine: engine.New(docker), + } + + ctx := context.TODO() + ctx, cancel := context.WithCancel(ctx) + + run := conf.Runner(ctx, spec) + run.Run() + defer cancel() + + wait := client.Wait(w.Job.ID) + if err != nil { + return err + } + go func() { + _, werr := wait.Done() + if werr == nil { + logrus.Infof("Cancel build %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + cancel() + } + }() + defer wait.Cancel() + + rc, wc := io.Pipe() + go func() { + err := client.Stream(w.Job.ID, rc) + if err != nil && err != io.ErrClosedPipe { + logrus.Errorf("Error streaming build logs. %s", err) + } + }() + + pipe := run.Pipe() + for { + line := pipe.Next() + if line == nil { + break + } + linejson, _ := json.Marshal(line) + wc.Write(linejson) + wc.Write([]byte{'\n'}) + } + + err = run.Wait() + + pipe.Close() + wc.Close() + rc.Close() + + // catch the build result + if err != nil { + w.Job.ExitCode = 255 + } + if exitErr, ok := err.(*runner.ExitError); ok { + w.Job.ExitCode = exitErr.Code + } + + w.Job.Finished = time.Now().Unix() + + switch w.Job.ExitCode { + case 128, 130: + w.Job.Status = model.StatusKilled + case 0: + w.Job.Status = model.StatusSuccess + default: + w.Job.Status = model.StatusFailure + } + + logrus.Infof("Finished build %s/%s#%d.%d", + w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) + + return client.Push(w) +} diff --git a/drone/drone.go b/drone/drone.go new file mode 100644 index 000000000..cd1f8f811 --- /dev/null +++ b/drone/drone.go @@ -0,0 +1,29 @@ +package main + +import ( + "os" + + "github.com/drone/drone/drone/agent" + "github.com/drone/drone/drone/server" + "github.com/drone/drone/version" + + "github.com/codegangsta/cli" + "github.com/ianschenck/envflag" + _ "github.com/joho/godotenv/autoload" +) + +func main2() { + envflag.Parse() + + app := cli.NewApp() + app.Name = "drone" + app.Version = version.Version + app.Usage = "command line utility" + + app.Commands = []cli.Command{ + agent.AgentCmd, + server.ServeCmd, + } + + app.Run(os.Args) +} diff --git a/drone.go b/drone/main.go similarity index 88% rename from drone.go rename to drone/main.go index a0cae10cc..e08585855 100644 --- a/drone.go +++ b/drone/main.go @@ -2,6 +2,7 @@ package main import ( "net/http" + "os" "time" "github.com/drone/drone/router" @@ -22,6 +23,11 @@ var ( ) func main() { + if os.Getenv("CANARY") == "true" { + main2() + return + } + envflag.Parse() // debug level if requested by user @@ -35,6 +41,9 @@ func main() { handler := router.Load( ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), middleware.Version, + middleware.Queue(), + middleware.Stream(), + middleware.Bus(), middleware.Cache(), middleware.Store(), middleware.Remote(), diff --git a/drone/server/server.go b/drone/server/server.go new file mode 100644 index 000000000..551507a4a --- /dev/null +++ b/drone/server/server.go @@ -0,0 +1,89 @@ +package server + +import ( + "net/http" + "time" + + "github.com/drone/drone/router" + "github.com/drone/drone/router/middleware" + + "github.com/Sirupsen/logrus" + "github.com/codegangsta/cli" + "github.com/gin-gonic/contrib/ginrus" +) + +// ServeCmd is the exported command for starting the drone server. +var ServeCmd = cli.Command{ + Name: "serve", + Usage: "starts the drone server", + Action: func(c *cli.Context) { + if err := start(c); err != nil { + logrus.Fatal(err) + } + }, + Flags: []cli.Flag{ + cli.StringFlag{ + EnvVar: "SERVER_ADDR", + Name: "server-addr", + Usage: "server address", + Value: ":8000", + }, + cli.StringFlag{ + EnvVar: "SERVER_CERT", + Name: "server-cert", + Usage: "server ssl cert", + }, + cli.StringFlag{ + EnvVar: "SERVER_KEY", + Name: "server-key", + Usage: "server ssl key", + }, + cli.BoolFlag{ + EnvVar: "DEBUG", + Name: "debug", + Usage: "start the server in debug mode", + }, + cli.BoolFlag{ + EnvVar: "EXPERIMENTAL", + Name: "experimental", + Usage: "start the server with experimental features", + }, + }, +} + +func start(c *cli.Context) error { + + // debug level if requested by user + if c.Bool("debug") { + logrus.SetLevel(logrus.DebugLevel) + } else { + logrus.SetLevel(logrus.WarnLevel) + } + + // setup the server and start the listener + handler := router.Load( + ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), + middleware.Version, + middleware.Queue(), + middleware.Stream(), + middleware.Bus(), + middleware.Cache(), + middleware.Store(), + middleware.Remote(), + middleware.Engine(), + ) + + if c.String("server-cert") != "" { + return http.ListenAndServeTLS( + c.String("server-addr"), + c.String("server-cert"), + c.String("server-key"), + handler, + ) + } + + return http.ListenAndServe( + c.String("server-addr"), + handler, + ) +} diff --git a/engine/engine/engine.go b/engine/engine/engine.go deleted file mode 100644 index 072a3a4cd..000000000 --- a/engine/engine/engine.go +++ /dev/null @@ -1,158 +0,0 @@ -package engine - -import ( - "fmt" - "time" - - "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" - "github.com/drone/drone/engine/compiler" - "github.com/drone/drone/engine/runner" - "github.com/drone/drone/engine/runner/docker" - "github.com/drone/drone/model" - "github.com/drone/drone/queue" - "github.com/drone/drone/store" - "github.com/drone/drone/stream" - "golang.org/x/net/context" -) - -// Poll polls the build queue for build jobs. -func Poll(c context.Context) { - for { - pollRecover(c) - } -} - -func pollRecover(c context.Context) { - defer recover() - poll(c) -} - -func poll(c context.Context) { - w := queue.Pull(c) - - logrus.Infof("Starting build %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - - rc, wc, err := stream.Create(c, stream.ToKey(w.Job.ID)) - if err != nil { - logrus.Errorf("Error opening build stream %s/%s#%d.%d. %s", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number, err) - } - - defer func() { - wc.Close() - rc.Close() - stream.Remove(c, stream.ToKey(w.Job.ID)) - }() - - w.Job.Status = model.StatusRunning - w.Job.Started = time.Now().Unix() - - quitc := make(chan bool, 1) - eventc := make(chan *bus.Event, 1) - bus.Subscribe(c, eventc) - - compile := compiler.New() - compile.Transforms(nil) - spec, err := compile.CompileString(w.Yaml) - if err != nil { - // TODO handle error - logrus.Infof("Error compiling Yaml %s/%s#%d %s", - w.Repo.Owner, w.Repo.Name, w.Build.Number, err.Error()) - return - } - - defer func() { - bus.Unsubscribe(c, eventc) - quitc <- true - }() - - ctx := context.TODO() - ctx, cancel := context.WithCancel(ctx) - - // TODO store the started build in the database - // TODO publish the started build - store.UpdateJob(c, w.Job) - //store.Write(c, w.Job, rc) - bus.Publish(c, bus.NewEvent(bus.Started, w.Repo, w.Build, w.Job)) - - conf := runner.Config{ - Engine: docker.FromContext(c), - } - - run := conf.Runner(ctx, spec) - run.Run() - defer cancel() - - go func() { - for { - select { - case event := <-eventc: - if event.Type == bus.Cancelled && event.Job.ID == w.Job.ID { - logrus.Infof("Cancel build %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - cancel() - } - case <-quitc: - return - } - } - }() - - pipe := run.Pipe() - for { - line := pipe.Next() - if line == nil { - break - } - fmt.Println(line) - } - - err = run.Wait() - - // catch the build result - if err != nil { - w.Job.ExitCode = 255 - } - if exitErr, ok := err.(*runner.ExitError); ok { - w.Job.ExitCode = exitErr.Code - } - - w.Job.Finished = time.Now().Unix() - - switch w.Job.ExitCode { - case 128, 130: - w.Job.Status = model.StatusKilled - case 0: - w.Job.Status = model.StatusSuccess - default: - w.Job.Status = model.StatusFailure - } - - // store the finished build in the database - logs, _, err := stream.Open(c, stream.ToKey(w.Job.ID)) - if err != nil { - logrus.Errorf("Error reading build stream %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - } - defer func() { - if logs != nil { - logs.Close() - } - }() - if err := store.WriteLog(c, w.Job, logs); err != nil { - logrus.Errorf("Error persisting build stream %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) - } - if logs != nil { - logs.Close() - } - - // TODO publish the finished build - store.UpdateJob(c, w.Job) - bus.Publish(c, bus.NewEvent(bus.Finished, w.Repo, w.Build, w.Job)) - - logrus.Infof("Finished build %s/%s#%d.%d", - w.Repo.Owner, w.Repo.Name, w.Build.Number, w.Job.Number) -} diff --git a/model/repo.go b/model/repo.go index 062f53131..271017abe 100644 --- a/model/repo.go +++ b/model/repo.go @@ -1,5 +1,7 @@ package model +import "strconv" + type RepoLite struct { Owner string `json:"owner"` Name string `json:"name"` @@ -31,3 +33,17 @@ type Repo struct { AllowTag bool `json:"allow_tags" meddler:"repo_allow_tags"` Hash string `json:"-" meddler:"repo_hash"` } + +// ToEnv returns environment variable valus for the repository. +func (r *Repo) ToEnv(to map[string]string) { + to["CI_VCS"] = r.Kind + to["CI_REPO"] = r.FullName + to["CI_REPO_OWNER"] = r.Owner + to["CI_REPO_NAME"] = r.Name + to["CI_REPO_LINK"] = r.Link + to["CI_REPO_AVATAR"] = r.Avatar + to["CI_REPO_BRANCH"] = r.Branch + to["CI_REPO_PRIVATE"] = strconv.FormatBool(r.IsPrivate) + to["CI_REPO_TRUSTED"] = strconv.FormatBool(r.IsTrusted) + to["CI_REMOTE_URL"] = r.Clone +} diff --git a/router/middleware/bus.go b/router/middleware/bus.go new file mode 100644 index 000000000..b5f5c57d5 --- /dev/null +++ b/router/middleware/bus.go @@ -0,0 +1,14 @@ +package middleware + +import ( + "github.com/drone/drone/bus" + "github.com/gin-gonic/gin" +) + +func Bus() gin.HandlerFunc { + bus_ := bus.New() + return func(c *gin.Context) { + bus.ToContext(c, bus_) + c.Next() + } +} diff --git a/router/middleware/queue.go b/router/middleware/queue.go new file mode 100644 index 000000000..692a5432e --- /dev/null +++ b/router/middleware/queue.go @@ -0,0 +1,14 @@ +package middleware + +import ( + "github.com/drone/drone/queue" + "github.com/gin-gonic/gin" +) + +func Queue() gin.HandlerFunc { + queue_ := queue.New() + return func(c *gin.Context) { + queue.ToContext(c, queue_) + c.Next() + } +} diff --git a/router/middleware/stream.go b/router/middleware/stream.go new file mode 100644 index 000000000..43bdabe05 --- /dev/null +++ b/router/middleware/stream.go @@ -0,0 +1,14 @@ +package middleware + +import ( + "github.com/drone/drone/stream" + "github.com/gin-gonic/gin" +) + +func Stream() gin.HandlerFunc { + stream_ := stream.New() + return func(c *gin.Context) { + stream.ToContext(c, stream_) + c.Next() + } +} diff --git a/router/router.go b/router/router.go index 84587352a..6bce12a18 100644 --- a/router/router.go +++ b/router/router.go @@ -2,6 +2,7 @@ package router import ( "net/http" + "os" "strings" "github.com/gin-gonic/gin" @@ -136,8 +137,14 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { stream.Use(session.SetRepo()) stream.Use(session.SetPerm()) stream.Use(session.MustPull) - stream.GET("/:owner/:name", web.GetRepoEvents) - stream.GET("/:owner/:name/:build/:number", web.GetStream) + + if os.Getenv("CANARY") == "true" { + stream.GET("/:owner/:name", web.GetRepoEvents2) + stream.GET("/:owner/:name/:build/:number", web.GetStream2) + } else { + stream.GET("/:owner/:name", web.GetRepoEvents) + stream.GET("/:owner/:name/:build/:number", web.GetStream) + } } bots := e.Group("/bots") @@ -154,6 +161,14 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { auth.POST("/token", web.GetLoginToken) } + queue := e.Group("/api/queue") + { + queue.POST("/pull", api.Pull) + queue.POST("/wait/:id", api.Wait) + queue.POST("/stream/:id", api.Stream) + queue.POST("/status/:id", api.Update) + } + gitlab := e.Group("/gitlab/:owner/:name") { gitlab.Use(session.SetRepo()) diff --git a/store/store.go b/store/store.go index 1ee916ec3..5698460a5 100644 --- a/store/store.go +++ b/store/store.go @@ -275,6 +275,40 @@ func UpdateBuild(c context.Context, build *model.Build) error { return FromContext(c).UpdateBuild(build) } +func UpdateBuildJob(c context.Context, build *model.Build, job *model.Job) (bool, error) { + if err := UpdateJob(c, job); err != nil { + return false, err + } + jobs, err := GetJobList(c, build) + if err != nil { + return false, err + } + // check to see if all jobs are finished for this build. If yes, we need to + // calcualte the overall build status and finish time. + status := model.StatusSuccess + finish := job.Finished + for _, job := range jobs { + if job.Finished > finish { + finish = job.Finished + } + switch job.Status { + case model.StatusSuccess: + // no-op + case model.StatusRunning, model.StatusPending: + return false, nil + default: + status = job.Status + } + } + + build.Status = status + build.Finished = finish + if err := FromContext(c).UpdateBuild(build); err != nil { + return false, err + } + return true, nil +} + func GetJob(c context.Context, id int64) (*model.Job, error) { return FromContext(c).GetJob(id) }