Skip to content

Shadow pipelines #1948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cli/cmd/lib_traffic_splitters.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,14 @@ func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environ

apiRes := apisRes[0]
lastUpdated := time.Unix(apiRes.Spec.LastUpdated, 0)

apiName := apiRes.Spec.Name
if api.Shadow {
apiName += " (shadow)"
}
rows = append(rows, []interface{}{
env.Name,
apiRes.Spec.Name,
apiName,
api.Weight,
apiRes.Status.Message(),
apiRes.Status.Requested,
Expand Down Expand Up @@ -108,7 +113,11 @@ func trafficSplitterListTable(trafficSplitter []schema.APIResponse, envNames []s
lastUpdated := time.Unix(splitAPI.Spec.LastUpdated, 0)
var apis []string
for _, api := range splitAPI.Spec.APIs {
apis = append(apis, api.Name+":"+s.Int32(api.Weight))
apiName := api.Name
if api.Shadow {
apiName += " (shadow)"
}
apis = append(apis, apiName+":"+s.Int32(api.Weight))
}
apisStr := s.TruncateEllipses(strings.Join(apis, " "), 50)
rows = append(rows, []interface{}{
Expand Down
3 changes: 2 additions & 1 deletion docs/workloads/realtime/traffic-splitter/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
endpoint: <string> # the endpoint for the Traffic Splitter (default: <name>)
apis: # list of Realtime APIs to target
- name: <string> # name of a Realtime API that is already running or is included in the same configuration file (required)
weight: <int> # percentage of traffic to route to the Realtime API (all weights must sum to 100) (required)
weight: <int> # percentage of traffic to route to the Realtime API (all non-shadow weights must sum to 100) (required)
shadow: <bool> # duplicate incoming traffic and send fire-and-forget to this api (only one shadow per traffic splitter) (default: false)
```
36 changes: 28 additions & 8 deletions pkg/lib/k8s/virtual_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,34 @@ type Destination struct {
ServiceName string
Weight int32
Port uint32
Shadow bool
}

func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualService {
destinations := []*istionetworking.HTTPRouteDestination{}
var mirror *istionetworking.Destination
var mirrorWeight *istionetworking.Percent

for _, destination := range spec.Destinations {
destinations = append(destinations, &istionetworking.HTTPRouteDestination{
Destination: &istionetworking.Destination{
if destination.Shadow {
mirror = &istionetworking.Destination{
Host: destination.ServiceName,
Port: &istionetworking.PortSelector{
Number: destination.Port,
},
},
Weight: destination.Weight,
})
}
mirrorWeight = &istionetworking.Percent{Value: float64(destination.Weight)}
} else {
destinations = append(destinations, &istionetworking.HTTPRouteDestination{
Destination: &istionetworking.Destination{
Host: destination.ServiceName,
Port: &istionetworking.PortSelector{
Number: destination.Port,
},
},
Weight: destination.Weight,
})
}
}

var httpRoutes []*istionetworking.HTTPRoute
Expand All @@ -79,7 +93,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
},
},
},
Route: destinations,
Route: destinations,
Mirror: mirror,
MirrorPercentage: mirrorWeight,
})

if spec.Rewrite != nil {
Expand All @@ -98,7 +114,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
},
},
},
Route: destinations,
Route: destinations,
Mirror: mirror,
MirrorPercentage: mirrorWeight,
}

prefixMatch := &istionetworking.HTTPRoute{
Expand All @@ -111,7 +129,9 @@ func VirtualService(spec *VirtualServiceSpec) *istioclientnetworking.VirtualServ
},
},
},
Route: destinations,
Route: destinations,
Mirror: mirror,
MirrorPercentage: mirrorWeight,
}

if spec.Rewrite != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/operator/resources/trafficsplitter/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func getTrafficSplitterDestinations(trafficSplitter *spec.API) []k8s.Destination
ServiceName: operator.K8sName(api.Name),
Weight: api.Weight,
Port: uint32(_defaultPortInt32),
Shadow: api.Shadow,
}
}
return destinations
Expand Down
10 changes: 9 additions & 1 deletion pkg/types/spec/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const (
ErrConcurrencyMismatchServerSideBatchingPython = "spec.concurrency_mismatch_server_side_batching_python"
ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight"
ErrTrafficSplitterAPIsNotUnique = "spec.traffic_splitter_apis_not_unique"
ErrOneShadowPerTrafficSplitter = "spec.one_shadow_per_traffic_splitter"
ErrUnexpectedDockerSecretData = "spec.unexpected_docker_secret_data"
)

Expand Down Expand Up @@ -593,7 +594,7 @@ func ErrorConcurrencyMismatchServerSideBatchingPython(maxBatchsize int32, thread
func ErrorIncorrectTrafficSplitterWeightTotal(totalWeight int32) error {
return errors.WithStack(&errors.Error{
Kind: ErrIncorrectTrafficSplitterWeight,
Message: fmt.Sprintf("expected weights to sum to 100 but found %d", totalWeight),
Message: fmt.Sprintf("expected weights of all non-shadow apis to sum to 100 but found %d", totalWeight),
})
}

Expand All @@ -604,6 +605,13 @@ func ErrorTrafficSplitterAPIsNotUnique(names []string) error {
})
}

func ErrorOneShadowPerTrafficSplitter() error {
return errors.WithStack(&errors.Error{
Kind: ErrOneShadowPerTrafficSplitter,
Message: fmt.Sprintf("multiple shadow apis detected; only one api is allowed to be marked as a shadow"),
})
}

var _pwRegex = regexp.MustCompile(`"password":"[^"]+"`)
var _authRegex = regexp.MustCompile(`"auth":"[^"]+"`)

Expand Down
4 changes: 3 additions & 1 deletion pkg/types/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ func getModelVersionsFromPaths(paths []string, prefix string) []string {
func verifyTotalWeight(apis []*userconfig.TrafficSplit) error {
totalWeight := int32(0)
for _, api := range apis {
totalWeight += api.Weight
if !api.Shadow {
totalWeight += api.Weight
}
}
if totalWeight == 100 {
return nil
Expand Down
14 changes: 14 additions & 0 deletions pkg/types/spec/validations.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func multiAPIsValidation() *cr.StructFieldValidation {
LessThanOrEqualTo: pointer.Int32(100),
},
},
{
StructField: "Shadow",
BoolValidation: &cr.BoolValidation{},
},
},
},
},
Expand Down Expand Up @@ -846,6 +850,16 @@ func ValidateTrafficSplitter(api *userconfig.API) error {
return err
}

hasShadow := false
for _, api := range api.APIs {
if api.Shadow {
if hasShadow {
return ErrorOneShadowPerTrafficSplitter()
}
hasShadow = true
}
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/types/userconfig/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type MultiModels struct {
type TrafficSplit struct {
Name string `json:"name" yaml:"name"`
Weight int32 `json:"weight" yaml:"weight"`
Shadow bool `json:"shadow" yaml:"shadow"`
}

type ModelResource struct {
Expand Down Expand Up @@ -386,6 +387,7 @@ func (trafficSplit *TrafficSplit) UserStr() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%s: %s\n", NameKey, trafficSplit.Name))
sb.WriteString(fmt.Sprintf("%s: %s\n", WeightKey, s.Int32(trafficSplit.Weight)))
sb.WriteString(fmt.Sprintf("%s: %s\n", ShadowKey, s.Bool(trafficSplit.Shadow)))
return sb.String()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/types/userconfig/config_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
// TrafficSplitter
APIsKey = "apis"
WeightKey = "weight"
ShadowKey = "shadow"

// Predictor
TypeKey = "type"
Expand Down
9 changes: 9 additions & 0 deletions test/apis/traffic-splitter/cortex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,19 @@
models:
path: s3://cortex-examples/onnx/iris-classifier/

- name: request-recorder
kind: RealtimeAPI
predictor:
type: python
path: request_recorder.py

- name: iris-classifier
kind: TrafficSplitter
apis:
- name: iris-classifier-onnx
weight: 30
- name: iris-classifier-pytorch
weight: 70
- name: request-recorder
shadow: true
weight: 100
10 changes: 10 additions & 0 deletions test/apis/traffic-splitter/request_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from cortex_internal.lib.log import logger as cortex_logger


class PythonPredictor:
def __init__(self, config):
pass

def predict(self, payload):
cortex_logger.info("received payload", extra={"payload": payload})
return payload