// Copyright 2019 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package lkml provides a controller that receives raw mailing-list messages
// as strings, extracts the patches they contain, applies them to the specified
// repo, and triggers ProwJobs.
package lkml
import (
// Package-wide tunables and identifiers.
const (
	// QueueSize is an arbitrary queue size for processing msgs and series.
	QueueSize = 10

	// timeout bounds how long handleSeries waits for a series to complete.
	timeout = 5 * time.Minute
	// jobTimeout: cleanup repo after job is done.
	jobTimeout = time.Hour

	// repoPath is the working tree every git command is pointed at.
	repoPath = "/kunit/linux"

	// Prowjob annotations used by the reporter for sending the reply.
	SubjectAnnotation = "prowjobs.mail.subject"
	ReplyAnnotation   = "prowjobs.mail.reply"
)
// Header-matching expressions applied to the raw message text.
var (
	// subjectRE matches a "Subject:" header carrying a [PATCH ...] tag.
	//   group 1: everything after "Subject:" up to the end of the line
	//   group 2: x of an "x/n" counter inside the tag, empty when absent
	//   group 3: n of an "x/n" counter inside the tag, empty when absent
	// The text after "PATCH" is optional so that a plain "[PATCH] subject"
	// (the usual form of a single patch) is accepted as well.
	subjectRE = regexp.MustCompile(`Subject:(\s*\[PATCH(?:\s+[^\n\]\r]*?)?(?:([0-9]+)\/([0-9]+))?\][^\n\r]*)`)

	// messageIdRE captures the id between the first pair of angle brackets of
	// a Message-ID / Message-Id header. The capture stops at the first '>' so
	// that a trailing "<...>" on the same line is not swallowed into the id.
	messageIdRE = regexp.MustCompile(`Message-I[Dd]:\s*<([^>\r\n]*)>`)

	// inReplyToRE captures the first id of an In-Reply-To header, with the
	// same bracket handling as messageIdRE.
	inReplyToRE = regexp.MustCompile(`In-Reply-To:\s*<([^>\r\n]*)>`)
)
// patch is one patch email.
type patch struct {
	// subject is the text captured after "Subject:", body is the raw message
	// string, and replyTo is the message id that is used as the git tag and as
	// the reply annotation of the triggered job.
	subject, body, replyTo string
}
type patchSeries struct {
branch string
patches []patch
num int
numAdded int
ready chan bool
// Lkml receives string email messages from msgQueue,
// converts them to patches / patchSeries,
// stores them in idToSeries by message id of the cover letter or patch in the
// case of a single patch.
// mutex guards access to map
// When entire series has been received, it will be send to the jobQueue for
// applying patches and triggering ProwJobs.
type Lkml struct {
msgQueue <-chan string
seriesQueue chan patchSeries
idToSeries map[string]*patchSeries
mutex sync.Mutex
sourceRepo string
sourceBranch string
jobURI string
cloneURI url.URL
ca *config.Agent
kc *kube.Client
pullNum int
func New(msgQueue <-chan string, sourceRepo string, sourceBranch string, jobURI string, clusterIP string, ca *config.Agent, kc *kube.Client) Lkml {
// TODO validate args
l := Lkml{
msgQueue: msgQueue,
seriesQueue: make(chan patchSeries, QueueSize),
idToSeries: make(map[string]*patchSeries),
sourceRepo: sourceRepo,
sourceBranch: sourceBranch,
jobURI: jobURI,
cloneURI: url.URL{},
ca: ca,
kc: kc,
return l
func (l *Lkml) Run() error {
err := l.initStorage()
if err != nil {
log.Fatal("Failed to initialize git repo storage: %v", err)
done := make(chan error, 2)
go func() {
* git daemon currently causes excessive memory usage
* using dumb http instead for now which works
* cmd := exec.Command("git", "daemon", "--verbose", "--reuseaddr", "--export-all", "--base-path=/kunit", "/kunit")
* TODO: fix git daemon memory issues, then need to change
* cloneURI to git://{IP}/linux instead of http://{IP}:8080/linux.git
err := http.ListenAndServe(":8080", http.FileServer(http.Dir("/kunit")))
done <- fmt.Errorf("GIT server died: %v", err)
// handle messages and dispatch jobs once series received
go func() {
for msg := range l.msgQueue {
if err := l.handleMessage(msg); err != nil {
log.Printf("HandleMessageQueue: %v", err)
done <- errors.New("Message queue closed.")
// apply patch -> trigger in order
go func() {
for patch := range l.seriesQueue {
if err := l.handleJob(patch); err != nil {
log.Printf("HandleJobQueue: %v", err)
done <- errors.New("Series queue closed.")
// quit if git server dies or smtp server dies & queues empty
return <-done
// run git command in repo dir
func runGitCommand(args ...string) error {
allArgs := append([]string{"-C", repoPath}, args...)
cmd := exec.Command("git", allArgs...)
err := cmd.Run()
return err
func (l *Lkml) initStorage() error {
// set configs
exec.Command("git", "config", "--global", "", "").Run()
exec.Command("git", "config", "--global", "", "Prow Robot").Run()
exec.Command("git", "config", "--global", "pack.windowMemory", "100m").Run()
exec.Command("git", "config", "--global", "pack.packSizeLimit", "100m").Run()
exec.Command("git", "config", "--global", "pack.threads", "1").Run()
// if existing, update and return
if repoDir, err := os.Open(repoPath); err == nil {
err1 := runGitCommand("checkout", l.sourceBranch)
err2 := runGitCommand("pull")
if err1 == nil || err2 == nil {
return nil
// otherwise reclone
exec.Command("rm", "-rf", repoPath)
exec.Command("git", "clone", l.sourceRepo, repoPath).Run()
runGitCommand("checkout", l.sourceBranch)
exec.Command("mv", repoPath+"/.git/hooks/post-update.sample", repoPath+"/.git/hooks/post-update").Run()
return nil
func (l *Lkml) newSeries(branchName string, numPatches int) (*patchSeries, error) {
defer l.mutex.Unlock()
_, exists := l.idToSeries[branchName]
if exists == true {
return nil, fmt.Errorf("Patch Series with id {%s} already existed", branchName)
// add series to map
patch := patchSeries{
branch: branchName,
patches: make([]patch, numPatches),
num: numPatches,
numAdded: 0,
ready: make(chan bool),
l.idToSeries[branchName] = &patch
return &patch, nil
func (l *Lkml) addPatchToSeries(branchName string, patchInd int,
subjectLine string, bodyString string, replyToString string) error {
defer l.mutex.Unlock()
series, exists := l.idToSeries[branchName]
if exists == false {
return fmt.Errorf("Patch Series with id {%s} doesn't existed", branchName)
if patch := series.patches[patchInd]; patch.subject != "" {
return fmt.Errorf("Patch Series with id {%s} already has patch #%d/%d with subject line {%s}",
branchName, patchInd, len(series.patches), patch.subject)
series.patches[patchInd] = patch{subject: subjectLine, body: bodyString, replyTo: replyToString}
series.numAdded += 1
if series.numAdded == series.num {
defer func() { series.ready <- true }()
return nil
* Parse message body to identify one of 4 types of messages received:
* Not a patch : ignore email
* Single patch : send to seriesQueue immediately
* Cover of multi patch : Create Series and wait until timeout for others to arrive.
* If patches arrive, copy to seriesQueue.
* Series will be removed from map regardless by timeout.
* Member patch : Add patch to series by message id utilizing In-Reply-To header
* which references the cover letter under shallow threading
* Note: All access to map and its series are managed by mutex to ensure synchronization.
* This is the most flexible way to ensure proper ordering as emails may arrive
* out of order.
func (l *Lkml) handleMessage(body string) error {
var err error
// Get message id for replying in thread and mapping series
idMatch := messageIdRE.FindStringSubmatch(body)
if idMatch == nil || idMatch[1] == "" {
err = errors.New("Message has no ID")
return err
patchID := idMatch[1]
// get subject line and patch information (x and n from "Patch x/n")
subjectMatch := subjectRE.FindStringSubmatch(body)
if subjectMatch == nil {
err = errors.New("Message is not part of a patch")
return err
subjectLine := subjectMatch[1]
patchInd := 0
if subjectMatch[2] != "" {
patchInd, err = strconv.Atoi(subjectMatch[2])
patchNum := 0
if subjectMatch[3] != "" {
patchNum, err = strconv.Atoi(subjectMatch[3])
// check if single patch
if patchNum == 0 {
l.handleSinglePatch(body, patchID, subjectLine)
// cover letter
if patchInd == 0 {
l.handleCoverLetter(patchID, patchNum)
// patch in series
replyMatch := inReplyToRE.FindStringSubmatch(body)
if replyMatch == nil || replyMatch[1] == "" {
err = fmt.Errorf("Patch in series has no reference to cover letter!")
return err
seriesID := replyMatch[1]
// add patch to series
err = l.addPatchToSeries(seriesID, patchInd-1, subjectLine, body, patchID)
if err != nil {
return err
return nil
func (l *Lkml) handleCoverLetter(patchID string, patchNum int) error {
series, err := l.newSeries(patchID, patchNum)
if err != nil {
return err
go l.handleSeries(series)
return nil
func (l *Lkml) handleSinglePatch(body string, patchID string, subjectLine string) {
patch := patchSeries{
branch: patchID,
patches: []patch{{
subject: subjectLine,
body: body,
replyTo: patchID,
num: 1,
numAdded: 1,
l.seriesQueue <- patch
* Add to seriesQueue once all patches are in.
* Timeout specified as constant to delete outdated patches from map
func (l *Lkml) handleSeries(series *patchSeries) error {
shouldTrigger := false
select {
case <-series.ready:
shouldTrigger = true
case <-time.After(timeout):
log.Print("TIMED OUT")
if shouldTrigger {
l.seriesQueue <- *series
// delete from map
delete(l.idToSeries, series.branch)
return nil
func (l *Lkml) handleJob(series patchSeries) error {
runGitCommand("pull") // update main branch
// TODO : need to delete branches after use somehow, possibly launch a
// goroutine after triggering for time to delete
runGitCommand("checkout", l.sourceBranch)
err := runGitCommand("checkout", "-b", series.branch)
if err != nil {
runGitCommand("checkout", l.sourceBranch)
runGitCommand("branch", "-D", series.branch)
runGitCommand("checkout", "-b", series.branch)
// TODO : handle multiple jobs mapped to the same uri
// when we want to break jobs into multiple test suites, will need to
// iterate over the the array of presubmits for specific job uri
conf :=[l.jobURI][0]
// need to also get tag as this git server doesn't let you fetch from sha
// set tag of new commit to sha for fetching
prevRef := series.branch + "base"
err = runGitCommand("tag", prevRef)
if err != nil {
log.Printf("tag error %v", err)
baseURI := l.cloneURI.String()
for i, p := range series.patches {
// apply mail using git am
err := runGitCommand("am")
if err != nil {
log.Printf("Patch cannot be applied: %v", err)
runGitCommand("am", "--abort")
break // abort remaining jobs
// TODO possibly email if patch could not be applied
// set tag of new commit to message id for fetching (see above)
ref := p.replyTo
runGitCommand("tag", ref)
log.Printf("\t Patch #%d has tag %s", i, ref)
refs := kube.Refs{
Org: l.cloneURI.Host,
Repo: l.cloneURI.Path[1:],
BaseRef: prevRef,
BaseSHA: "",
CloneURI: baseURI,
Pulls: []kube.Pull{{
// plank uses pull number to find duplicate jobs
// need to monotonically increase
// otherwise change job name, org, or repo
Number: l.pullNum,
Ref: ref,
prevRef = ref
annotations := map[string]string{
SubjectAnnotation: p.subject,
ReplyAnnotation: p.replyTo,
runGitCommand("update-server-info") // update with new tag
pr := pjutil.PresubmitSpec(conf, refs)
pj := pjutil.NewProwJobWithAnnotation(pr, conf.Labels, annotations)
_, err = l.kc.CreateProwJob(pj)
if err != nil {
log.Printf("Error in triggering prowjob : %v", err)
return err
l.pullNum += 1 // increase pull number
runGitCommand("checkout", l.sourceBranch)
// go cleanupBranch(series.branch) // enable after testing
return nil
func (l *Lkml) cleanupBranch(branch string) {
runGitCommand("branch", "-D", branch)
// need to delete created tags
// git fetch --prune <remote> +refs/tags/*:refs/tags/*