Skip to content

Commit f549614

Browse files
authored
Session timeout (#37)
* First round of changes * Tweak backoff configuration.
1 parent 00aac8a commit f549614

File tree

1 file changed

+49
-19
lines changed

1 file changed

+49
-19
lines changed

coherence/session.go

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"fmt"
1515
"github.com/google/uuid"
1616
"google.golang.org/grpc"
17+
"google.golang.org/grpc/backoff"
1718
"google.golang.org/grpc/connectivity"
1819
"google.golang.org/grpc/credentials"
1920
"google.golang.org/grpc/credentials/insecure"
@@ -197,12 +198,17 @@ func (s *Session) ID() string {
197198

198199
// Close closes a connection.
199200
func (s *Session) Close() {
200-
s.maps = make(map[string]interface{}, 0)
201-
s.caches = make(map[string]interface{}, 0)
202-
err := s.conn.Close()
203-
s.closed = true
204-
if err != nil {
205-
log.Printf("Unable to close session %s %v", s.sessionID, err)
201+
if !s.closed {
202+
s.maps = make(map[string]interface{}, 0)
203+
s.caches = make(map[string]interface{}, 0)
204+
err := s.conn.Close()
205+
s.closed = true
206+
s.dispatch(Closed, func() SessionLifecycleEvent {
207+
return newSessionLifecycleEvent(s, Closed)
208+
})
209+
if err != nil {
210+
log.Printf("Unable to close session %s %v", s.sessionID, err)
211+
}
206212
}
207213
}
208214

@@ -245,6 +251,17 @@ func (s *Session) ensureConnection() error {
245251
}
246252
s.dialOptions = append(s.dialOptions, tlsOpt)
247253

254+
connOpt := grpc.WithConnectParams(grpc.ConnectParams{
255+
Backoff: backoff.Config{
256+
BaseDelay: 1.0 * time.Second,
257+
Multiplier: 1.1,
258+
Jitter: 0.0,
259+
MaxDelay: 3.0 * time.Second,
260+
},
261+
MinConnectTimeout: 10 * time.Second,
262+
})
263+
s.dialOptions = append(s.dialOptions, connOpt)
264+
248265
s.mutex.Lock()
249266
locked = true
250267

@@ -268,10 +285,11 @@ func (s *Session) ensureConnection() error {
268285
// refer: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
269286
go func(session *Session) {
270287
var (
271-
firstConnect = true
272-
connected = false
273-
ctx = context.Background()
274-
lastState = session.conn.GetState()
288+
firstConnect = true
289+
connected = false
290+
ctx = context.Background()
291+
lastState = session.conn.GetState()
292+
disconnectTime int64 = 0
275293
)
276294

277295
for {
@@ -285,40 +303,52 @@ func (s *Session) ensureConnection() error {
285303
session.debug("connection:", lastState, "=>", newState)
286304

287305
if newState == connectivity.Shutdown {
288-
log.Printf("closed session %v", s.sessionID)
289-
s.dispatch(Closed, func() SessionLifecycleEvent {
290-
return newSessionLifecycleEvent(session, Closed)
291-
})
292-
session.closed = true
306+
log.Printf("closed session %v", session.sessionID)
307+
session.Close()
293308
return
294309
}
295310

296311
if newState == connectivity.Ready {
297312
if !firstConnect && !connected {
313+
disconnectTime = 0
298314
log.Printf("session: %s re-connected to address %s", session.sessionID, session.sessOpts.Address)
299-
s.dispatch(Reconnected, func() SessionLifecycleEvent {
315+
session.dispatch(Reconnected, func() SessionLifecycleEvent {
300316
return newSessionLifecycleEvent(session, Reconnected)
301317
})
302318
session.closed = false
303319
connected = true
304320
} else if firstConnect && !connected {
305321
firstConnect = false
306322
connected = true
307-
s.hasConnected = true
323+
session.hasConnected = true
308324
log.Printf("session: %s connected to address %s", session.sessionID, session.sessOpts.Address)
309-
s.dispatch(Connected, func() SessionLifecycleEvent {
325+
session.dispatch(Connected, func() SessionLifecycleEvent {
310326
return newSessionLifecycleEvent(session, Connected)
311327
})
312328
}
313329
} else {
314330
if connected {
331+
disconnectTime = -1
315332
log.Printf("session: %s disconnected from address %s", session.sessionID, session.sessOpts.Address)
316-
s.dispatch(Disconnected, func() SessionLifecycleEvent {
333+
session.dispatch(Disconnected, func() SessionLifecycleEvent {
317334
return newSessionLifecycleEvent(session, Disconnected)
318335
})
319336
connected = false
320337
}
321338

339+
if disconnectTime != 0 {
340+
if disconnectTime == -1 {
341+
disconnectTime = time.Now().UnixMilli()
342+
} else {
343+
waited := time.Now().UnixMilli() - disconnectTime
344+
if waited >= session.GetSessionTimeout().Milliseconds() {
345+
log.Printf("session: %s unable to reconnect within [%s]. Closing session.", session.sessionID, session.GetSessionTimeout().String())
346+
session.Close()
347+
return
348+
}
349+
}
350+
}
351+
322352
// trigger a reconnection on state change
323353
if newState != connectivity.Connecting {
324354
conn.Connect()

0 commit comments

Comments
 (0)