Skip to content

Commit 57077ea

Browse files
author
Miguel Varela Ramos
authored
Add support for AsyncAPI (#1935)
1 parent 8d01a7c commit 57077ea

File tree

52 files changed

+3230
-259
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+3230
-259
lines changed

async-gateway/endpoint.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"encoding/json"
21+
"fmt"
22+
"net/http"
23+
24+
"github.com/gorilla/mux"
25+
"go.uber.org/zap"
26+
)
27+
28+
// Endpoint wraps an async-gateway Service with HTTP logic
29+
type Endpoint struct {
30+
service Service
31+
logger *zap.Logger
32+
}
33+
34+
// NewEndpoint creates and initializes a new Endpoint struct
35+
func NewEndpoint(svc Service, logger *zap.Logger) *Endpoint {
36+
return &Endpoint{
37+
service: svc,
38+
logger: logger,
39+
}
40+
}
41+
42+
// CreateWorkload is a handler for the async-gateway service workload creation route
43+
func (e *Endpoint) CreateWorkload(w http.ResponseWriter, r *http.Request) {
44+
requestID := r.Header.Get("x-request-id")
45+
if requestID == "" {
46+
respondPlainText(w, http.StatusBadRequest, "error: missing x-request-id key in request header")
47+
return
48+
}
49+
50+
contentType := r.Header.Get("Content-Type")
51+
if contentType == "" {
52+
respondPlainText(w, http.StatusBadRequest, "error: missing Content-Type key in request header")
53+
return
54+
}
55+
56+
body := r.Body
57+
defer func() {
58+
_ = r.Body.Close()
59+
}()
60+
61+
log := e.logger.With(zap.String("id", requestID), zap.String("contentType", contentType))
62+
63+
id, err := e.service.CreateWorkload(requestID, body, contentType)
64+
if err != nil {
65+
log.Error("failed to create workload", zap.Error(err))
66+
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
67+
return
68+
}
69+
70+
if err = respondJSON(w, http.StatusOK, CreateWorkloadResponse{ID: id}); err != nil {
71+
log.Error("failed to encode json response", zap.Error(err))
72+
return
73+
}
74+
}
75+
76+
// GetWorkload is a handler for the async-gateway service workload retrieval route
77+
func (e *Endpoint) GetWorkload(w http.ResponseWriter, r *http.Request) {
78+
vars := mux.Vars(r)
79+
id, ok := vars["id"]
80+
if !ok {
81+
respondPlainText(w, http.StatusBadRequest, "error: missing request id in url path")
82+
return
83+
}
84+
85+
log := e.logger.With(zap.String("id", id))
86+
87+
res, err := e.service.GetWorkload(id)
88+
if err != nil {
89+
log.Error("failed to get workload", zap.Error(err))
90+
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
91+
return
92+
}
93+
94+
if err = respondJSON(w, http.StatusOK, res); err != nil {
95+
log.Error("failed to encode json response", zap.Error(err))
96+
return
97+
}
98+
}
99+
100+
func respondPlainText(w http.ResponseWriter, statusCode int, message string) {
101+
w.WriteHeader(statusCode)
102+
w.Header().Set("Content-Type", "text/plain")
103+
_, _ = w.Write([]byte(message))
104+
}
105+
106+
func respondJSON(w http.ResponseWriter, statusCode int, s interface{}) error {
107+
w.WriteHeader(statusCode)
108+
w.Header().Set("Content-Type", "application/json")
109+
return json.NewEncoder(w).Encode(s)
110+
}

async-gateway/go.mod

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module github.com/cortexlabs/async-gateway
2+
3+
go 1.15
4+
5+
require (
6+
github.com/aws/aws-sdk-go v1.37.23
7+
github.com/gorilla/mux v1.8.0
8+
go.uber.org/zap v1.16.0
9+
)

async-gateway/go.sum

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
2+
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
3+
github.com/aws/aws-sdk-go v1.37.23 h1:bO80NcSmRv52w+GFpBegoLdlP/Z0OwUqQ9bbeCLCy/0=
4+
github.com/aws/aws-sdk-go v1.37.23/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
5+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
7+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8+
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
9+
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
10+
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
11+
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
12+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
13+
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
14+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
15+
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
16+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
17+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
18+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
19+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
20+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
21+
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
22+
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
23+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
24+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
25+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
26+
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
27+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
28+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
29+
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
30+
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
31+
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
32+
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
33+
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
34+
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
35+
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
36+
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
37+
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
38+
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
39+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
40+
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
41+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
42+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
43+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
44+
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
45+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
46+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
47+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
48+
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
49+
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
50+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
51+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
52+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
53+
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
54+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
55+
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
56+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
57+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
58+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
59+
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
60+
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
61+
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
62+
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
63+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
64+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
65+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
66+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
67+
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
68+
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
69+
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
70+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
71+
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
72+
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

async-gateway/main.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"flag"
21+
"net/http"
22+
"os"
23+
"strings"
24+
25+
"github.com/aws/aws-sdk-go/aws"
26+
"github.com/aws/aws-sdk-go/aws/session"
27+
"github.com/gorilla/mux"
28+
"go.uber.org/zap"
29+
"go.uber.org/zap/zapcore"
30+
)
31+
32+
const (
33+
_defaultPort = "8080"
34+
)
35+
36+
func createLogger() (*zap.Logger, error) {
37+
logLevelEnv := os.Getenv("CORTEX_LOG_LEVEL")
38+
disableJSONLogging := os.Getenv("CORTEX_DISABLE_JSON_LOGGING")
39+
40+
var logLevelZap zapcore.Level
41+
switch logLevelEnv {
42+
case "DEBUG":
43+
logLevelZap = zapcore.DebugLevel
44+
case "WARNING":
45+
logLevelZap = zapcore.WarnLevel
46+
case "ERROR":
47+
logLevelZap = zapcore.ErrorLevel
48+
default:
49+
logLevelZap = zapcore.InfoLevel
50+
}
51+
52+
encoderConfig := zap.NewProductionEncoderConfig()
53+
encoderConfig.MessageKey = "message"
54+
55+
encoding := "json"
56+
if strings.ToLower(disableJSONLogging) == "true" {
57+
encoding = "console"
58+
}
59+
60+
return zap.Config{
61+
Level: zap.NewAtomicLevelAt(logLevelZap),
62+
Encoding: encoding,
63+
EncoderConfig: encoderConfig,
64+
OutputPaths: []string{"stdout"},
65+
ErrorOutputPaths: []string{"stderr"},
66+
}.Build()
67+
}
68+
69+
// usage: ./gateway -bucket <bucket> -region <region> -port <port> -queue queue <apiName>
70+
func main() {
71+
log, err := createLogger()
72+
if err != nil {
73+
panic(err)
74+
}
75+
defer func() {
76+
_ = log.Sync()
77+
}()
78+
79+
var (
80+
port = flag.String("port", _defaultPort, "port on which the gateway server runs on")
81+
queueURL = flag.String("queue", "", "SQS queue URL")
82+
region = flag.String("region", "", "AWS region")
83+
bucket = flag.String("bucket", "", "AWS bucket")
84+
clusterName = flag.String("cluster", "", "cluster name")
85+
)
86+
flag.Parse()
87+
88+
switch {
89+
case *queueURL == "":
90+
log.Fatal("missing required option: -queue")
91+
case *region == "":
92+
log.Fatal("missing required option: -region")
93+
case *bucket == "":
94+
log.Fatal("missing required option: -bucket")
95+
case *clusterName == "":
96+
log.Fatal("missing required option: -cluster")
97+
}
98+
99+
apiName := flag.Arg(0)
100+
if apiName == "" {
101+
log.Fatal("apiName argument was not provided")
102+
}
103+
104+
sess, err := session.NewSessionWithOptions(session.Options{
105+
Config: aws.Config{
106+
Region: region,
107+
},
108+
SharedConfigState: session.SharedConfigEnable,
109+
})
110+
if err != nil {
111+
log.Fatal("failed to create AWS session: %s", zap.Error(err))
112+
}
113+
114+
s3Storage := NewS3(sess, *bucket)
115+
116+
sqsQueue := NewSQS(*queueURL, sess)
117+
118+
svc := NewService(*clusterName, apiName, sqsQueue, s3Storage, log)
119+
ep := NewEndpoint(svc, log)
120+
121+
router := mux.NewRouter()
122+
router.HandleFunc("/", ep.CreateWorkload).Methods("POST")
123+
router.HandleFunc(
124+
"/healthz",
125+
func(w http.ResponseWriter, r *http.Request) {
126+
respondPlainText(w, http.StatusOK, "ok")
127+
},
128+
)
129+
router.HandleFunc("/{id}", ep.GetWorkload).Methods("GET")
130+
131+
log.Info("Running on port " + *port)
132+
if err = http.ListenAndServe(":"+*port, router); err != nil {
133+
log.Fatal("failed to start server", zap.Error(err))
134+
}
135+
}

async-gateway/queue.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
Copyright 2021 Cortex Labs, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"github.com/aws/aws-sdk-go/aws"
21+
"github.com/aws/aws-sdk-go/aws/session"
22+
awssqs "github.com/aws/aws-sdk-go/service/sqs"
23+
)
24+
25+
// Queue is an interface to abstract communication with event queues
26+
type Queue interface {
27+
SendMessage(message string, uniqueID string) error
28+
}
29+
30+
type sqs struct {
31+
queueURL string
32+
client *awssqs.SQS
33+
}
34+
35+
// NewSQS creates a new SQS client that satisfies the Queue interface
36+
func NewSQS(queueURL string, sess *session.Session) Queue {
37+
client := awssqs.New(sess)
38+
39+
return &sqs{queueURL: queueURL, client: client}
40+
}
41+
42+
// SendMessage sends a string
43+
func (q *sqs) SendMessage(message string, uniqueID string) error {
44+
_, err := q.client.SendMessage(&awssqs.SendMessageInput{
45+
MessageBody: aws.String(message),
46+
MessageDeduplicationId: aws.String(uniqueID),
47+
MessageGroupId: aws.String(uniqueID),
48+
QueueUrl: aws.String(q.queueURL),
49+
})
50+
return err
51+
}

0 commit comments

Comments
 (0)