Skip to content

Commit b857af8

Browse files
author
maksim.konovalov
committed
add discovering mvp
1 parent d8e2284 commit b857af8

File tree

3 files changed

+126
-1
lines changed

3 files changed

+126
-1
lines changed

box/info.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ type Replication struct {
4848
// Upstream information.
4949
type Upstream struct {
5050
// Status is replication status of the connection with the instance.
51+
/*
52+
connect: an instance is connecting to the master.
53+
auth: authentication is being performed.
54+
wait_snapshot: an instance is receiving metadata from the master. If join fails with a non-critical error at this stage (for example, ER_READONLY, ER_ACCESS_DENIED, or a network-related issue), an instance tries to find a new master to join.
55+
fetch_snapshot: an instance is receiving data from the master’s .snap files.
56+
final_join: an instance is receiving new data added during fetch_snapshot.
57+
sync: the master and replica are synchronizing to have the same data.
58+
follow: the current instance’s role is replica. This means that the instance is read-only or acts as a replica for this remote peer in master-master configuration. The instance is receiving or able to receive data from the instance n’s (upstream) master.
59+
stopped: replication is stopped due to a replication error (for example, duplicate key).
60+
disconnected: an instance is not connected to the replica set (for example, due to network issues, not replication errors).
61+
*/
5162
Status string `msgpack:"status"`
5263
// Idle is the time (in seconds) since the last event was received.
5364
Idle float64 `msgpack:"idle"`
@@ -61,6 +72,8 @@ type Upstream struct {
6172
Message string `msgpack:"message,omitempty"`
6273
// SystemMessage contains an error message in case of a degraded state; otherwise, it is nil.
6374
SystemMessage string `msgpack:"system_message,omitempty"`
75+
// Name - instance name, required only for tarantool 3.
76+
Name string `msgpack:"name,omitempty"`
6477
}
6578

6679
// Downstream information.

pool/connection_pool.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"time"
2020

2121
"github.com/tarantool/go-iproto"
22-
2322
"github.com/tarantool/go-tarantool/v2"
2423
)
2524

@@ -75,12 +74,17 @@ type Instance struct {
7574

7675
// Opts provides additional options (configurable via ConnectWithOpts).
7776
type Opts struct {
77+
DiscoveringDialer tarantool.NetDialer
7878
// Timeout for timer to reopen connections that have been closed by some
7979
// events and to relocate connection between subpools if ro/rw role has
8080
// been updated.
8181
CheckTimeout time.Duration
8282
// ConnectionHandler provides an ability to handle connection updates.
8383
ConnectionHandler ConnectionHandler
84+
// WatchTopology start pool auto discovering by replication option
85+
EnableDiscovery bool
86+
// DiscoveryTimeout timout for discovering function
87+
DiscoveryTimeout time.Duration
8488
}
8589

8690
/*
@@ -103,6 +107,9 @@ Main features:
103107
- Automatic master discovery by mode parameter.
104108
*/
105109
type ConnectionPool struct {
110+
// root background connection pool ctx
111+
ctx context.Context
112+
106113
ends map[string]*endpoint
107114
endsMutex sync.RWMutex
108115

@@ -115,6 +122,8 @@ type ConnectionPool struct {
115122
anyPool *roundRobinStrategy
116123
poolsMutex sync.RWMutex
117124
watcherContainer watcherContainer
125+
126+
discoveringCancel context.CancelFunc
118127
}
119128

120129
var _ Pooler = (*ConnectionPool)(nil)
@@ -171,6 +180,7 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
171180
anyPool := newRoundRobinStrategy(size)
172181

173182
connPool := &ConnectionPool{
183+
ctx: ctx,
174184
ends: make(map[string]*endpoint),
175185
opts: opts,
176186
state: connectedState,
@@ -192,6 +202,13 @@ func ConnectWithOpts(ctx context.Context, instances []Instance,
192202
go connPool.controller(endpointCtx, endpoint)
193203
}
194204

205+
if opts.EnableDiscovery {
206+
err := connPool.StartDiscovery()
207+
if err != nil { // it strange if there is error, but we need to check that one
208+
return nil, err
209+
}
210+
}
211+
195212
return connPool, nil
196213
}
197214

pool/discovering.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package pool
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log"
7+
"time"
8+
9+
"github.com/tarantool/go-tarantool/v2/box"
10+
)
11+
12+
func (p *ConnectionPool) StartDiscovery() error {
13+
if p.discoveringCancel != nil {
14+
return errors.New("discovering already started")
15+
}
16+
17+
ctx, cancel := context.WithCancel(p.ctx)
18+
19+
t := time.NewTicker(p.opts.DiscoveryTimeout)
20+
21+
go func() {
22+
for {
23+
select {
24+
case <-t.C:
25+
// we use any connection, because master can be unavailable
26+
info, err := box.New(NewConnectorAdapter(p, ANY)).Info()
27+
if err != nil {
28+
log.Printf("tarantool: watch topology failed: %s\n", err)
29+
continue
30+
}
31+
32+
for _, replication := range info.Replication {
33+
upstream := replication.Upstream
34+
35+
if upstream.Status != "follow" {
36+
log.Printf("found replication instance (%s:%s) in non-follow state; skip discovering\n",
37+
upstream.Name, upstream.Peer)
38+
continue
39+
}
40+
41+
addr := upstream.Peer
42+
43+
if addr == "" { // itself instance
44+
continue
45+
}
46+
47+
name := upstream.Name
48+
49+
if name == "" {
50+
name = replication.UUID
51+
}
52+
53+
p.endsMutex.Lock()
54+
_, exists := p.ends[addr]
55+
p.endsMutex.Unlock()
56+
57+
if !exists {
58+
dialer := p.opts.DiscoveringDialer
59+
dialer.Address = addr
60+
61+
i := Instance{
62+
Name: name,
63+
Dialer: dialer,
64+
}
65+
66+
err = p.Add(ctx, i)
67+
if err != nil {
68+
log.Printf("tarantool: add to pool failed: %s\n", err)
69+
continue
70+
}
71+
}
72+
}
73+
74+
continue
75+
case <-ctx.Done():
76+
return
77+
}
78+
}
79+
}()
80+
81+
p.discoveringCancel = cancel
82+
83+
return nil
84+
}
85+
86+
func (p *ConnectionPool) StopDiscovery() error {
87+
if p.discoveringCancel != nil {
88+
p.discoveringCancel()
89+
p.discoveringCancel = nil
90+
91+
return nil
92+
}
93+
94+
return errors.New("discovering not started yet")
95+
}

0 commit comments

Comments
 (0)