| // Copyright 2019 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package lkml provides a controller which processes strings received |
| // containing mailing list patches, applies them to specified repo, |
| // and triggers ProwJobs. |
| package lkml |
| |
| import ( |
| "errors" |
| "fmt" |
| "k8s.io/test-infra/prow/config" |
| "k8s.io/test-infra/prow/kube" |
| "k8s.io/test-infra/prow/pjutil" |
| "log" |
| "net/http" |
| "net/url" |
| "os" |
| "os/exec" |
| "regexp" |
| "strconv" |
| "sync" |
| "time" |
| ) |
| |
const (
	// QueueSize is the buffer capacity of the series queue. The value is
	// arbitrary.
	QueueSize = 10

	// timeout is how long an incomplete patch series waits for its
	// remaining patches before it is dropped.
	timeout = 5 * time.Minute

	// jobTimeout is the delay before the branch of a finished job is
	// cleaned up from the repo.
	jobTimeout = time.Hour

	// repoPath is the location of the local git checkout that patches are
	// applied to.
	repoPath = "/kunit/linux"

	// SubjectAnnotation is the ProwJob annotation key under which the
	// patch subject is stored; the reporter uses it when sending a reply.
	SubjectAnnotation = "prowjobs.mail.subject"

	// ReplyAnnotation is the ProwJob annotation key under which the
	// message id to reply to is stored; the reporter uses it when sending
	// a reply.
	ReplyAnnotation = "prowjobs.mail.reply"
)
| |
var (
	// subjectRE matches the Subject header of a patch email.
	//   group 1: everything after "Subject:" up to the end of the line
	//   group 2: patch index x of "[PATCH ... x/n]" (empty if absent)
	//   group 3: series size n of "[PATCH ... x/n]" (empty if absent)
	// The text between "PATCH" and the closing bracket is optional so that
	// a bare "[PATCH]" prefix (single patch without version or index) is
	// recognized as well as "[PATCH v2]" or "[PATCH v2 1/3]".
	// TODO: test regular expressions
	subjectRE = regexp.MustCompile(`Subject:(\s*\[PATCH(?:\s+[^\n\]\r]*?)?(?:([0-9]+)\/([0-9]+))?\][^\n\r]*)`)

	// messageIdRE captures the id between the first pair of angle brackets
	// of the Message-ID header. The capture stops at the first '>' so that
	// trailing text on the same line cannot leak into the id.
	messageIdRE = regexp.MustCompile(`Message-I[Dd]:\s*<([^>\r\n]*)>`)

	// inReplyToRE captures the first message id of the In-Reply-To header,
	// again stopping at the first '>'.
	inReplyToRE = regexp.MustCompile(`In-Reply-To:\s*<([^>\r\n]*)>`)
)
| |
// patch is one mailed patch.
//   subject: text captured from the Subject header
//   body:    the message text the patch was extracted from
//   replyTo: message id used for tagging the commit and for replying
type patch struct {
	subject, body, replyTo string
}
| |
| type patchSeries struct { |
| branch string |
| patches []patch |
| num int |
| numAdded int |
| ready chan bool |
| } |
| |
| // Lkml receives string email messages from msgQueue, |
| // converts them to patches / patchSeries, |
| // stores them in idToSeries by message id of the cover letter or patch in the |
| // case of a single patch. |
| // mutex guards access to map |
| // When entire series has been received, it will be send to the jobQueue for |
| // applying patches and triggering ProwJobs. |
| type Lkml struct { |
| msgQueue <-chan string |
| seriesQueue chan patchSeries |
| idToSeries map[string]*patchSeries |
| mutex sync.Mutex |
| sourceRepo string |
| sourceBranch string |
| jobURI string |
| cloneURI url.URL |
| ca *config.Agent |
| kc *kube.Client |
| pullNum int |
| } |
| |
| func New(msgQueue <-chan string, sourceRepo string, sourceBranch string, jobURI string, clusterIP string, ca *config.Agent, kc *kube.Client) Lkml { |
| // TODO validate args |
| |
| l := Lkml{ |
| msgQueue: msgQueue, |
| seriesQueue: make(chan patchSeries, QueueSize), |
| idToSeries: make(map[string]*patchSeries), |
| sourceRepo: sourceRepo, |
| sourceBranch: sourceBranch, |
| jobURI: jobURI, |
| cloneURI: url.URL{}, |
| ca: ca, |
| kc: kc, |
| } |
| |
| return l |
| } |
| |
| func (l *Lkml) Run() error { |
| err := l.initStorage() |
| if err != nil { |
| log.Fatal("Failed to initialize git repo storage: %v", err) |
| } |
| |
| done := make(chan error, 2) |
| |
| go func() { |
| /* |
| * git daemon currently causes excessive memory usage |
| * using dumb http instead for now which works |
| * cmd := exec.Command("git", "daemon", "--verbose", "--reuseaddr", "--export-all", "--base-path=/kunit", "/kunit") |
| * TODO: fix git daemon memory issues, then need to change |
| * cloneURI to git://{IP}/linux instead of http://{IP}:8080/linux.git |
| */ |
| err := http.ListenAndServe(":8080", http.FileServer(http.Dir("/kunit"))) |
| done <- fmt.Errorf("GIT server died: %v", err) |
| }() |
| |
| // handle messages and dispatch jobs once series received |
| go func() { |
| for msg := range l.msgQueue { |
| if err := l.handleMessage(msg); err != nil { |
| log.Printf("HandleMessageQueue: %v", err) |
| } |
| } |
| done <- errors.New("Message queue closed.") |
| }() |
| |
| // apply patch -> trigger in order |
| go func() { |
| for patch := range l.seriesQueue { |
| if err := l.handleJob(patch); err != nil { |
| log.Printf("HandleJobQueue: %v", err) |
| } |
| } |
| done <- errors.New("Series queue closed.") |
| }() |
| |
| // quit if git server dies or smtp server dies & queues empty |
| return <-done |
| } |
| |
| // run git command in repo dir |
| func runGitCommand(args ...string) error { |
| allArgs := append([]string{"-C", repoPath}, args...) |
| cmd := exec.Command("git", allArgs...) |
| err := cmd.Run() |
| return err |
| } |
| |
| func (l *Lkml) initStorage() error { |
| // set configs |
| exec.Command("git", "config", "--global", "user.email", "prow@robot.com").Run() |
| exec.Command("git", "config", "--global", "user.name", "Prow Robot").Run() |
| exec.Command("git", "config", "--global", "pack.windowMemory", "100m").Run() |
| exec.Command("git", "config", "--global", "pack.packSizeLimit", "100m").Run() |
| exec.Command("git", "config", "--global", "pack.threads", "1").Run() |
| |
| // if existing, update and return |
| if repoDir, err := os.Open(repoPath); err == nil { |
| repoDir.Close() |
| err1 := runGitCommand("checkout", l.sourceBranch) |
| err2 := runGitCommand("pull") |
| if err1 == nil || err2 == nil { |
| return nil |
| } |
| } |
| |
| // otherwise reclone |
| exec.Command("rm", "-rf", repoPath) |
| exec.Command("git", "clone", l.sourceRepo, repoPath).Run() |
| runGitCommand("checkout", l.sourceBranch) |
| runGitCommand("update-server-info") |
| exec.Command("mv", repoPath+"/.git/hooks/post-update.sample", repoPath+"/.git/hooks/post-update").Run() |
| return nil |
| } |
| |
| func (l *Lkml) newSeries(branchName string, numPatches int) (*patchSeries, error) { |
| l.mutex.Lock() |
| defer l.mutex.Unlock() |
| |
| _, exists := l.idToSeries[branchName] |
| if exists == true { |
| return nil, fmt.Errorf("Patch Series with id {%s} already existed", branchName) |
| } |
| |
| // add series to map |
| patch := patchSeries{ |
| branch: branchName, |
| patches: make([]patch, numPatches), |
| num: numPatches, |
| numAdded: 0, |
| ready: make(chan bool), |
| } |
| |
| l.idToSeries[branchName] = &patch |
| |
| return &patch, nil |
| } |
| |
| func (l *Lkml) addPatchToSeries(branchName string, patchInd int, |
| subjectLine string, bodyString string, replyToString string) error { |
| |
| l.mutex.Lock() |
| defer l.mutex.Unlock() |
| |
| series, exists := l.idToSeries[branchName] |
| if exists == false { |
| return fmt.Errorf("Patch Series with id {%s} doesn't existed", branchName) |
| } |
| |
| if patch := series.patches[patchInd]; patch.subject != "" { |
| return fmt.Errorf("Patch Series with id {%s} already has patch #%d/%d with subject line {%s}", |
| branchName, patchInd, len(series.patches), patch.subject) |
| } |
| |
| series.patches[patchInd] = patch{subject: subjectLine, body: bodyString, replyTo: replyToString} |
| series.numAdded += 1 |
| |
| if series.numAdded == series.num { |
| defer func() { series.ready <- true }() |
| } |
| |
| return nil |
| } |
| |
| /* |
| * Parse message body to identify one of 4 types of messages received: |
| * |
| * Not a patch : ignore email |
| * Single patch : send to seriesQueue immediately |
| * Cover of multi patch : Create Series and wait until timeout for others to arrive. |
| * If patches arrive, copy to seriesQueue. |
| * Series will be removed from map regardless by timeout. |
| * Member patch : Add patch to series by message id utilizing In-Reply-To header |
| * which references the cover letter under shallow threading |
| * |
| * Note: All access to map and its series are managed by mutex to ensure synchronization. |
| * This is the most flexible way to ensure proper ordering as emails may arrive |
| * out of order. |
| */ |
| func (l *Lkml) handleMessage(body string) error { |
| var err error |
| |
| // Get message id for replying in thread and mapping series |
| idMatch := messageIdRE.FindStringSubmatch(body) |
| if idMatch == nil || idMatch[1] == "" { |
| err = errors.New("Message has no ID") |
| return err |
| } |
| patchID := idMatch[1] |
| |
| // get subject line and patch information (x and n from "Patch x/n") |
| subjectMatch := subjectRE.FindStringSubmatch(body) |
| if subjectMatch == nil { |
| err = errors.New("Message is not part of a patch") |
| return err |
| } |
| subjectLine := subjectMatch[1] |
| patchInd := 0 |
| if subjectMatch[2] != "" { |
| patchInd, err = strconv.Atoi(subjectMatch[2]) |
| } |
| patchNum := 0 |
| if subjectMatch[3] != "" { |
| patchNum, err = strconv.Atoi(subjectMatch[3]) |
| } |
| |
| // check if single patch |
| if patchNum == 0 { |
| l.handleSinglePatch(body, patchID, subjectLine) |
| } |
| |
| // cover letter |
| if patchInd == 0 { |
| l.handleCoverLetter(patchID, patchNum) |
| } |
| |
| // patch in series |
| replyMatch := inReplyToRE.FindStringSubmatch(body) |
| if replyMatch == nil || replyMatch[1] == "" { |
| err = fmt.Errorf("Patch in series has no reference to cover letter!") |
| return err |
| } |
| seriesID := replyMatch[1] |
| |
| // add patch to series |
| err = l.addPatchToSeries(seriesID, patchInd-1, subjectLine, body, patchID) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (l *Lkml) handleCoverLetter(patchID string, patchNum int) error { |
| series, err := l.newSeries(patchID, patchNum) |
| if err != nil { |
| return err |
| } |
| |
| go l.handleSeries(series) |
| return nil |
| } |
| |
| func (l *Lkml) handleSinglePatch(body string, patchID string, subjectLine string) { |
| patch := patchSeries{ |
| branch: patchID, |
| patches: []patch{{ |
| subject: subjectLine, |
| body: body, |
| replyTo: patchID, |
| }}, |
| num: 1, |
| numAdded: 1, |
| } |
| |
| l.seriesQueue <- patch |
| } |
| |
| /* |
| * Add to seriesQueue once all patches are in. |
| * Timeout specified as constant to delete outdated patches from map |
| */ |
| func (l *Lkml) handleSeries(series *patchSeries) error { |
| shouldTrigger := false |
| |
| select { |
| case <-series.ready: |
| shouldTrigger = true |
| case <-time.After(timeout): |
| log.Print("TIMED OUT") |
| break |
| } |
| |
| if shouldTrigger { |
| l.seriesQueue <- *series |
| } |
| |
| // delete from map |
| l.mutex.Lock() |
| delete(l.idToSeries, series.branch) |
| l.mutex.Unlock() |
| |
| return nil |
| } |
| |
| func (l *Lkml) handleJob(series patchSeries) error { |
| runGitCommand("pull") // update main branch |
| |
| // TODO : need to delete branches after use somehow, possibly launch a |
| // goroutine after triggering for time to delete |
| runGitCommand("checkout", l.sourceBranch) |
| err := runGitCommand("checkout", "-b", series.branch) |
| if err != nil { |
| runGitCommand("checkout", l.sourceBranch) |
| runGitCommand("branch", "-D", series.branch) |
| runGitCommand("checkout", "-b", series.branch) |
| } |
| |
| // TODO : handle multiple jobs mapped to the same uri |
| // when we want to break jobs into multiple test suites, will need to |
| // iterate over the the array of presubmits for specific job uri |
| conf := l.ca.Config().Presubmits[l.jobURI][0] |
| |
| // need to also get tag as this git server doesn't let you fetch from sha |
| // set tag of new commit to sha for fetching |
| prevRef := series.branch + "base" |
| err = runGitCommand("tag", prevRef) |
| if err != nil { |
| log.Printf("tag error %v", err) |
| } |
| |
| baseURI := l.cloneURI.String() |
| |
| for i, p := range series.patches { |
| // apply mail using git am |
| err := runGitCommand("am") |
| if err != nil { |
| log.Printf("Patch cannot be applied: %v", err) |
| runGitCommand("am", "--abort") |
| break // abort remaining jobs |
| // TODO possibly email if patch could not be applied |
| } |
| |
| // set tag of new commit to message id for fetching (see above) |
| ref := p.replyTo |
| runGitCommand("tag", ref) |
| |
| log.Printf("\t Patch #%d has tag %s", i, ref) |
| refs := kube.Refs{ |
| Org: l.cloneURI.Host, |
| Repo: l.cloneURI.Path[1:], |
| BaseRef: prevRef, |
| BaseSHA: "", |
| CloneURI: baseURI, |
| Pulls: []kube.Pull{{ |
| // plank uses pull number to find duplicate jobs |
| // need to monotonically increase |
| // otherwise change job name, org, or repo |
| Number: l.pullNum, |
| Ref: ref, |
| }}, |
| } |
| |
| prevRef = ref |
| |
| annotations := map[string]string{ |
| SubjectAnnotation: p.subject, |
| ReplyAnnotation: p.replyTo, |
| } |
| |
| runGitCommand("update-server-info") // update with new tag |
| pr := pjutil.PresubmitSpec(conf, refs) |
| pj := pjutil.NewProwJobWithAnnotation(pr, conf.Labels, annotations) |
| _, err = l.kc.CreateProwJob(pj) |
| if err != nil { |
| log.Printf("Error in triggering prowjob : %v", err) |
| return err |
| } |
| |
| l.pullNum += 1 // increase pull number |
| } |
| |
| runGitCommand("checkout", l.sourceBranch) |
| |
| // go cleanupBranch(series.branch) // enable after testing |
| return nil |
| } |
| |
| func (l *Lkml) cleanupBranch(branch string) { |
| <-time.After(jobTimeout) |
| runGitCommand("branch", "-D", branch) |
| // need to delete created tags |
| // git fetch --prune <remote> +refs/tags/*:refs/tags/* |
| } |