Skip to content

GODRIVER-2219 Make maxConnecting configurable via ClientOptions or URI options. #828

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,13 @@ func (c *Client) configure(opts *options.ClientOptions) error {
topology.WithMinConnections(func(uint64) uint64 { return *opts.MinPoolSize }),
)
}
// MaxConnecting
if opts.MaxConnecting != nil {
serverOpts = append(
serverOpts,
topology.WithMaxConnecting(func(uint64) uint64 { return *opts.MaxConnecting }),
)
}
// PoolMonitor
if opts.PoolMonitor != nil {
serverOpts = append(
Expand Down
16 changes: 16 additions & 0 deletions mongo/options/clientoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type ClientOptions struct {
MaxConnIdleTime *time.Duration
MaxPoolSize *uint64
MinPoolSize *uint64
MaxConnecting *uint64
PoolMonitor *event.PoolMonitor
Monitor *event.CommandMonitor
ServerMonitor *event.ServerMonitor
Expand Down Expand Up @@ -312,6 +313,10 @@ func (c *ClientOptions) ApplyURI(uri string) *ClientOptions {
c.MinPoolSize = &cs.MinPoolSize
}

if cs.MaxConnectingSet {
c.MaxConnecting = &cs.MaxConnecting
}

if cs.ReadConcernLevel != "" {
c.ReadConcern = readconcern.New(readconcern.Level(cs.ReadConcernLevel))
}
Expand Down Expand Up @@ -583,6 +588,14 @@ func (c *ClientOptions) SetMinPoolSize(u uint64) *ClientOptions {
return c
}

// SetMaxConnecting specifies the maximum number of connections a connection pool may establish simultaneously. This can
// also be set through the "maxConnecting" URI option (e.g. "maxConnecting=2"). If this is 0, the default is used. The
// default is 2. Values greater than 100 are not recommended.
func (c *ClientOptions) SetMaxConnecting(u uint64) *ClientOptions {
c.MaxConnecting = &u
return c
}

// SetPoolMonitor specifies a PoolMonitor to receive connection pool events. See the event.PoolMonitor documentation
// for more information about the structure of the monitor and events that can be received.
func (c *ClientOptions) SetPoolMonitor(m *event.PoolMonitor) *ClientOptions {
Expand Down Expand Up @@ -859,6 +872,9 @@ func MergeClientOptions(opts ...*ClientOptions) *ClientOptions {
if opt.MinPoolSize != nil {
c.MinPoolSize = opt.MinPoolSize
}
if opt.MaxConnecting != nil {
c.MaxConnecting = opt.MaxConnecting
}
if opt.PoolMonitor != nil {
c.PoolMonitor = opt.PoolMonitor
}
Expand Down
11 changes: 11 additions & 0 deletions mongo/options/clientoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestClientOptions(t *testing.T) {
{"MaxConnIdleTime", (*ClientOptions).SetMaxConnIdleTime, 5 * time.Second, "MaxConnIdleTime", true},
{"MaxPoolSize", (*ClientOptions).SetMaxPoolSize, uint64(250), "MaxPoolSize", true},
{"MinPoolSize", (*ClientOptions).SetMinPoolSize, uint64(10), "MinPoolSize", true},
{"MaxConnecting", (*ClientOptions).SetMaxConnecting, uint64(10), "MaxConnecting", true},
{"PoolMonitor", (*ClientOptions).SetPoolMonitor, &event.PoolMonitor{}, "PoolMonitor", false},
{"Monitor", (*ClientOptions).SetMonitor, &event.CommandMonitor{}, "Monitor", false},
{"ReadConcern", (*ClientOptions).SetReadConcern, readconcern.Majority(), "ReadConcern", false},
Expand Down Expand Up @@ -338,6 +339,16 @@ func TestClientOptions(t *testing.T) {
"mongodb://localhost/?maxPoolSize=256",
baseClient().SetMaxPoolSize(256),
},
{
"MinPoolSize",
"mongodb://localhost/?minPoolSize=256",
baseClient().SetMinPoolSize(256),
},
{
"MaxConnecting",
"mongodb://localhost/?maxConnecting=10",
baseClient().SetMaxConnecting(10),
},
{
"ReadConcern",
"mongodb://localhost/?readConcernLevel=linearizable",
Expand Down
9 changes: 9 additions & 0 deletions x/mongo/driver/connstring/connstring.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type ConnString struct {
MaxPoolSizeSet bool
MinPoolSize uint64
MinPoolSizeSet bool
MaxConnecting uint64
MaxConnectingSet bool
Password string
PasswordSet bool
ReadConcernLevel string
Expand Down Expand Up @@ -731,6 +733,13 @@ func (p *parser) addOption(pair string) error {
}
p.MinPoolSize = uint64(n)
p.MinPoolSizeSet = true
case "maxconnecting":
n, err := strconv.Atoi(value)
if err != nil || n < 0 {
return fmt.Errorf("invalid value for %s: %s", key, value)
}
p.MaxConnecting = uint64(n)
p.MaxConnectingSet = true
case "readconcernlevel":
p.ReadConcernLevel = value
case "readpreference":
Expand Down
27 changes: 27 additions & 0 deletions x/mongo/driver/connstring/connstring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,33 @@ func TestMinPoolSize(t *testing.T) {
}
}

func TestMaxConnecting(t *testing.T) {
tests := []struct {
s string
expected uint64
err bool
}{
{s: "maxConnecting=10", expected: 10},
{s: "maxConnecting=100", expected: 100},
{s: "maxConnecting=-2", err: true},
{s: "maxConnecting=gsdge", err: true},
}

for _, test := range tests {
s := fmt.Sprintf("mongodb://localhost/?%s", test.s)
t.Run(s, func(t *testing.T) {
cs, err := connstring.ParseAndValidate(s)
if test.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.True(t, cs.MaxConnectingSet)
require.Equal(t, test.expected, cs.MaxConnecting)
}
})
}
}

func TestReadPreference(t *testing.T) {
tests := []struct {
s string
Expand Down
18 changes: 12 additions & 6 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ func (pe PoolError) Error() string { return string(pe) }

// poolConfig contains all aspects of the pool that can be configured
type poolConfig struct {
Address address.Address
MinPoolSize uint64
MaxPoolSize uint64
MaxIdleTime time.Duration
PoolMonitor *event.PoolMonitor
Address address.Address
MinPoolSize uint64
MaxPoolSize uint64
MaxConnecting uint64
MaxIdleTime time.Duration
PoolMonitor *event.PoolMonitor
}

type pool struct {
Expand Down Expand Up @@ -100,11 +101,16 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
connOpts = append(connOpts, WithIdleTimeout(func(_ time.Duration) time.Duration { return config.MaxIdleTime }))
}

var maxConnecting uint64 = 2
if config.MaxConnecting > 0 {
maxConnecting = config.MaxConnecting
}

pool := &pool{
address: config.Address,
minSize: config.MinPoolSize,
maxSize: config.MaxPoolSize,
maxConnecting: 2,
maxConnecting: maxConnecting,
monitor: config.PoolMonitor,
connOpts: connOpts,
generation: newPoolGenerationMap(),
Expand Down
11 changes: 6 additions & 5 deletions x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,12 @@ func NewServer(addr address.Address, topologyID primitive.ObjectID, opts ...Serv
s.rttMonitor = newRTTMonitor(rttCfg)

pc := poolConfig{
Address: addr,
MinPoolSize: cfg.minConns,
MaxPoolSize: cfg.maxConns,
MaxIdleTime: cfg.connectionPoolMaxIdleTime,
PoolMonitor: cfg.poolMonitor,
Address: addr,
MinPoolSize: cfg.minConns,
MaxPoolSize: cfg.maxConns,
MaxConnecting: cfg.maxConnecting,
MaxIdleTime: cfg.connectionPoolMaxIdleTime,
PoolMonitor: cfg.poolMonitor,
}

connectionOpts := copyConnectionOpts(cfg.connectionOpts)
Expand Down
11 changes: 11 additions & 0 deletions x/mongo/driver/topology/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type serverConfig struct {
heartbeatTimeout time.Duration
maxConns uint64
minConns uint64
maxConnecting uint64
poolMonitor *event.PoolMonitor
serverMonitor *event.ServerMonitor
connectionPoolMaxIdleTime time.Duration
Expand Down Expand Up @@ -124,6 +125,16 @@ func WithMinConnections(fn func(uint64) uint64) ServerOption {
}
}

// WithMaxConnecting configures the maximum number of connections a connection
// pool may establish simultaneously. If maxConnecting is 0, the default value
// of 2 is used.
func WithMaxConnecting(fn func(uint64) uint64) ServerOption {
return func(cfg *serverConfig) error {
cfg.maxConnecting = fn(cfg.maxConnecting)
return nil
}
}

// WithConnectionPoolMaxIdleTime configures the maximum time that a connection can remain idle in the connection pool
// before being removed. If connectionPoolMaxIdleTime is 0, then no idle time is set and connections will not be removed
// because of their age
Expand Down