Skip to content

Commit 9132a64

Browse files
authored
Merge pull request #83 from progaudi/fix/reconnect
fix lost connection and reconnect automatically
2 parents 286108b + 0750eee commit 9132a64

13 files changed

+465
-128
lines changed

src/tarantool.client/Box.cs

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
using System;
2-
using System.Linq;
3-
using System.Threading.Tasks;
1+
using System.Threading.Tasks;
42

53
using ProGaudi.Tarantool.Client.Model;
64
using ProGaudi.Tarantool.Client.Model.Requests;
75
using ProGaudi.Tarantool.Client.Model.Responses;
8-
using ProGaudi.Tarantool.Client.Utils;
96

107
namespace ProGaudi.Tarantool.Client
118
{
@@ -15,40 +12,17 @@ public class Box : IBox
1512

1613
private readonly ILogicalConnection _logicalConnection;
1714

18-
private readonly IResponseReader _responseReader;
19-
20-
private readonly INetworkStreamPhysicalConnection _physicalConnection;
21-
2215
public Box(ClientOptions options)
2316
{
2417
_clientOptions = options;
2518
TarantoolConvertersRegistrator.Register(options.MsgPackContext);
2619

27-
_physicalConnection = new NetworkStreamPhysicalConnection();
28-
_logicalConnection = new LogicalConnection(options, _physicalConnection);
29-
_responseReader = new ResponseReader(_logicalConnection, options, _physicalConnection);
20+
_logicalConnection = new LogicalConnectionManager(options);
3021
}
3122

3223
public async Task Connect()
3324
{
34-
await _physicalConnection.Connect(_clientOptions);
35-
36-
var greetingsResponseBytes = new byte[128];
37-
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
38-
if (readCount != greetingsResponseBytes.Length)
39-
{
40-
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
41-
}
42-
43-
var greetings = new GreetingsResponse(greetingsResponseBytes);
44-
45-
_clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} .");
46-
47-
_responseReader.BeginReading();
48-
49-
_clientOptions.LogWriter?.WriteLine("Server responses reading started.");
50-
51-
await LoginIfNotGuest(greetings);
25+
await _logicalConnection.Connect();
5226
}
5327

5428
public static async Task<Box> Connect(string replicationSource)
@@ -72,8 +46,7 @@ public void Dispose()
7246
{
7347
_clientOptions.LogWriter?.WriteLine("Box is disposing...");
7448
_clientOptions.LogWriter?.Flush();
75-
_responseReader.Dispose();
76-
_physicalConnection.Dispose();
49+
_logicalConnection.Dispose();
7750
}
7851

7952
public ISchema GetSchema()
@@ -141,21 +114,5 @@ public Task<DataResponse<TResponse[]>> Eval<TResponse>(string expression)
141114
{
142115
return Eval<TarantoolTuple, TResponse>(expression, TarantoolTuple.Empty);
143116
}
144-
145-
private async Task LoginIfNotGuest(GreetingsResponse greetings)
146-
{
147-
var singleNode = _clientOptions.ConnectionOptions.Nodes.Single();
148-
149-
if (string.IsNullOrEmpty(singleNode.Uri.UserName))
150-
{
151-
_clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt.");
152-
return;
153-
}
154-
155-
var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri);
156-
157-
await _logicalConnection.SendRequestWithEmptyResponse(authenticateRequest);
158-
_clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}");
159-
}
160117
}
161118
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System;
2+
3+
using ProGaudi.MsgPack.Light;
4+
5+
using ProGaudi.Tarantool.Client.Model.Requests;
6+
7+
namespace ProGaudi.Tarantool.Client.Converters
8+
{
9+
internal class PingPacketConverter : IMsgPackConverter<PingRequest>
10+
{
11+
public void Initialize(MsgPackContext context)
12+
{
13+
}
14+
15+
public void Write(PingRequest value, IMsgPackWriter writer)
16+
{
17+
}
18+
19+
public PingRequest Read(IMsgPackReader reader)
20+
{
21+
throw new NotImplementedException();
22+
}
23+
}
24+
}
Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
1-
using System.Collections.Generic;
2-
using System.IO;
1+
using System;
32
using System.Threading.Tasks;
43

5-
using ProGaudi.Tarantool.Client.Model;
64
using ProGaudi.Tarantool.Client.Model.Requests;
75
using ProGaudi.Tarantool.Client.Model.Responses;
86

97
namespace ProGaudi.Tarantool.Client
108
{
11-
public interface ILogicalConnection
9+
public interface ILogicalConnection : IDisposable
1210
{
11+
Task Connect();
12+
13+
bool IsConnected();
14+
1315
Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
1416
where TRequest : IRequest;
1517

1618
Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TRequest request)
1719
where TRequest : IRequest;
18-
19-
TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream);
20-
21-
IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources();
2220
}
2321
}

src/tarantool.client/INetworkStreamPhysicalConnection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public interface INetworkStreamPhysicalConnection : IDisposable
88
{
99
Task Connect(ClientOptions options);
1010
Task Flush();
11+
bool IsConnected();
1112
Task<int> ReadAsync(byte[] buffer, int offset, int count);
1213
void Write(byte[] buffer, int offset, int count);
1314
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
using System;
2+
using System.IO;
3+
using System.Threading.Tasks;
4+
5+
using ProGaudi.Tarantool.Client.Model;
26

37
namespace ProGaudi.Tarantool.Client
48
{
9+
510
public interface IResponseReader : IDisposable
611
{
712
void BeginReading();
13+
14+
Task<MemoryStream> GetResponseTask(RequestId requestId);
15+
16+
bool IsConnected();
817
}
918
}

src/tarantool.client/LogicalConnection.cs

Lines changed: 102 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
using System;
2-
using System.Collections.Concurrent;
3-
using System.Collections.Generic;
42
using System.IO;
53
using System.Linq;
64
using System.Threading;
@@ -20,20 +18,95 @@ internal class LogicalConnection : ILogicalConnection
2018
{
2119
private readonly MsgPackContext _msgPackContext;
2220

21+
private readonly ClientOptions _clientOptions;
22+
23+
private readonly RequestIdCounter _requestIdCounter;
24+
2325
private readonly INetworkStreamPhysicalConnection _physicalConnection;
2426

25-
private long _currentRequestId;
27+
private readonly ReaderWriterLockSlim _physicalConnectionLock = new ReaderWriterLockSlim();
2628

27-
private readonly ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>> _pendingRequests =
28-
new ConcurrentDictionary<RequestId, TaskCompletionSource<MemoryStream>>();
29+
private readonly IResponseReader _responseReader;
2930

3031
private readonly ILog _logWriter;
3132

32-
public LogicalConnection(ClientOptions options, INetworkStreamPhysicalConnection physicalConnection)
33+
private bool _disposed;
34+
35+
public LogicalConnection(ClientOptions options, RequestIdCounter requestIdCounter)
3336
{
37+
_clientOptions = options;
38+
_requestIdCounter = requestIdCounter;
3439
_msgPackContext = options.MsgPackContext;
3540
_logWriter = options.LogWriter;
36-
_physicalConnection = physicalConnection;
41+
42+
_physicalConnection = new NetworkStreamPhysicalConnection();
43+
_responseReader = new ResponseReader(_clientOptions, _physicalConnection);
44+
}
45+
46+
public void Dispose()
47+
{
48+
if (_disposed)
49+
{
50+
return;
51+
}
52+
53+
_disposed = true;
54+
55+
_responseReader.Dispose();
56+
_physicalConnection.Dispose();
57+
}
58+
59+
public async Task Connect()
60+
{
61+
await _physicalConnection.Connect(_clientOptions);
62+
63+
var greetingsResponseBytes = new byte[128];
64+
var readCount = await _physicalConnection.ReadAsync(greetingsResponseBytes, 0, greetingsResponseBytes.Length);
65+
if (readCount != greetingsResponseBytes.Length)
66+
{
67+
throw ExceptionHelper.UnexpectedGreetingBytesCount(readCount);
68+
}
69+
70+
var greetings = new GreetingsResponse(greetingsResponseBytes);
71+
72+
_clientOptions.LogWriter?.WriteLine($"Greetings received, salt is {Convert.ToBase64String(greetings.Salt)} .");
73+
74+
_responseReader.BeginReading();
75+
76+
_clientOptions.LogWriter?.WriteLine("Server responses reading started.");
77+
78+
await LoginIfNotGuest(greetings);
79+
}
80+
81+
public bool IsConnected()
82+
{
83+
if (_disposed)
84+
{
85+
return false;
86+
}
87+
88+
if (!_responseReader.IsConnected() || !_physicalConnection.IsConnected())
89+
{
90+
return false;
91+
}
92+
93+
return true;
94+
}
95+
96+
private async Task LoginIfNotGuest(GreetingsResponse greetings)
97+
{
98+
var singleNode = _clientOptions.ConnectionOptions.Nodes.Single();
99+
100+
if (string.IsNullOrEmpty(singleNode.Uri.UserName))
101+
{
102+
_clientOptions.LogWriter?.WriteLine("Guest mode, no authentication attempt.");
103+
return;
104+
}
105+
106+
var authenticateRequest = AuthenticationRequest.Create(greetings, singleNode.Uri);
107+
108+
await SendRequestWithEmptyResponse(authenticateRequest);
109+
_clientOptions.LogWriter?.WriteLine($"Authentication request send: {authenticateRequest}");
37110
}
38111

39112
public async Task SendRequestWithEmptyResponse<TRequest>(TRequest request)
@@ -48,15 +121,6 @@ public async Task<DataResponse<TResponse[]>> SendRequest<TRequest, TResponse>(TR
48121
return await SendRequestImpl<TRequest, DataResponse<TResponse[]>>(request);
49122
}
50123

51-
public TaskCompletionSource<MemoryStream> PopResponseCompletionSource(RequestId requestId, MemoryStream resultStream)
52-
{
53-
TaskCompletionSource<MemoryStream> request;
54-
55-
return _pendingRequests.TryRemove(requestId, out request)
56-
? request
57-
: null;
58-
}
59-
60124
public static byte[] ReadFully(Stream input)
61125
{
62126
input.Position = 0;
@@ -72,32 +136,43 @@ public static byte[] ReadFully(Stream input)
72136
}
73137
}
74138

75-
public IEnumerable<TaskCompletionSource<MemoryStream>> PopAllResponseCompletionSources()
76-
{
77-
var result = _pendingRequests.Values.ToArray();
78-
_pendingRequests.Clear();
79-
return result;
80-
}
81-
82139
private async Task<TResponse> SendRequestImpl<TRequest, TResponse>(TRequest request)
83140
where TRequest : IRequest
84141
{
142+
if (_disposed)
143+
{
144+
throw new ObjectDisposedException(nameof(LogicalConnection));
145+
}
146+
85147
var bodyBuffer = MsgPackSerializer.Serialize(request, _msgPackContext);
86148

87-
var requestId = GetRequestId();
88-
var responseTask = GetResponseTask(requestId);
149+
var requestId = _requestIdCounter.GetRequestId();
150+
var responseTask = _responseReader.GetResponseTask(requestId);
89151

90152
long headerLength;
91153
var headerBuffer = CreateAndSerializeBuffer(request, requestId, bodyBuffer, out headerLength);
92154

93-
lock (_physicalConnection)
155+
try
94156
{
157+
_physicalConnectionLock.EnterWriteLock();
158+
95159
_logWriter?.WriteLine($"Begin sending request header buffer, requestId: {requestId}, code: {request.Code}, length: {headerBuffer.Length}");
96-
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int) headerLength);
160+
_physicalConnection.Write(headerBuffer, 0, Constants.PacketSizeBufferSize + (int)headerLength);
97161

98162
_logWriter?.WriteLine($"Begin sending request body buffer, length: {bodyBuffer.Length}");
99163
_physicalConnection.Write(bodyBuffer, 0, bodyBuffer.Length);
100164
}
165+
catch (Exception ex)
166+
{
167+
_logWriter?.WriteLine(
168+
$"Request with requestId {requestId} failed, header:\n{ToReadableString(headerBuffer)} \n body: \n{ToReadableString(bodyBuffer)}");
169+
Dispose();
170+
throw;
171+
}
172+
finally
173+
{
174+
_physicalConnectionLock.ExitWriteLock();
175+
}
101176

102177
try
103178
{
@@ -137,22 +212,5 @@ private byte[] CreateAndSerializeBuffer<TRequest>(
137212
MsgPackSerializer.Serialize(packetLength, stream, _msgPackContext);
138213
return packetSizeBuffer;
139214
}
140-
141-
private RequestId GetRequestId()
142-
{
143-
var requestId = Interlocked.Increment(ref _currentRequestId);
144-
return (RequestId) (ulong) requestId;
145-
}
146-
147-
private Task<MemoryStream> GetResponseTask(RequestId requestId)
148-
{
149-
var tcs = new TaskCompletionSource<MemoryStream>();
150-
if (!_pendingRequests.TryAdd(requestId, tcs))
151-
{
152-
throw ExceptionHelper.RequestWithSuchIdAlreadySent(requestId);
153-
}
154-
155-
return tcs.Task;
156-
}
157215
}
158216
}

0 commit comments

Comments
 (0)