Skip to content

feat(sender): add PoolFromOptions pool constructor #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 12, 2024
57 changes: 53 additions & 4 deletions sender_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ import (
//
// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
type LineSenderPool struct {
// options
maxSenders int
conf string

closed bool
// presence of a non-empty conf takes precedence over opts
conf string
opts []LineSenderOption

// senders are stored here
senders []LineSender
mu *sync.Mutex

// plumbing fields
closed bool
mu *sync.Mutex
}

// LineSenderPoolOption defines line sender pool config option.
Expand Down Expand Up @@ -74,6 +80,37 @@ func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, e
return pool, nil
}

// PoolFromOptions instantiates a new LineSenderPool using programmatic options.
// Any sender acquired from this pool will be initialized with the same options
// that were passed into the opts argument.
//
// Unlike [PoolFromConf], PoolFromOptions does not have the ability to customize
// the returned LineSenderPool. In this case, to add options (such as [WithMaxSenders]),
// you need manually apply these options after calling this method.
//
// // Create a PoolFromOptions with LineSender options
// p, err := PoolFromOptions(
// WithHttp(),
// WithAutoFlushRows(1000000),
// )
//
// if err != nil {
// panic(err)
// }
//
// // Add Pool-level options manually
// WithMaxSenders(32)(p)
func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error) {
pool := &LineSenderPool{
maxSenders: 64,
opts: opts,
senders: []LineSender{},
mu: &sync.Mutex{},
}

return pool, nil
}

// WithMaxSenders sets the maximum number of senders in the pool.
// The default maximum number of senders is 64.
func WithMaxSenders(count int) LineSenderPoolOption {
Expand All @@ -99,7 +136,19 @@ func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
return s, nil
}

return LineSenderFromConf(ctx, p.conf)
if p.conf != "" {
return LineSenderFromConf(ctx, p.conf)
} else {
conf := newLineSenderConfig(httpSenderType)
for _, opt := range p.opts {
opt(conf)
if conf.senderType == tcpSenderType {
return nil, errors.New("tcp/s not supported for pooled senders, use http/s only")
}
}
return newHttpLineSender(conf)
}

}

// Release flushes the LineSender and returns it back to the pool. If the pool
Expand Down
Loading