Skip to content

Commit dd853ff

Browse files
committed
Support bulk loading from IDataReader.
1 parent 571497b commit dd853ff

File tree

11 files changed

+778
-43
lines changed

11 files changed

+778
-43
lines changed

docs/content/api/mysql-bulk-copy.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
---
2+
date: 2019-11-11
3+
menu:
4+
main:
5+
parent: api
6+
title: MySqlBulkCopy
7+
weight: 15
8+
---
9+
10+
# MySqlBulkCopy
11+
12+
`MySqlBulkCopy` lets you efficiently load a MySQL Server Table with data from another source.
13+
It is similar to the [`SqlBulkCopy`](https://docs.microsoft.com/en-us/dotnet/api/system.data.sqlclient.sqlbulkcopy) class
14+
for SQL Server.
15+
16+
Due to [security features](../troubleshooting/load-data-local-infile/) in MySQL Server, the connection string
17+
**must** have `AllowLoadLocalInfile=true` in order to use this class.
18+
19+
## Example Code
20+
21+
```csharp
22+
// NOTE: to copy data between tables in the same database, use INSERT ... SELECT
23+
// https://dev.mysql.com/doc/refman/8.0/en/insert-select.html
24+
var dataTable = GetDataTableFromExternalSource();
25+
26+
using (var connection = new MySqlConnection("...;AllowLoadLocalInfile=True"))
27+
{
28+
await connection.OpenAsync();
29+
var bulkCopy = new MySqlBulkCopy(connection);
30+
bulkCopy.DestinationTableName = "some_table_name";
31+
await bulkCopy.WriteToServerAsync(dataTable);
32+
}
33+
```
34+
35+
## API Reference
36+
37+
### Constructors
38+
39+
`public MySqlBulkCopy(MySqlConnection connection, MySqlTransaction transaction = null)`
40+
41+
Initializes a `MySqlBulkCopy` with the specified connection, and optionally the active transaction.
42+
43+
### Properties
44+
45+
`public int BulkCopyTimeout { get; set; }`
46+
47+
The number of seconds for the operation to complete before it times out, or `0` for no timeout.
48+
49+
`public string DestinationTableName { get; set; }`
50+
51+
Name of the destination table on the server.
52+
53+
### Methods
54+
55+
`public void WriteToServer(DataTable dataTable);`
56+
57+
`public Task WriteToServerAsync(DataTable dataTable, CancellationToken cancellationToken = default);`
58+
59+
Copies all rows in the supplied `DataTable` to the destination table specified by the `DestinationTableName` property of the `MySqlBulkCopy` object.
60+
(This method is not available on `netstandard1.3`.)
61+
62+
***
63+
64+
`public void WriteToServer(IDataReader dataReader);`
65+
66+
`public Task WriteToServerAsync(IDataReader dataReader, CancellationToken cancellationToken = default);`
67+
68+
Copies all rows in the supplied `IDataReader` to the destination table specified by the `DestinationTableName` property of the `MySqlBulkCopy` object.

src/MySqlConnector/Core/IMySqlCommand.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ internal interface IMySqlCommand
1111
{
1212
string? CommandText { get; }
1313
CommandType CommandType { get; }
14+
bool AllowUserVariables { get; }
1415
CommandBehavior CommandBehavior { get; }
1516
MySqlParameterCollection? RawParameters { get; }
1617
PreparedStatements? TryGetPreparedStatements();
@@ -28,7 +29,7 @@ public static StatementPreparerOptions CreateStatementPreparerOptions(this IMySq
2829
{
2930
var connection = command.Connection!;
3031
var statementPreparerOptions = StatementPreparerOptions.None;
31-
if (connection.AllowUserVariables || command.CommandType == CommandType.StoredProcedure)
32+
if (connection.AllowUserVariables || command.CommandType == CommandType.StoredProcedure || command.AllowUserVariables)
3233
statementPreparerOptions |= StatementPreparerOptions.AllowUserVariables;
3334
if (connection.DateTimeKind == DateTimeKind.Utc)
3435
statementPreparerOptions |= StatementPreparerOptions.DateTimeUtc;

src/MySqlConnector/Core/ResultSet.cs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using System;
2+
using System.Buffers;
23
using System.Collections.Generic;
4+
using System.Data;
35
using System.Globalization;
46
using System.IO;
57
using System.Threading;
@@ -80,19 +82,37 @@ public async Task ReadResultSetHeaderAsync(IOBehavior ioBehavior)
8082
MySqlBulkLoader.GetAndRemoveSource(localInfile.FileName) :
8183
File.OpenRead(localInfile.FileName);
8284

83-
if (source is Stream stream)
85+
IDisposable? disposable = null;
86+
byte[]? buffer = null;
87+
try
8488
{
85-
using (stream)
89+
switch (source)
8690
{
87-
var readBuffer = new byte[65536];
91+
case Stream stream:
92+
disposable = stream;
93+
buffer = ArrayPool<byte>.Shared.Rent(1048576);
8894
int byteCount;
89-
while ((byteCount = await stream.ReadAsync(readBuffer, 0, readBuffer.Length).ConfigureAwait(false)) > 0)
95+
while ((byteCount = await stream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
9096
{
91-
payload = new PayloadData(new ArraySegment<byte>(readBuffer, 0, byteCount));
97+
payload = new PayloadData(new ArraySegment<byte>(buffer, 0, byteCount));
9298
await Session.SendReplyAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
9399
}
100+
break;
101+
102+
case IDataReader dataReader:
103+
await MySqlBulkCopy.SendDataReaderAsync(Connection, dataReader, ioBehavior, CancellationToken.None).ConfigureAwait(false);
104+
break;
105+
106+
default:
107+
throw new InvalidOperationException("Unsupported Source type: {0}".FormatInvariant(source.GetType().Name));
94108
}
95109
}
110+
finally
111+
{
112+
if (buffer is object)
113+
ArrayPool<byte>.Shared.Return(buffer);
114+
disposable?.Dispose();
115+
}
96116
}
97117
catch (Exception ex)
98118
{

src/MySqlConnector/MySql.Data.MySqlClient/MySqlBatchCommand.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public MySqlBatchCommand(string? commandText)
1818

1919
public string? CommandText { get; set; }
2020
public CommandType CommandType { get; set; }
21+
public bool AllowUserVariables => false;
2122
public CommandBehavior CommandBehavior { get; set; }
2223
public int RecordsAffected { get; set; }
2324

0 commit comments

Comments
 (0)