|
| 1 | +// Copyright 2018 The Operator-SDK Authors |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +package eventapi |
| 16 | + |
| 17 | +import ( |
| 18 | + "encoding/json" |
| 19 | + "fmt" |
| 20 | + "io" |
| 21 | + "io/ioutil" |
| 22 | + "net" |
| 23 | + "net/http" |
| 24 | + "strings" |
| 25 | + "sync" |
| 26 | + "time" |
| 27 | + |
| 28 | + "github.com/sirupsen/logrus" |
| 29 | +) |
| 30 | + |
| 31 | +// EventReceiver serves the event API |
| 32 | +type EventReceiver struct { |
| 33 | + // Events is the channel used by the event API handler to send JobEvents |
| 34 | + // back to the runner, or whatever code is using this receiver. |
| 35 | + Events chan JobEvent |
| 36 | + |
| 37 | + // SocketPath is the path on the filesystem to a unix streaming socket |
| 38 | + SocketPath string |
| 39 | + |
| 40 | + // URLPath is the path portion of the url at which events should be |
| 41 | + // received. For example, "/events/" |
| 42 | + URLPath string |
| 43 | + |
| 44 | + // server is the http.Server instance that serves the event API. It must be |
| 45 | + // closed. |
| 46 | + server io.Closer |
| 47 | + |
| 48 | + // stopped indicates if this receiver has permanently stopped receiving |
| 49 | + // events. When true, requests to POST an event will receive a "410 Gone" |
| 50 | + // response, and the body will be ignored. |
| 51 | + stopped bool |
| 52 | + |
| 53 | + // mutex controls access to the "stopped" bool above, ensuring that writes |
| 54 | + // are goroutine-safe. |
| 55 | + mutex sync.RWMutex |
| 56 | + |
| 57 | + // ident is the unique identifier for a particular run of ansible-runner |
| 58 | + ident string |
| 59 | + |
| 60 | + // logger holds a logger that has some fields already set |
| 61 | + logger logrus.FieldLogger |
| 62 | +} |
| 63 | + |
| 64 | +func New(ident string, errChan chan<- error) (*EventReceiver, error) { |
| 65 | + sockPath := fmt.Sprintf("/tmp/ansibleoperator-%s", ident) |
| 66 | + listener, err := net.Listen("unix", sockPath) |
| 67 | + if err != nil { |
| 68 | + return nil, err |
| 69 | + } |
| 70 | + |
| 71 | + rec := EventReceiver{ |
| 72 | + Events: make(chan JobEvent, 1000), |
| 73 | + SocketPath: sockPath, |
| 74 | + URLPath: "/events/", |
| 75 | + ident: ident, |
| 76 | + logger: logrus.WithFields(logrus.Fields{ |
| 77 | + "component": "eventapi", |
| 78 | + "job": ident, |
| 79 | + }), |
| 80 | + } |
| 81 | + |
| 82 | + mux := http.NewServeMux() |
| 83 | + mux.HandleFunc(rec.URLPath, rec.handleEvents) |
| 84 | + srv := http.Server{Handler: mux} |
| 85 | + rec.server = &srv |
| 86 | + |
| 87 | + go func() { |
| 88 | + errChan <- srv.Serve(listener) |
| 89 | + }() |
| 90 | + return &rec, nil |
| 91 | +} |
| 92 | + |
| 93 | +// Close ensures that appropriate resources are cleaned up, such as any unix |
| 94 | +// streaming socket that may be in use. Close must be called. |
| 95 | +func (e *EventReceiver) Close() { |
| 96 | + e.mutex.Lock() |
| 97 | + e.stopped = true |
| 98 | + e.mutex.Unlock() |
| 99 | + e.logger.Debug("event API stopped") |
| 100 | + e.server.Close() |
| 101 | + close(e.Events) |
| 102 | +} |
| 103 | + |
| 104 | +func (e *EventReceiver) handleEvents(w http.ResponseWriter, r *http.Request) { |
| 105 | + if r.URL.Path != e.URLPath { |
| 106 | + http.NotFound(w, r) |
| 107 | + e.logger.WithFields(logrus.Fields{ |
| 108 | + "code": "404", |
| 109 | + }).Infof("path not found: %s\n", r.URL.Path) |
| 110 | + return |
| 111 | + } |
| 112 | + |
| 113 | + if r.Method != http.MethodPost { |
| 114 | + e.logger.WithFields(logrus.Fields{ |
| 115 | + "code": "405", |
| 116 | + }).Infof("method %s not allowed", r.Method) |
| 117 | + w.WriteHeader(http.StatusMethodNotAllowed) |
| 118 | + return |
| 119 | + } |
| 120 | + |
| 121 | + ct := r.Header.Get("content-type") |
| 122 | + if strings.Split(ct, ";")[0] != "application/json" { |
| 123 | + e.logger.WithFields(logrus.Fields{ |
| 124 | + "code": "415", |
| 125 | + }).Info("wrong content type: %s", ct) |
| 126 | + w.WriteHeader(http.StatusUnsupportedMediaType) |
| 127 | + w.Write([]byte("The content-type must be \"application/json\"")) |
| 128 | + return |
| 129 | + } |
| 130 | + |
| 131 | + body, err := ioutil.ReadAll(r.Body) |
| 132 | + if err != nil { |
| 133 | + e.logger.WithFields(logrus.Fields{ |
| 134 | + "code": "500", |
| 135 | + }).Errorf("%s", err.Error()) |
| 136 | + w.WriteHeader(http.StatusInternalServerError) |
| 137 | + return |
| 138 | + } |
| 139 | + |
| 140 | + event := JobEvent{} |
| 141 | + err = json.Unmarshal(body, &event) |
| 142 | + if err != nil { |
| 143 | + e.logger.WithFields(logrus.Fields{ |
| 144 | + "code": "400", |
| 145 | + }).Infof("could not deserialize body: %s", err.Error()) |
| 146 | + w.WriteHeader(http.StatusBadRequest) |
| 147 | + w.Write([]byte("Could not deserialize body as JSON")) |
| 148 | + return |
| 149 | + } |
| 150 | + |
| 151 | + // Guarantee that the Events channel will not be written to if stopped == |
| 152 | + // true, because in that case the channel has been closed. |
| 153 | + e.mutex.RLock() |
| 154 | + defer e.mutex.RUnlock() |
| 155 | + if e.stopped { |
| 156 | + e.mutex.RUnlock() |
| 157 | + w.WriteHeader(http.StatusGone) |
| 158 | + e.logger.WithFields(logrus.Fields{ |
| 159 | + "code": "410", |
| 160 | + }).Info("stopped and not accepting additional events for this job") |
| 161 | + return |
| 162 | + } |
| 163 | + // ansible-runner sends "status events" and "ansible events". The "status |
| 164 | + // events" signify a change in the state of ansible-runner itself, which |
| 165 | + // we're not currently interested in. |
| 166 | + // https://ansible-runner.readthedocs.io/en/latest/external_interface.html#event-structure |
| 167 | + if event.UUID == "" { |
| 168 | + e.logger.Info("dropping event that is not a JobEvent") |
| 169 | + } else { |
| 170 | + // timeout if the channel blocks for too long |
| 171 | + timeout := time.NewTimer(10 * time.Second) |
| 172 | + select { |
| 173 | + case e.Events <- event: |
| 174 | + case <-timeout.C: |
| 175 | + e.logger.WithFields(logrus.Fields{ |
| 176 | + "code": "500", |
| 177 | + }).Warn("timed out writing event to channel") |
| 178 | + w.WriteHeader(http.StatusInternalServerError) |
| 179 | + return |
| 180 | + } |
| 181 | + _ = timeout.Stop() |
| 182 | + } |
| 183 | + w.WriteHeader(http.StatusNoContent) |
| 184 | +} |
0 commit comments