Skip to content

Commit df8f93d

Browse files
author
iwysiu
committed
GODRIVER-1540 fix deadlock in connection (#348)
1 parent ed18ae6 commit df8f93d

File tree

4 files changed

+63
-8
lines changed

4 files changed

+63
-8
lines changed

x/mongo/driver/topology/connection.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -290,16 +290,17 @@ func (c *connection) close() error {
290290
if atomic.LoadInt32(&c.connected) != connected {
291291
return nil
292292
}
293-
if c.pool == nil {
294-
var err error
295293

296-
if c.nc != nil {
297-
err = c.nc.Close()
298-
}
299-
atomic.StoreInt32(&c.connected, disconnected)
300-
return err
294+
var err error
295+
if c.nc != nil {
296+
err = c.nc.Close()
301297
}
302-
return c.pool.closeConnection(c)
298+
atomic.StoreInt32(&c.connected, disconnected)
299+
300+
if c.pool != nil {
301+
_ = c.pool.removeConnection(c)
302+
}
303+
return err
303304
}
304305

305306
func (c *connection) expired() bool {

x/mongo/driver/topology/connection_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,18 @@ func (nc *netconn) Close() error {
492492
return nc.Conn.Close()
493493
}
494494

495+
type writeFailConn struct {
496+
net.Conn
497+
}
498+
499+
func (wfc *writeFailConn) Write([]byte) (int, error) {
500+
return 0, errors.New("Write error")
501+
}
502+
503+
func (wfc *writeFailConn) SetWriteDeadline(t time.Time) error {
504+
return nil
505+
}
506+
495507
type dialer struct {
496508
Dialer
497509
opened map[*netconn]struct{}

x/mongo/driver/topology/pool.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,18 @@ func (p *pool) closeConnection(c *connection) error {
431431
return nil
432432
}
433433

434+
// removeConnection removes a connection from the pool.
435+
func (p *pool) removeConnection(c *connection) error {
436+
if c.pool != p {
437+
return ErrWrongPool
438+
}
439+
p.Lock()
440+
delete(p.opened, c.poolID)
441+
p.Unlock()
442+
443+
return nil
444+
}
445+
434446
// put returns a connection to this pool. If the pool is connected, the connection is not
435447
// stale, and there is space in the cache, the connection is returned to the cache.
436448
func (p *pool) put(c *connection) error {

x/mongo/driver/topology/pool_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55
"errors"
66
"net"
7+
"strings"
78
"sync/atomic"
89
"testing"
910
"time"
1011

1112
"go.mongodb.org/mongo-driver/x/mongo/driver/address"
13+
"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
1214
)
1315

1416
func TestPool(t *testing.T) {
@@ -518,6 +520,34 @@ func TestPool(t *testing.T) {
518520
}
519521
close(cleanup)
520522
})
523+
t.Run("handshaker i/o fails", func(t *testing.T) {
524+
want := "unable to write wire message to network: Write error"
525+
526+
pc := poolConfig{
527+
Address: address.Address(""),
528+
}
529+
p, err := newPool(pc, WithHandshaker(func(Handshaker) Handshaker {
530+
return operation.NewIsMaster()
531+
}),
532+
WithDialer(func(Dialer) Dialer {
533+
return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
534+
return &writeFailConn{&net.TCPConn{}}, nil
535+
})
536+
}),
537+
)
538+
noerr(t, err)
539+
err = p.connect()
540+
noerr(t, err)
541+
542+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
543+
defer cancel()
544+
545+
_, err = p.get(ctx)
546+
connErr := err.(ConnectionError)
547+
if !strings.Contains(connErr.Error(), want) {
548+
t.Errorf("Incorrect error. got %v; error should contain %v", connErr.Wrapped, want)
549+
}
550+
})
521551
})
522552
t.Run("Connection", func(t *testing.T) {
523553
t.Run("Connection Close Does Not Error After Pool Is Disconnected", func(t *testing.T) {

0 commit comments

Comments
 (0)