Add lkml package
The lkml package provides a controller which processes strings received
containing mailing list patches, applies them to specified repo,
and triggers ProwJobs while serving the repo intra-cluster.
Build with `bazel build lkml:go_default_library`
Note : Currently experiences memory issue with large repos when using
git-daemon (over 4 GB while cloning) so currently serving over http.
Configuring git pack-objects to use less memory did not seem to work.
Change-Id: I4ae96085f586b082df16316b4df3297eb58898f6
Signed-off-by: Avi Kondareddy <avikr@google.com>
diff --git a/WORKSPACE b/WORKSPACE
new file mode 100644
index 0000000..fea7477
--- /dev/null
+++ b/WORKSPACE
@@ -0,0 +1,95 @@
+workspace(name = "prow_lkml")
+
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+# rules for go
+http_archive(
+ name = "io_bazel_rules_go",
+ sha256 = "b7a62250a3a73277ade0ce306d22f122365b513f5402222403e507f2f997d421",
+ urls = ["https://github.com/bazelbuild/rules_go/releases/download/0.16.3/rules_go-0.16.3.tar.gz"],
+)
+
+load("@io_bazel_rules_go//go:def.bzl", "go_register_toolchains", "go_rules_dependencies")
+
+go_rules_dependencies()
+
+go_register_toolchains()
+
+http_archive(
+ name = "bazel_gazelle",
+ sha256 = "6e875ab4b6bf64a38c352887760f21203ab054676d9c1b274963907e0768740d",
+ urls = ["https://github.com/bazelbuild/bazel-gazelle/releases/download/0.15.0/bazel-gazelle-0.15.0.tar.gz"],
+)
+
+load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies")
+
+gazelle_dependencies()
+
+# rules for docker
+http_archive(
+ name = "io_bazel_rules_docker",
+ sha256 = "29d109605e0d6f9c892584f07275b8c9260803bf0c6fcb7de2623b2bedc910bd",
+ strip_prefix = "rules_docker-0.5.1",
+ urls = ["https://github.com/bazelbuild/rules_docker/archive/v0.5.1.tar.gz"],
+)
+
+load(
+ "@io_bazel_rules_docker//container:container.bzl",
+ "container_pull",
+ container_repositories = "repositories",
+)
+
+container_repositories()
+
+load(
+ "@io_bazel_rules_docker//go:image.bzl",
+ _go_image_repos = "repositories",
+)
+
+_go_image_repos()
+
+# base images
+container_pull(
+ name = "git_base",
+ registry = "gcr.io",
+ repository = "k8s-prow/git",
+ tag = "0.2",
+)
+
+container_pull(
+ name = "debian_base",
+ registry = "index.docker.io",
+ repository = "library/debian",
+)
+
+container_pull(
+ name = "alpine_base",
+ registry = "index.docker.io",
+ repository = "library/alpine",
+)
+
+# import git daemon
+http_archive(
+ name = "git_daemon",
+ build_file_content = """
+load("@bazel_tools//tools/build_defs/pkg:pkg.bzl", "pkg_tar")
+
+pkg_tar(
+ name = "files",
+ srcs = glob(["**"]),
+ strip_prefix = ".",
+ visibility = ["//visibility:public"],
+)
+ """,
+ type = "tar.gz",
+ urls = ["http://dl-cdn.alpinelinux.org/alpine/v3.6/main/x86_64/git-daemon-2.13.7-r2.apk"],
+)
+
+load("@bazel_gazelle//:deps.bzl", "go_repository")
+load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
+
+git_repository(
+ name = "test_infra",
+ branch = "master",
+ remote = "https://github.com/kubernetes/test-infra",
+)
diff --git a/lkml/BUILD.bazel b/lkml/BUILD.bazel
new file mode 100644
index 0000000..ef531fb
--- /dev/null
+++ b/lkml/BUILD.bazel
@@ -0,0 +1,13 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = ["lkml.go"],
+ importpath = "prow-lkml/lkml",
+ visibility = ["//visibility:public"],
+ deps = [
+ "@test_infra//prow/config:go_default_library",
+ "@test_infra//prow/kube:go_default_library",
+ "@test_infra//prow/pjutil:go_default_library",
+ ],
+)
diff --git a/lkml/lkml.go b/lkml/lkml.go
new file mode 100644
index 0000000..0645bd5
--- /dev/null
+++ b/lkml/lkml.go
@@ -0,0 +1,430 @@
+// Package lkml provides a controller which processes strings received
+// containing mailing list patches, applies them to specified repo,
+// and triggers ProwJobs.
+package lkml
+
+import (
+ "errors"
+ "fmt"
+ "k8s.io/test-infra/prow/config"
+ "k8s.io/test-infra/prow/kube"
+ "k8s.io/test-infra/prow/pjutil"
+ "log"
+ "net/http"
+ "net/url"
+ "os"
+ "os/exec"
+ "regexp"
+ "strconv"
+ "sync"
+ "time"
+)
+
+const (
+ QueueSize = 10 // arbitrary queue size for processing msgs and series
+ timeout = time.Minute * 5
+ jobTimeout = time.Hour * 1 // cleanup repo after job is done
+ repoPath = "/kunit/linux"
+
+ // prowjob annotations used by reporter for sending reply
+ SubjectAnnotation = "prowjobs.mail.subject"
+ ReplyAnnotation = "prowjobs.mail.reply"
+)
+
+var (
+ // TODO: test regular expressions
+ subjectRE = regexp.MustCompile(`Subject:(\s*\[PATCH\s+[^\n\]\r]*?(?:([0-9]+)\/([0-9]+))?\][^\n\r]*)`)
+ messageIdRE = regexp.MustCompile(`Message-I[Dd]:\s*<(.*)>`)
+ inReplyToRE = regexp.MustCompile(`In-Reply-To:\s*<(.*)>`)
+)
+
+type patch struct {
+ subject string
+ body string
+ replyTo string
+}
+
+type patchSeries struct {
+ branch string
+ patches []patch
+ num int
+ numAdded int
+ ready chan bool
+}
+
+// Lkml receives string email messages from msgQueue,
+// converts them to patches / patchSeries,
+// stores them in idToSeries by message id of the cover letter or patch in the
+// case of a single patch.
+// mutex guards access to map
+// When entire series has been received, it will be send to the jobQueue for
+// applying patches and triggering ProwJobs.
+type Lkml struct {
+ msgQueue <-chan string
+ seriesQueue chan patchSeries
+ idToSeries map[string]*patchSeries
+ mutex sync.Mutex
+ sourceRepo string
+ sourceBranch string
+ jobURI string
+ cloneURI url.URL
+ ca *config.Agent
+ kc *kube.Client
+ pullNum int
+}
+
+func New(msgQueue <-chan string, sourceRepo string, sourceBranch string, jobURI string, clusterIP string, ca *config.Agent, kc *kube.Client) Lkml {
+ // TODO validate args
+
+ l := Lkml{
+ msgQueue: msgQueue,
+ seriesQueue: make(chan patchSeries, QueueSize),
+ idToSeries: make(map[string]*patchSeries),
+ sourceRepo: sourceRepo,
+ sourceBranch: sourceBranch,
+ jobURI: jobURI,
+ cloneURI: url.URL{},
+ ca: ca,
+ kc: kc,
+ }
+
+ return l
+}
+
+func (l *Lkml) Run() error {
+ err := l.initStorage()
+ if err != nil {
+ log.Fatal("Failed to initialize git repo storage: %v", err)
+ }
+
+ done := make(chan error, 2)
+
+ go func() {
+ /*
+ * git daemon currently causes excessive memory usage
+ * using dumb http instead for now which works
+ * cmd := exec.Command("git", "daemon", "--verbose", "--reuseaddr", "--export-all", "--base-path=/kunit", "/kunit")
+ * TODO: fix git daemon memory issues, then need to change
+ * cloneURI to git://{IP}/linux instead of http://{IP}:8080/linux.git
+ */
+ err := http.ListenAndServe(":8080", http.FileServer(http.Dir("/kunit")))
+ done <- fmt.Errorf("GIT server died: %v", err)
+ }()
+
+ // handle messages and dispatch jobs once series received
+ go func() {
+ for msg := range l.msgQueue {
+ if err := l.handleMessage(msg); err != nil {
+ log.Printf("HandleMessageQueue: %v", err)
+ }
+ }
+ done <- errors.New("Message queue closed.")
+ }()
+
+ // apply patch -> trigger in order
+ go func() {
+ for patch := range l.seriesQueue {
+ if err := l.handleJob(patch); err != nil {
+ log.Printf("HandleJobQueue: %v", err)
+ }
+ }
+ done <- errors.New("Series queue closed.")
+ }()
+
+ // quit if git server dies or smtp server dies & queues empty
+ return <-done
+}
+
+// run git command in repo dir
+func runGitCommand(args ...string) error {
+ allArgs := append([]string{"-C", repoPath}, args...)
+ cmd := exec.Command("git", allArgs...)
+ err := cmd.Run()
+ return err
+}
+
+func (l *Lkml) initStorage() error {
+ // set configs
+ exec.Command("git", "config", "--global", "user.email", "prow@robot.com").Run()
+ exec.Command("git", "config", "--global", "user.name", "Prow Robot").Run()
+ exec.Command("git", "config", "--global", "pack.windowMemory", "100m").Run()
+ exec.Command("git", "config", "--global", "pack.packSizeLimit", "100m").Run()
+ exec.Command("git", "config", "--global", "pack.threads", "1").Run()
+
+ // if existing, update and return
+ if repoDir, err := os.Open(repoPath); err == nil {
+ repoDir.Close()
+ err1 := runGitCommand("checkout", l.sourceBranch)
+ err2 := runGitCommand("pull")
+ if err1 == nil || err2 == nil {
+ return nil
+ }
+ }
+
+ // otherwise reclone
+ exec.Command("rm", "-rf", repoPath)
+ exec.Command("git", "clone", l.sourceRepo, repoPath).Run()
+ runGitCommand("checkout", l.sourceBranch)
+ runGitCommand("update-server-info")
+ exec.Command("mv", repoPath+"/.git/hooks/post-update.sample", repoPath+"/.git/hooks/post-update").Run()
+ return nil
+}
+
+func (l *Lkml) newSeries(branchName string, numPatches int) (*patchSeries, error) {
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ _, exists := l.idToSeries[branchName]
+ if exists == true {
+ return nil, fmt.Errorf("Patch Series with id {%s} already existed", branchName)
+ }
+
+ // add series to map
+ patch := patchSeries{
+ branch: branchName,
+ patches: make([]patch, numPatches),
+ num: numPatches,
+ numAdded: 0,
+ ready: make(chan bool),
+ }
+
+ l.idToSeries[branchName] = &patch
+
+ return &patch, nil
+}
+
+func (l *Lkml) addPatchToSeries(branchName string, patchInd int,
+ subjectLine string, bodyString string, replyToString string) error {
+
+ l.mutex.Lock()
+ defer l.mutex.Unlock()
+
+ series, exists := l.idToSeries[branchName]
+ if exists == false {
+ return fmt.Errorf("Patch Series with id {%s} doesn't existed", branchName)
+ }
+
+ if patch := series.patches[patchInd]; patch.subject != "" {
+ return fmt.Errorf("Patch Series with id {%s} already has patch #%d/%d with subject line {%s}",
+ branchName, patchInd, len(series.patches), patch.subject)
+ }
+
+ series.patches[patchInd] = patch{subject: subjectLine, body: bodyString, replyTo: replyToString}
+ series.numAdded += 1
+
+ if series.numAdded == series.num {
+ defer func() { series.ready <- true }()
+ }
+
+ return nil
+}
+
+/*
+ * Parse message body to identify one of 4 types of messages received:
+ *
+ * Not a patch : ignore email
+ * Single patch : send to seriesQueue immediately
+ * Cover of multi patch : Create Series and wait until timeout for others to arrive.
+ * If patches arrive, copy to seriesQueue.
+ * Series will be removed from map regardless by timeout.
+ * Member patch : Add patch to series by message id utilizing In-Reply-To header
+ * which references the cover letter under shallow threading
+ *
+ * Note: All access to map and its series are managed by mutex to ensure synchronization.
+ * This is the most flexible way to ensure proper ordering as emails may arrive
+ * out of order.
+ */
+func (l *Lkml) handleMessage(body string) error {
+ var err error
+
+ // Get message id for replying in thread and mapping series
+ idMatch := messageIdRE.FindStringSubmatch(body)
+ if idMatch == nil || idMatch[1] == "" {
+ err = errors.New("Message has no ID")
+ return err
+ }
+ patchID := idMatch[1]
+
+ // get subject line and patch information (x and n from "Patch x/n")
+ subjectMatch := subjectRE.FindStringSubmatch(body)
+ if subjectMatch == nil {
+ err = errors.New("Message is not part of a patch")
+ return err
+ }
+ subjectLine := subjectMatch[1]
+ patchInd := 0
+ if subjectMatch[2] != "" {
+ patchInd, err = strconv.Atoi(subjectMatch[2])
+ }
+ patchNum := 0
+ if subjectMatch[3] != "" {
+ patchNum, err = strconv.Atoi(subjectMatch[3])
+ }
+
+ // check if single patch
+ if patchNum == 0 {
+ l.handleSinglePatch(body, patchID, subjectLine)
+ }
+
+ // cover letter
+ if patchInd == 0 {
+ l.handleCoverLetter(patchID, patchNum)
+ }
+
+ // patch in series
+ replyMatch := inReplyToRE.FindStringSubmatch(body)
+ if replyMatch == nil || replyMatch[1] == "" {
+ err = fmt.Errorf("Patch in series has no reference to cover letter!")
+ return err
+ }
+ seriesID := replyMatch[1]
+
+ // add patch to series
+ err = l.addPatchToSeries(seriesID, patchInd-1, subjectLine, body, patchID)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (l *Lkml) handleCoverLetter(patchID string, patchNum int) error {
+ series, err := l.newSeries(patchID, patchNum)
+ if err != nil {
+ return err
+ }
+
+ go l.handleSeries(series)
+ return nil
+}
+
+func (l *Lkml) handleSinglePatch(body string, patchID string, subjectLine string) {
+ patch := patchSeries{
+ branch: patchID,
+ patches: []patch{{
+ subject: subjectLine,
+ body: body,
+ replyTo: patchID,
+ }},
+ num: 1,
+ numAdded: 1,
+ }
+
+ l.seriesQueue <- patch
+}
+
+/*
+* Add to seriesQueue once all patches are in.
+* Timeout specified as constant to delete outdated patches from map
+ */
+func (l *Lkml) handleSeries(series *patchSeries) error {
+ shouldTrigger := false
+
+ select {
+ case <-series.ready:
+ shouldTrigger = true
+ case <-time.After(timeout):
+ log.Print("TIMED OUT")
+ break
+ }
+
+ if shouldTrigger {
+ l.seriesQueue <- *series
+ }
+
+ // delete from map
+ l.mutex.Lock()
+ delete(l.idToSeries, series.branch)
+ l.mutex.Unlock()
+
+ return nil
+}
+
+func (l *Lkml) handleJob(series patchSeries) error {
+ runGitCommand("pull") // update main branch
+
+ // TODO : need to delete branches after use somehow, possibly launch a
+ // goroutine after triggering for time to delete
+ runGitCommand("checkout", l.sourceBranch)
+ err := runGitCommand("checkout", "-b", series.branch)
+ if err != nil {
+ runGitCommand("checkout", l.sourceBranch)
+ runGitCommand("branch", "-D", series.branch)
+ runGitCommand("checkout", "-b", series.branch)
+ }
+
+ // TODO : handle multiple jobs mapped to the same uri
+ // when we want to break jobs into multiple test suites, will need to
+ // iterate over the the array of presubmits for specific job uri
+ conf := l.ca.Config().Presubmits[l.jobURI][0]
+
+ // need to also get tag as this git server doesn't let you fetch from sha
+ // set tag of new commit to sha for fetching
+ prevRef := series.branch + "base"
+ err = runGitCommand("tag", prevRef)
+ if err != nil {
+ log.Printf("tag error %v", err)
+ }
+
+ baseURI := l.cloneURI.String()
+
+ for i, p := range series.patches {
+ // apply mail using git am
+ err := runGitCommand("am")
+ if err != nil {
+ log.Printf("Patch cannot be applied: %v", err)
+ runGitCommand("am", "--abort")
+ break // abort remaining jobs
+ // TODO possibly email if patch could not be applied
+ }
+
+ // set tag of new commit to message id for fetching (see above)
+ ref := p.replyTo
+ runGitCommand("tag", ref)
+
+ log.Printf("\t Patch #%d has tag %s", i, ref)
+ refs := kube.Refs{
+ Org: l.cloneURI.Host,
+ Repo: l.cloneURI.Path[1:],
+ BaseRef: prevRef,
+ BaseSHA: "",
+ CloneURI: baseURI,
+ Pulls: []kube.Pull{{
+ // plank uses pull number to find duplicate jobs
+ // need to monotonically increase
+ // otherwise change job name, org, or repo
+ Number: l.pullNum,
+ Ref: ref,
+ }},
+ }
+
+ prevRef = ref
+
+ annotations := map[string]string{
+ SubjectAnnotation: p.subject,
+ ReplyAnnotation: p.replyTo,
+ }
+
+ runGitCommand("update-server-info") // update with new tag
+ pr := pjutil.PresubmitSpec(conf, refs)
+ pj := pjutil.NewProwJobWithAnnotation(pr, conf.Labels, annotations)
+ _, err = l.kc.CreateProwJob(pj)
+ if err != nil {
+ log.Printf("Error in triggering prowjob : %v", err)
+ return err
+ }
+
+ l.pullNum += 1 // increase pull number
+ }
+
+ runGitCommand("checkout", l.sourceBranch)
+
+ // go cleanupBranch(series.branch) // enable after testing
+ return nil
+}
+
+func (l *Lkml) cleanupBranch(branch string) {
+ <-time.After(jobTimeout)
+ runGitCommand("branch", "-D", branch)
+ // need to delete created tags
+ // git fetch --prune <remote> +refs/tags/*:refs/tags/*
+}