Add lkml package

The lkml package provides a controller which processes strings received
containing mailing list patches, applies them to specified repo,
and triggers ProwJobs while serving the repo intra-cluster.

Build with `bazel build lkml:go_default_library`

Note : Currently experiences memory issue with large repos when using
git-daemon (over 4 GB while cloning) so currently serving over http.
Configuring git pack-objects to use less memory did not seem to work.

Change-Id: I4ae96085f586b082df16316b4df3297eb58898f6
Signed-off-by: Avi Kondareddy <avikr@google.com>
diff --git a/WORKSPACE b/WORKSPACE
new file mode 100644
index 0000000..fea7477
--- /dev/null
+++ b/WORKSPACE
@@ -0,0 +1,95 @@
+workspace(name = "prow_lkml")
+
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+# rules for go
+http_archive(
+    name = "io_bazel_rules_go",
+    sha256 = "b7a62250a3a73277ade0ce306d22f122365b513f5402222403e507f2f997d421",
+    urls = ["https://github.com/bazelbuild/rules_go/releases/download/0.16.3/rules_go-0.16.3.tar.gz"],
+)
+
+load("@io_bazel_rules_go//go:def.bzl", "go_register_toolchains", "go_rules_dependencies")
+
+go_rules_dependencies()
+
+go_register_toolchains()
+
+http_archive(
+    name = "bazel_gazelle",
+    sha256 = "6e875ab4b6bf64a38c352887760f21203ab054676d9c1b274963907e0768740d",
+    urls = ["https://github.com/bazelbuild/bazel-gazelle/releases/download/0.15.0/bazel-gazelle-0.15.0.tar.gz"],
+)
+
+load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies")
+
+gazelle_dependencies()
+
+# rules for docker
+http_archive(
+    name = "io_bazel_rules_docker",
+    sha256 = "29d109605e0d6f9c892584f07275b8c9260803bf0c6fcb7de2623b2bedc910bd",
+    strip_prefix = "rules_docker-0.5.1",
+    urls = ["https://github.com/bazelbuild/rules_docker/archive/v0.5.1.tar.gz"],
+)
+
+load(
+    "@io_bazel_rules_docker//container:container.bzl",
+    "container_pull",
+    container_repositories = "repositories",
+)
+
+container_repositories()
+
+load(
+    "@io_bazel_rules_docker//go:image.bzl",
+    _go_image_repos = "repositories",
+)
+
+_go_image_repos()
+
+# base images
+container_pull(
+    name = "git_base",
+    registry = "gcr.io",
+    repository = "k8s-prow/git",
+    tag = "0.2",
+)
+
+container_pull(
+    name = "debian_base",
+    registry = "index.docker.io",
+    repository = "library/debian",
+)
+
+container_pull(
+    name = "alpine_base",
+    registry = "index.docker.io",
+    repository = "library/alpine",
+)
+
+# import git daemon
+http_archive(
+    name = "git_daemon",
+    build_file_content = """
+load("@bazel_tools//tools/build_defs/pkg:pkg.bzl", "pkg_tar")
+
+pkg_tar(
+    name = "files",
+    srcs = glob(["**"]),
+    strip_prefix = ".",
+    visibility = ["//visibility:public"],
+)
+    """,
+    type = "tar.gz",
+    urls = ["http://dl-cdn.alpinelinux.org/alpine/v3.6/main/x86_64/git-daemon-2.13.7-r2.apk"],
+)
+
+load("@bazel_gazelle//:deps.bzl", "go_repository")
+load("@bazel_tools//tools/build_defs/repo:git.bzl", "git_repository")
+
+git_repository(
+    name = "test_infra",
+    branch = "master",
+    remote = "https://github.com/kubernetes/test-infra",
+)
diff --git a/lkml/BUILD.bazel b/lkml/BUILD.bazel
new file mode 100644
index 0000000..ef531fb
--- /dev/null
+++ b/lkml/BUILD.bazel
@@ -0,0 +1,13 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["lkml.go"],
+    importpath = "prow-lkml/lkml",
+    visibility = ["//visibility:public"],
+    deps = [
+        "@test_infra//prow/config:go_default_library",
+        "@test_infra//prow/kube:go_default_library",
+        "@test_infra//prow/pjutil:go_default_library",
+    ],
+)
diff --git a/lkml/lkml.go b/lkml/lkml.go
new file mode 100644
index 0000000..0645bd5
--- /dev/null
+++ b/lkml/lkml.go
@@ -0,0 +1,430 @@
+// Package lkml provides a controller which processes strings received
+// containing mailing list patches, applies them to specified repo,
+// and triggers ProwJobs.
+package lkml
+
+import (
+	"errors"
+	"fmt"
+	"k8s.io/test-infra/prow/config"
+	"k8s.io/test-infra/prow/kube"
+	"k8s.io/test-infra/prow/pjutil"
+	"log"
+	"net/http"
+	"net/url"
+	"os"
+	"os/exec"
+	"regexp"
+	"strconv"
+	"sync"
+	"time"
+)
+
+const (
+	QueueSize  = 10 // arbitrary queue size for processing msgs and series
+	timeout    = time.Minute * 5
+	jobTimeout = time.Hour * 1 // cleanup repo after job is done
+	repoPath   = "/kunit/linux"
+
+	// prowjob annotations used by reporter for sending reply
+	SubjectAnnotation = "prowjobs.mail.subject"
+	ReplyAnnotation   = "prowjobs.mail.reply"
+)
+
+var (
+	// TODO: test regular expressions
+	subjectRE   = regexp.MustCompile(`Subject:(\s*\[PATCH\s+[^\n\]\r]*?(?:([0-9]+)\/([0-9]+))?\][^\n\r]*)`)
+	messageIdRE = regexp.MustCompile(`Message-I[Dd]:\s*<(.*)>`)
+	inReplyToRE = regexp.MustCompile(`In-Reply-To:\s*<(.*)>`)
+)
+
+type patch struct {
+	subject string
+	body    string
+	replyTo string
+}
+
+type patchSeries struct {
+	branch   string
+	patches  []patch
+	num      int
+	numAdded int
+	ready    chan bool
+}
+
+// Lkml receives string email messages from msgQueue,
+// converts them to patches / patchSeries,
+// stores them in idToSeries by message id of the cover letter or patch in the
+// case of a single patch.
+// mutex guards access to map
+// When entire series has been received, it will be send to the jobQueue for
+// applying patches and triggering ProwJobs.
+type Lkml struct {
+	msgQueue     <-chan string
+	seriesQueue  chan patchSeries
+	idToSeries   map[string]*patchSeries
+	mutex        sync.Mutex
+	sourceRepo   string
+	sourceBranch string
+	jobURI       string
+	cloneURI     url.URL
+	ca           *config.Agent
+	kc           *kube.Client
+	pullNum      int
+}
+
+func New(msgQueue <-chan string, sourceRepo string, sourceBranch string, jobURI string, clusterIP string, ca *config.Agent, kc *kube.Client) Lkml {
+	// TODO validate args
+
+	l := Lkml{
+		msgQueue:     msgQueue,
+		seriesQueue:  make(chan patchSeries, QueueSize),
+		idToSeries:   make(map[string]*patchSeries),
+		sourceRepo:   sourceRepo,
+		sourceBranch: sourceBranch,
+		jobURI:       jobURI,
+		cloneURI:     url.URL{},
+		ca:           ca,
+		kc:           kc,
+	}
+
+	return l
+}
+
+func (l *Lkml) Run() error {
+	err := l.initStorage()
+	if err != nil {
+		log.Fatal("Failed to initialize git repo storage: %v", err)
+	}
+
+	done := make(chan error, 2)
+
+	go func() {
+		/*
+		 * git daemon currently causes excessive memory usage
+		 * using dumb http instead for now which works
+		 * cmd := exec.Command("git", "daemon", "--verbose", "--reuseaddr", "--export-all", "--base-path=/kunit", "/kunit")
+		 * TODO: fix git daemon memory issues, then need to change
+		 * cloneURI to git://{IP}/linux instead of http://{IP}:8080/linux.git
+		 */
+		err := http.ListenAndServe(":8080", http.FileServer(http.Dir("/kunit")))
+		done <- fmt.Errorf("GIT server died: %v", err)
+	}()
+
+	// handle messages and dispatch jobs once series received
+	go func() {
+		for msg := range l.msgQueue {
+			if err := l.handleMessage(msg); err != nil {
+				log.Printf("HandleMessageQueue: %v", err)
+			}
+		}
+		done <- errors.New("Message queue closed.")
+	}()
+
+	// apply patch -> trigger in order
+	go func() {
+		for patch := range l.seriesQueue {
+			if err := l.handleJob(patch); err != nil {
+				log.Printf("HandleJobQueue: %v", err)
+			}
+		}
+		done <- errors.New("Series queue closed.")
+	}()
+
+	// quit if git server dies or smtp server dies & queues empty
+	return <-done
+}
+
+// run git command in repo dir
+func runGitCommand(args ...string) error {
+	allArgs := append([]string{"-C", repoPath}, args...)
+	cmd := exec.Command("git", allArgs...)
+	err := cmd.Run()
+	return err
+}
+
+func (l *Lkml) initStorage() error {
+	// set configs
+	exec.Command("git", "config", "--global", "user.email", "prow@robot.com").Run()
+	exec.Command("git", "config", "--global", "user.name", "Prow Robot").Run()
+	exec.Command("git", "config", "--global", "pack.windowMemory", "100m").Run()
+	exec.Command("git", "config", "--global", "pack.packSizeLimit", "100m").Run()
+	exec.Command("git", "config", "--global", "pack.threads", "1").Run()
+
+	// if existing, update and return
+	if repoDir, err := os.Open(repoPath); err == nil {
+		repoDir.Close()
+		err1 := runGitCommand("checkout", l.sourceBranch)
+		err2 := runGitCommand("pull")
+		if err1 == nil || err2 == nil {
+			return nil
+		}
+	}
+
+	// otherwise reclone
+	exec.Command("rm", "-rf", repoPath)
+	exec.Command("git", "clone", l.sourceRepo, repoPath).Run()
+	runGitCommand("checkout", l.sourceBranch)
+	runGitCommand("update-server-info")
+	exec.Command("mv", repoPath+"/.git/hooks/post-update.sample", repoPath+"/.git/hooks/post-update").Run()
+	return nil
+}
+
+func (l *Lkml) newSeries(branchName string, numPatches int) (*patchSeries, error) {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
+	_, exists := l.idToSeries[branchName]
+	if exists == true {
+		return nil, fmt.Errorf("Patch Series with id {%s} already existed", branchName)
+	}
+
+	// add series to map
+	patch := patchSeries{
+		branch:   branchName,
+		patches:  make([]patch, numPatches),
+		num:      numPatches,
+		numAdded: 0,
+		ready:    make(chan bool),
+	}
+
+	l.idToSeries[branchName] = &patch
+
+	return &patch, nil
+}
+
+func (l *Lkml) addPatchToSeries(branchName string, patchInd int,
+	subjectLine string, bodyString string, replyToString string) error {
+
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
+	series, exists := l.idToSeries[branchName]
+	if exists == false {
+		return fmt.Errorf("Patch Series with id {%s} doesn't existed", branchName)
+	}
+
+	if patch := series.patches[patchInd]; patch.subject != "" {
+		return fmt.Errorf("Patch Series with id {%s} already has patch #%d/%d with subject line {%s}",
+			branchName, patchInd, len(series.patches), patch.subject)
+	}
+
+	series.patches[patchInd] = patch{subject: subjectLine, body: bodyString, replyTo: replyToString}
+	series.numAdded += 1
+
+	if series.numAdded == series.num {
+		defer func() { series.ready <- true }()
+	}
+
+	return nil
+}
+
+/*
+ * Parse message body to identify one of 4 types of messages received:
+ *
+ *   Not a patch : ignore email
+ *   Single patch : send to seriesQueue immediately
+ *   Cover of multi patch : Create Series and wait until timeout for others to arrive.
+ *    			    If patches arrive, copy to seriesQueue.
+ *			    Series will be removed from map regardless by timeout.
+ *   Member patch : Add patch to series by message id utilizing In-Reply-To header
+ *		   which references the cover letter under shallow threading
+ *
+ * Note: All access to map and its series are managed by mutex to ensure synchronization.
+ * This is the most flexible way to ensure proper ordering as emails may arrive
+ * out of order.
+ */
+func (l *Lkml) handleMessage(body string) error {
+	var err error
+
+	// Get message id for replying in thread and mapping series
+	idMatch := messageIdRE.FindStringSubmatch(body)
+	if idMatch == nil || idMatch[1] == "" {
+		err = errors.New("Message has no ID")
+		return err
+	}
+	patchID := idMatch[1]
+
+	// get subject line and patch information (x and n from "Patch x/n")
+	subjectMatch := subjectRE.FindStringSubmatch(body)
+	if subjectMatch == nil {
+		err = errors.New("Message is not part of a patch")
+		return err
+	}
+	subjectLine := subjectMatch[1]
+	patchInd := 0
+	if subjectMatch[2] != "" {
+		patchInd, err = strconv.Atoi(subjectMatch[2])
+	}
+	patchNum := 0
+	if subjectMatch[3] != "" {
+		patchNum, err = strconv.Atoi(subjectMatch[3])
+	}
+
+	// check if single patch
+	if patchNum == 0 {
+		l.handleSinglePatch(body, patchID, subjectLine)
+	}
+
+	// cover letter
+	if patchInd == 0 {
+		l.handleCoverLetter(patchID, patchNum)
+	}
+
+	// patch in series
+	replyMatch := inReplyToRE.FindStringSubmatch(body)
+	if replyMatch == nil || replyMatch[1] == "" {
+		err = fmt.Errorf("Patch in series has no reference to cover letter!")
+		return err
+	}
+	seriesID := replyMatch[1]
+
+	// add patch to series
+	err = l.addPatchToSeries(seriesID, patchInd-1, subjectLine, body, patchID)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (l *Lkml) handleCoverLetter(patchID string, patchNum int) error {
+	series, err := l.newSeries(patchID, patchNum)
+	if err != nil {
+		return err
+	}
+
+	go l.handleSeries(series)
+	return nil
+}
+
+func (l *Lkml) handleSinglePatch(body string, patchID string, subjectLine string) {
+	patch := patchSeries{
+		branch: patchID,
+		patches: []patch{{
+			subject: subjectLine,
+			body:    body,
+			replyTo: patchID,
+		}},
+		num:      1,
+		numAdded: 1,
+	}
+
+	l.seriesQueue <- patch
+}
+
+/*
+* Add to seriesQueue once all patches are in.
+* Timeout specified as constant to delete outdated patches from map
+ */
+func (l *Lkml) handleSeries(series *patchSeries) error {
+	shouldTrigger := false
+
+	select {
+	case <-series.ready:
+		shouldTrigger = true
+	case <-time.After(timeout):
+		log.Print("TIMED OUT")
+		break
+	}
+
+	if shouldTrigger {
+		l.seriesQueue <- *series
+	}
+
+	// delete from map
+	l.mutex.Lock()
+	delete(l.idToSeries, series.branch)
+	l.mutex.Unlock()
+
+	return nil
+}
+
+func (l *Lkml) handleJob(series patchSeries) error {
+	runGitCommand("pull") // update main branch
+
+	// TODO : need to delete branches after use somehow, possibly launch a
+	// goroutine after triggering for time to delete
+	runGitCommand("checkout", l.sourceBranch)
+	err := runGitCommand("checkout", "-b", series.branch)
+	if err != nil {
+		runGitCommand("checkout", l.sourceBranch)
+		runGitCommand("branch", "-D", series.branch)
+		runGitCommand("checkout", "-b", series.branch)
+	}
+
+	// TODO : handle multiple jobs mapped to the same uri
+	// when we want to break jobs into multiple test suites, will need to
+	// iterate over the the array of presubmits for specific job uri
+	conf := l.ca.Config().Presubmits[l.jobURI][0]
+
+	// need to also get tag as this git server doesn't let you fetch from sha
+	// set tag of new commit to sha for fetching
+	prevRef := series.branch + "base"
+	err = runGitCommand("tag", prevRef)
+	if err != nil {
+		log.Printf("tag error %v", err)
+	}
+
+	baseURI := l.cloneURI.String()
+
+	for i, p := range series.patches {
+		// apply mail using git am
+		err := runGitCommand("am")
+		if err != nil {
+			log.Printf("Patch cannot be applied: %v", err)
+			runGitCommand("am", "--abort")
+			break // abort remaining jobs
+			// TODO possibly email if patch could not be applied
+		}
+
+		// set tag of new commit to message id for fetching (see above)
+		ref := p.replyTo
+		runGitCommand("tag", ref)
+
+		log.Printf("\t Patch #%d has tag %s", i, ref)
+		refs := kube.Refs{
+			Org:      l.cloneURI.Host,
+			Repo:     l.cloneURI.Path[1:],
+			BaseRef:  prevRef,
+			BaseSHA:  "",
+			CloneURI: baseURI,
+			Pulls: []kube.Pull{{
+				// plank uses pull number to find duplicate jobs
+				// need to monotonically increase
+				// otherwise change job name, org, or repo
+				Number: l.pullNum,
+				Ref:    ref,
+			}},
+		}
+
+		prevRef = ref
+
+		annotations := map[string]string{
+			SubjectAnnotation: p.subject,
+			ReplyAnnotation:   p.replyTo,
+		}
+
+		runGitCommand("update-server-info") // update with new tag
+		pr := pjutil.PresubmitSpec(conf, refs)
+		pj := pjutil.NewProwJobWithAnnotation(pr, conf.Labels, annotations)
+		_, err = l.kc.CreateProwJob(pj)
+		if err != nil {
+			log.Printf("Error in triggering prowjob : %v", err)
+			return err
+		}
+
+		l.pullNum += 1 // increase pull number
+	}
+
+	runGitCommand("checkout", l.sourceBranch)
+
+	// go cleanupBranch(series.branch) // enable after testing
+	return nil
+}
+
+func (l *Lkml) cleanupBranch(branch string) {
+	<-time.After(jobTimeout)
+	runGitCommand("branch", "-D", branch)
+	// need to delete created tags
+	// git fetch --prune <remote> +refs/tags/*:refs/tags/*
+}