| // Copyright 2019 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package main |
| |
| import ( |
| "flag" |
| "github.com/bradfitz/go-smtpd" |
| "github.com/sirupsen/logrus" |
| "k8s.io/test-infra/prow/config" |
| "k8s.io/test-infra/prow/kube" |
| "log" |
| "os" |
| "prow-lkml/lkml" |
| "strings" |
| "time" |
| ) |
| |
// Listen address given to the SMTP server: no host part, port 25.
const (
	smtpListen = ":25"
)
| |
| type mail struct { |
| from smtpd.MailAddress |
| body strings.Builder |
| conn smtpd.Connection |
| hasRcpt bool |
| messageQueue chan<- string |
| } |
| |
| // possibly filter connection by address |
| func onNewConnection(c smtpd.Connection) error { |
| log.Printf("smtpd: new connection from %v", c.Addr()) |
| return nil |
| } |
| |
| func (m *mail) AddRecipient(rcpt smtpd.MailAddress) error { |
| m.hasRcpt = true |
| log.Printf("To: email:%s", rcpt.Email()) |
| return nil |
| } |
| |
| func (m *mail) BeginData() error { |
| if !m.hasRcpt { |
| return smtpd.SMTPError("554 5.5.1 Error: no valid recipients") |
| } |
| return nil |
| } |
| |
| func (m *mail) Write(line []byte) error { |
| m.body.Write(line) |
| return nil |
| } |
| |
| func (m *mail) Close() error { |
| str := m.body.String() |
| m.messageQueue <- str |
| return nil |
| } |
| |
// options holds the values of the command-line flags registered in
// gatherOptions.
type options struct {
	// configPath is the value of -config-path (path to config.yaml).
	configPath string
	// jobConfigPath is the value of -job-config-path (path to prow job configs).
	jobConfigPath string

	// sourceRepo is the value of -source-repo (repo to patch to).
	sourceRepo string
	// sourceBranch is the value of -source-branch (branch to patch to).
	sourceBranch string

	// jobURI is the value of -job-uri (mapping for presubmit job).
	jobURI string
}
| |
| func (o *options) validate() error { |
| return nil |
| } |
| |
| func gatherOptions() options { |
| o := options{} |
| flag.StringVar(&o.configPath, "config-path", "", "Path to config.yaml.") |
| // TODO: support muliple mailing lists each mapped to multiple repo/branches |
| // one concern is that this might put too much overhead on one container |
| // might be better to split over multiple instances of the lkml pod |
| flag.StringVar(&o.jobConfigPath, "job-config-path", "", "Path to prow job configs") |
| flag.StringVar(&o.sourceRepo, "source-repo", "", "Repo to patch to") |
| flag.StringVar(&o.sourceBranch, "source-branch", "", "Branch to patch to") |
| flag.StringVar(&o.jobURI, "job-uri", "", "Mapping for presubmit job") |
| flag.Parse() |
| return o |
| } |
| |
| func main() { |
| o := gatherOptions() |
| if err := o.validate(); err != nil { |
| log.Fatal("Args to LKML component were not valid: %v", err) |
| } |
| |
| // config agent for fetching current configuration |
| ca := &config.Agent{} |
| if err := ca.Start(o.configPath, o.jobConfigPath); err != nil { |
| logrus.WithError(err).Fatal("Error starting config agent.") |
| } |
| |
| // start kube client needed to trigger Jobs |
| log.Printf("Namespace is %s", ca.Config().ProwJobNamespace) |
| kc, err := kube.NewClientInCluster(ca.Config().ProwJobNamespace) |
| if err != nil { |
| logrus.WithError(err).Fatal("Error getting kube client.") |
| } |
| |
| // get ClusterIP through Service Environment Variable. |
| // need Service deployed before pod for this to work |
| host := os.Getenv("GIT_SERVICE_HOST") |
| if host == "" { |
| log.Fatal("IP address for git server is invalid. Redeploy Pod after" + |
| "Service is live") |
| } |
| |
| // queue to pass patches from smtp to lkml controller |
| var messageQueue = make(chan string, lkml.QueueSize) |
| |
| c := lkml.New( |
| messageQueue, |
| o.sourceRepo, |
| o.sourceBranch, |
| o.jobURI, |
| host, |
| ca, |
| kc, |
| ) |
| |
| // closure for accessing message channel from mail |
| onNewMail := func(c smtpd.Connection, from smtpd.MailAddress) (smtpd.Envelope, error) { |
| log.Printf("Email From: email:%s", from.Email()) |
| return &mail{ |
| from: from, |
| conn: c, |
| messageQueue: messageQueue, |
| }, nil |
| } |
| |
| // start smtp server |
| go func() { |
| log.Printf("running smtp server on %s", smtpListen) |
| s := &smtpd.Server{ |
| Addr: smtpListen, |
| OnNewMail: onNewMail, |
| OnNewConnection: onNewConnection, |
| ReadTimeout: time.Minute, |
| } |
| err := s.ListenAndServe() |
| log.Printf("SMTP server died: %v", err) |
| // process remaining patches and jobs before closing |
| close(messageQueue) |
| }() |
| |
| if err := c.Run(); err != nil { |
| log.Fatal("Error from lkml controller: %v", err) |
| } |
| } |