Skip to content

Commit 6a2840d

Browse files
committed
Execute prepared commands comprising a single statement.
Signed-off-by: Bradley Grainger <[email protected]>
1 parent 8f84081 commit 6a2840d

File tree

8 files changed

+575
-15
lines changed

8 files changed

+575
-15
lines changed

docs/content/tutorials/migrating-from-connector-net.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,8 @@ The following bugs in Connector/NET are fixed by switching to MySqlConnector.
128128
* [#89335](https://bugs.mysql.com/bug.php?id=89335): `MySqlCommandBuilder.DeriveParameters` fails for `JSON` type
129129
* [#91123](https://bugs.mysql.com/bug.php?id=91123): Database names are case-sensitive when calling a stored procedure
130130
* [#91199](https://bugs.mysql.com/bug.php?id=91199): Can't insert `MySqlDateTime` values
131+
* [#91751](https://bugs.mysql.com/bug.php?id=91751): `YEAR` column retrieved incorrectly with prepared command
132+
* [#91752](https://bugs.mysql.com/bug.php?id=91752): `00:00:00` is converted to `NULL` with prepared command
133+
* [#91753](https://bugs.mysql.com/bug.php?id=91753): Unnamed parameter not supported by `MySqlCommand.Prepare`
134+
* [#91754](https://bugs.mysql.com/bug.php?id=91754): Inserting 16MiB `BLOB` shifts it by four bytes when prepared
135+
* [#91770](https://bugs.mysql.com/bug.php?id=91770): `TIME(n)` column loses microseconds with prepared command
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using MySqlConnector.Protocol.Payloads;
2+
3+
namespace MySqlConnector.Core
4+
{
5+
/// <summary>
6+
/// <see cref="PreparedStatement"/> is a statement that has been prepared on the MySQL Server.
7+
/// </summary>
8+
internal sealed class PreparedStatement
9+
{
10+
public PreparedStatement(int statementId, ParsedStatement statement, ColumnDefinitionPayload[] columns, ColumnDefinitionPayload[] parameters)
11+
{
12+
StatementId = statementId;
13+
Statement = statement;
14+
Columns = columns;
15+
Parameters = parameters;
16+
}
17+
18+
public int StatementId { get; }
19+
public ParsedStatement Statement { get; }
20+
public ColumnDefinitionPayload[] Columns { get; }
21+
public ColumnDefinitionPayload[] Parameters { get; }
22+
}
23+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using System;
2+
using System.Data;
3+
using System.Data.Common;
4+
using System.IO;
5+
using System.Net.Sockets;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using MySql.Data.MySqlClient;
9+
using MySqlConnector.Logging;
10+
using MySqlConnector.Protocol;
11+
using MySqlConnector.Protocol.Serialization;
12+
using MySqlConnector.Utilities;
13+
14+
namespace MySqlConnector.Core
15+
{
16+
internal sealed class PreparedStatementCommandExecutor : ICommandExecutor
17+
{
18+
public PreparedStatementCommandExecutor(MySqlCommand command)
19+
{
20+
m_command = command;
21+
}
22+
23+
public async Task<DbDataReader> ExecuteReaderAsync(string commandText, MySqlParameterCollection parameterCollection, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
24+
{
25+
cancellationToken.ThrowIfCancellationRequested();
26+
if (Log.IsDebugEnabled())
27+
Log.Debug("Session{0} ExecuteBehavior {1} CommandText: {2}", m_command.Connection.Session.Id, ioBehavior, commandText);
28+
using (var payload = CreateQueryPayload(m_command.PreparedStatements[0], parameterCollection, m_command.Connection.GuidFormat))
29+
using (m_command.RegisterCancel(cancellationToken))
30+
{
31+
m_command.Connection.Session.StartQuerying(m_command);
32+
m_command.LastInsertedId = -1;
33+
try
34+
{
35+
await m_command.Connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
36+
return await MySqlDataReader.CreateAsync(m_command, behavior, ResultSetProtocol.Binary, ioBehavior).ConfigureAwait(false);
37+
}
38+
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
39+
{
40+
Log.Warn("Session{0} query was interrupted", m_command.Connection.Session.Id);
41+
throw new OperationCanceledException(cancellationToken);
42+
}
43+
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
44+
{
45+
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
46+
// use "decimal megabytes" (to round up) when creating the exception message
47+
int megabytes = payload.ArraySegment.Count / 1_000_000;
48+
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
49+
}
50+
}
51+
}
52+
53+
private PayloadData CreateQueryPayload(PreparedStatement preparedStatement, MySqlParameterCollection parameterCollection, MySqlGuidFormat guidFormat)
54+
{
55+
var writer = new ByteBufferWriter();
56+
writer.Write((byte) CommandKind.StatementExecute);
57+
writer.Write(preparedStatement.StatementId);
58+
writer.Write((byte) 0);
59+
writer.Write(1);
60+
if (preparedStatement.Parameters?.Length > 0)
61+
{
62+
// TODO: How to handle incorrect number of parameters?
63+
64+
// build subset of parameters for this statement
65+
var parameters = new MySqlParameter[preparedStatement.Statement.ParameterNames.Count];
66+
for (var i = 0; i < preparedStatement.Statement.ParameterNames.Count; i++)
67+
{
68+
var parameterName = preparedStatement.Statement.ParameterNames[i];
69+
var parameterIndex = parameterName != null ? parameterCollection.NormalizedIndexOf(parameterName) : preparedStatement.Statement.ParameterIndexes[i];
70+
parameters[i] = parameterCollection[parameterIndex];
71+
}
72+
73+
// write null bitmap
74+
byte nullBitmap = 0;
75+
for (var i = 0; i < parameters.Length; i++)
76+
{
77+
var parameter = parameters[i];
78+
if (parameter.Value == null || parameter.Value == DBNull.Value)
79+
{
80+
if (i > 0 && i % 8 == 0)
81+
{
82+
writer.Write(nullBitmap);
83+
nullBitmap = 0;
84+
}
85+
86+
nullBitmap |= (byte) (1 << (i % 8));
87+
}
88+
}
89+
writer.Write(nullBitmap);
90+
91+
// write "new parameters bound" flag
92+
writer.Write((byte) 1);
93+
94+
foreach (var parameter in parameters)
95+
writer.Write(TypeMapper.ConvertToColumnTypeAndFlags(parameter.MySqlDbType, guidFormat));
96+
97+
var options = m_command.CreateStatementPreparerOptions();
98+
foreach (var parameter in parameters)
99+
parameter.AppendBinary(writer, options);
100+
}
101+
102+
return writer.ToPayloadData();
103+
}
104+
105+
static IMySqlConnectorLogger Log { get; } = MySqlConnectorLogManager.CreateLogger(nameof(PreparedStatementCommandExecutor));
106+
107+
readonly MySqlCommand m_command;
108+
}
109+
}

src/MySqlConnector/MySql.Data.MySqlClient/MySqlCommand.cs

Lines changed: 97 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Data;
34
using System.Data.Common;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using MySqlConnector.Core;
8+
using MySqlConnector.Protocol;
9+
using MySqlConnector.Protocol.Payloads;
710
using MySqlConnector.Protocol.Serialization;
811
using MySqlConnector.Utilities;
912

@@ -62,14 +65,85 @@ public MySqlCommand(string commandText, MySqlConnection connection, MySqlTransac
6265

6366
public new MySqlDataReader ExecuteReader(CommandBehavior commandBehavior) => (MySqlDataReader) base.ExecuteReader(commandBehavior);
6467

65-
public override void Prepare()
68+
public override void Prepare() => PrepareAsync(IOBehavior.Synchronous, default).GetAwaiter().GetResult();
69+
public Task PrepareAsync() => PrepareAsync(AsyncIOBehavior, default);
70+
public Task PrepareAsync(CancellationToken cancellationToken) => PrepareAsync(AsyncIOBehavior, cancellationToken);
71+
72+
private async Task PrepareAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
6673
{
6774
if (Connection == null)
6875
throw new InvalidOperationException("Connection property must be non-null.");
6976
if (Connection.State != ConnectionState.Open)
7077
throw new InvalidOperationException("Connection must be Open; current state is {0}".FormatInvariant(Connection.State));
7178
if (string.IsNullOrWhiteSpace(CommandText))
7279
throw new InvalidOperationException("CommandText must be specified");
80+
if (m_connection?.HasActiveReader ?? false)
81+
throw new InvalidOperationException("Cannot call Prepare when there is an open DataReader for this command; it must be closed first.");
82+
if (Connection.IgnorePrepare)
83+
return;
84+
85+
if (CommandType != CommandType.Text)
86+
throw new NotSupportedException("Only CommandType.Text is currently supported by MySqlCommand.Prepare");
87+
88+
var statementPreparer = new StatementPreparer(CommandText, Parameters, CreateStatementPreparerOptions());
89+
var parsedStatements = statementPreparer.SplitStatements();
90+
91+
if (parsedStatements.Statements.Count > 1)
92+
throw new NotSupportedException("Multiple semicolon-delimited SQL statements are not supported by MySqlCommand.Prepare");
93+
94+
var columnsAndParameters = new ResizableArray<byte>();
95+
var columnsAndParametersSize = 0;
96+
97+
var preparedStatements = new List<PreparedStatement>(parsedStatements.Statements.Count);
98+
foreach (var statement in parsedStatements.Statements)
99+
{
100+
await Connection.Session.SendAsync(new PayloadData(statement.StatementBytes), ioBehavior, cancellationToken).ConfigureAwait(false);
101+
var payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
102+
var response = StatementPrepareResponsePayload.Create(payload);
103+
104+
ColumnDefinitionPayload[] parameters = null;
105+
if (response.ParameterCount > 0)
106+
{
107+
parameters = new ColumnDefinitionPayload[response.ParameterCount];
108+
for (var i = 0; i < response.ParameterCount; i++)
109+
{
110+
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
111+
Utility.Resize(ref columnsAndParameters, columnsAndParametersSize + payload.ArraySegment.Count);
112+
Buffer.BlockCopy(payload.ArraySegment.Array, payload.ArraySegment.Offset, columnsAndParameters.Array, columnsAndParametersSize, payload.ArraySegment.Count);
113+
parameters[i] = ColumnDefinitionPayload.Create(new ResizableArraySegment<byte>(columnsAndParameters, columnsAndParametersSize, payload.ArraySegment.Count));
114+
columnsAndParametersSize += payload.ArraySegment.Count;
115+
}
116+
if (!Connection.Session.SupportsDeprecateEof)
117+
{
118+
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
119+
EofPayload.Create(payload);
120+
}
121+
}
122+
123+
ColumnDefinitionPayload[] columns = null;
124+
if (response.ColumnCount > 0)
125+
{
126+
columns = new ColumnDefinitionPayload[response.ColumnCount];
127+
for (var i = 0; i < response.ColumnCount; i++)
128+
{
129+
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
130+
Utility.Resize(ref columnsAndParameters, columnsAndParametersSize + payload.ArraySegment.Count);
131+
Buffer.BlockCopy(payload.ArraySegment.Array, payload.ArraySegment.Offset, columnsAndParameters.Array, columnsAndParametersSize, payload.ArraySegment.Count);
132+
columns[i] = ColumnDefinitionPayload.Create(new ResizableArraySegment<byte>(columnsAndParameters, columnsAndParametersSize, payload.ArraySegment.Count));
133+
columnsAndParametersSize += payload.ArraySegment.Count;
134+
}
135+
if (!Connection.Session.SupportsDeprecateEof)
136+
{
137+
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
138+
EofPayload.Create(payload);
139+
}
140+
}
141+
142+
preparedStatements.Add(new PreparedStatement(response.StatementId, statement, columns, parameters));
143+
}
144+
145+
m_parsedStatements = parsedStatements;
146+
m_statements = preparedStatements;
73147
}
74148

75149
public override string CommandText
@@ -80,6 +154,7 @@ public override string CommandText
80154
if (m_connection?.HasActiveReader ?? false)
81155
throw new InvalidOperationException("Cannot set MySqlCommand.CommandText when there is an open DataReader for this command; it must be closed first.");
82156
m_commandText = value;
157+
m_statements = null;
83158
}
84159
}
85160

@@ -104,22 +179,12 @@ public override int CommandTimeout
104179

105180
public override CommandType CommandType
106181
{
107-
get
108-
{
109-
return m_commandType;
110-
}
182+
get => m_commandType;
111183
set
112184
{
113185
if (value != CommandType.Text && value != CommandType.StoredProcedure)
114186
throw new ArgumentException("CommandType must be Text or StoredProcedure.", nameof(value));
115-
if (value == m_commandType)
116-
return;
117-
118187
m_commandType = value;
119-
if (value == CommandType.Text)
120-
m_commandExecutor = new TextCommandExecutor(this);
121-
else if (value == CommandType.StoredProcedure)
122-
m_commandExecutor = new StoredProcedureCommandExecutor(this);
123188
}
124189
}
125190

@@ -203,9 +268,20 @@ protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior b
203268
return ExecuteReaderAsync(behavior, AsyncIOBehavior, cancellationToken);
204269
}
205270

206-
internal Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) =>
207-
!IsValid(out var exception) ? Utility.TaskFromException<DbDataReader>(exception) :
208-
m_commandExecutor.ExecuteReaderAsync(CommandText, m_parameterCollection, behavior, ioBehavior, cancellationToken);
271+
internal Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
272+
{
273+
if (!IsValid(out var exception))
274+
return Utility.TaskFromException<DbDataReader>(exception);
275+
276+
if (m_statements != null)
277+
m_commandExecutor = new PreparedStatementCommandExecutor(this);
278+
else if (m_commandType == CommandType.Text)
279+
m_commandExecutor = new TextCommandExecutor(this);
280+
else if (m_commandType == CommandType.StoredProcedure)
281+
m_commandExecutor = new StoredProcedureCommandExecutor(this);
282+
283+
return m_commandExecutor.ExecuteReaderAsync(CommandText, m_parameterCollection, behavior, ioBehavior, cancellationToken);
284+
}
209285

210286
protected override void Dispose(bool disposing)
211287
{
@@ -214,6 +290,8 @@ protected override void Dispose(bool disposing)
214290
if (disposing)
215291
{
216292
m_parameterCollection = null;
293+
m_parsedStatements?.Dispose();
294+
m_parsedStatements = null;
217295
}
218296
}
219297
finally
@@ -243,6 +321,8 @@ internal IDisposable RegisterCancel(CancellationToken token)
243321

244322
internal int CancelAttemptCount { get; set; }
245323

324+
internal IReadOnlyList<PreparedStatement> PreparedStatements => m_statements;
325+
246326
/// <summary>
247327
/// Causes the effective command timeout to be reset back to the value specified by <see cref="CommandTimeout"/>.
248328
/// </summary>
@@ -324,6 +404,8 @@ private bool IsValid(out Exception exception)
324404
MySqlConnection m_connection;
325405
string m_commandText;
326406
MySqlParameterCollection m_parameterCollection;
407+
ParsedStatements m_parsedStatements;
408+
IReadOnlyList<PreparedStatement> m_statements;
327409
int? m_commandTimeout;
328410
CommandType m_commandType;
329411
ICommandExecutor m_commandExecutor;

0 commit comments

Comments
 (0)