Skip to content

Async for bulk operation #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Redis.OM/Redis.OM.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
<RootNamespace>Redis.OM</RootNamespace>
<Nullable>enable</Nullable>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<PackageVersion>0.4.2</PackageVersion>
<Version>0.4.2</Version>
<PackageReleaseNotes>https://github.com/redis/redis-om-dotnet/releases/tag/v0.4.2</PackageReleaseNotes>
<PackageVersion>0.5.0</PackageVersion>
<Version>0.5.0</Version>
<PackageReleaseNotes>https://github.com/redis/redis-om-dotnet/releases/tag/v0.5.0</PackageReleaseNotes>
<Description>Object Mapping and More for Redis</Description>
<Title>Redis OM</Title>
<Authors>Steve Lorello</Authors>
Expand Down
15 changes: 12 additions & 3 deletions src/Redis.OM/Searching/IRedisCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,24 @@ public interface IRedisCollection<T> : IOrderedQueryable<T>, IAsyncEnumerable<T>
/// </summary>
/// <param name="items">The items to insert.</param>
/// <returns>The list of Keys.</returns>
Task<List<string>> Insert(IEnumerable<T> items);
Task<List<string>> InsertAsync(IEnumerable<T> items);

/// <summary>
/// Inserts list of items into redis.
/// </summary>
/// <param name="items">The items to insert.</param>
/// <param name="timeSpan">The timespan of the document's (TTL).</param>
/// /// <returns>The list of Keys.</returns>
Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan);
Task<List<string>> InsertAsync(IEnumerable<T> items, TimeSpan timeSpan);

/// <summary>
/// Inserts list of items into redis.
/// </summary>
/// <param name="items">The item.</param>
/// <param name="when">Condition to insert the document under.</param>
/// <param name="timeSpan">The expiration time of the document (TTL).</param>
/// <returns>the Id of the newly inserted item, or null if not inserted.</returns>
Task<List<string?>> InsertAsync(IEnumerable<T> items, WhenKey when, TimeSpan? timeSpan = null);

/// <summary>
/// finds an item by it's ID or keyname.
Expand Down Expand Up @@ -302,4 +311,4 @@ public interface IRedisCollection<T> : IOrderedQueryable<T>, IAsyncEnumerable<T>
/// <returns>A dictionary correlating the ids provided to the objects in Redis.</returns>
Task<IDictionary<string, T?>> FindByIdsAsync(IEnumerable<string> ids);
}
}
}
25 changes: 22 additions & 3 deletions src/Redis.OM/Searching/RedisCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ public async Task<string> InsertAsync(T item, TimeSpan timeSpan)
}

/// <inheritdoc/>
public async Task<List<string>> Insert(IEnumerable<T> items)
public async Task<List<string>> InsertAsync(IEnumerable<T> items)
{
var distinct = items.Distinct().ToArray();
if (!distinct.Any())
Expand All @@ -676,7 +676,7 @@ public async Task<List<string>> Insert(IEnumerable<T> items)
}

/// <inheritdoc/>
public async Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan)
public async Task<List<string>> InsertAsync(IEnumerable<T> items, TimeSpan timeSpan)
{
var distinct = items.Distinct().ToArray();
if (!distinct.Any())
Expand All @@ -694,6 +694,25 @@ public async Task<List<string>> Insert(IEnumerable<T> items, TimeSpan timeSpan)
return result.ToList();
}

/// <inheritdoc/>
public async Task<List<string?>> InsertAsync(IEnumerable<T> items, WhenKey when, TimeSpan? timeSpan = null)
{
var distinct = items.Distinct().ToArray();
if (!distinct.Any())
{
return new List<string?>();
}

var tasks = new List<Task<string?>>();
foreach (var item in distinct)
{
tasks.Add(((RedisQueryProvider)Provider).Connection.SetAsync(item, when, timeSpan));
}

var result = await Task.WhenAll(tasks);
return result.ToList();
}

/// <inheritdoc/>
public T? FindById(string id)
{
Expand Down Expand Up @@ -800,4 +819,4 @@ private void SaveToStateManager(string key, object value)
}
}
}
}
}
59 changes: 40 additions & 19 deletions test/Redis.OM.Unit.Tests/RediSearchTests/BulkOperationsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -15,6 +16,7 @@ public BulkOperationsTests(RedisSetup setup)
{
_connection = setup.Connection;
}

private IRedisConnection _connection = null;

[Fact]
Expand All @@ -27,7 +29,7 @@ public async Task Test_Bulk_InsertAsync()
new Person() { Name = "Jeeva", Age = 22, NickNames = new[] { "Jee", "Jeev", "J" } },
new Person() { Name = "Martin", Age = 60, NickNames = new[] { "Mart", "Mat", "tin" } }
};
var keys = await collection.Insert(persons);
var keys = await collection.InsertAsync(persons);

var people = collection.Where(x => x.NickNames.Contains("Bob") || x.NickNames.Contains("Alie")).ToList();
Assert.Contains(people, x => x.Name == persons.First().Name);
Expand All @@ -43,7 +45,7 @@ public async Task Test_Inserts_TwiceWith_SaveDataWith_ExactFields()
new Person() { Name = "Jeeva", Age = 22, NickNames = new[] { "Jee", "Jeev", "J" } },
new Person() { Name = "Martin", Age = 61, NickNames = new[] { "Mart", "Mat", "tin" } }
};
var keys = await collection.Insert(persons); //performs JSON.SET create keys and emit the list of keys.
var keys = await collection.InsertAsync(persons); //performs JSON.SET create keys and emit the list of keys.

var persons2 = new List<Person>() {
new Person() { Name = "Alice", Age = 14, NickNames = new[] { "Ally", "Alie", "Al" }, IsEngineer = true },
Expand All @@ -52,9 +54,9 @@ public async Task Test_Inserts_TwiceWith_SaveDataWith_ExactFields()
new Person() { Name = "Martin", Age = 61, NickNames = new[] { "Mart", "Mat", "tin" }, TagField = "Martin" }
};

var keys2 = await collection.Insert(persons2); //create keys and emit the list of keys.
var keys2 = await collection.InsertAsync(persons2); //create keys and emit the list of keys.

var people = collection.Where(x => x.Age >= 20 && x.Age <=30).ToList();
var people = collection.Where(x => x.Age >= 20 && x.Age <= 30).ToList();
Assert.NotEqual(keys, keys2); //not performs any re-indexing because keys are not same.
}

Expand All @@ -68,7 +70,7 @@ public async Task Test_BulkInsert_WithSameIds()
new Person() { Name = "Jeeva", Age = 22, NickNames = new[] { "Jee", "Jeev", "J" }, },
new Person() { Name = "Martin", Age = 60, NickNames = new[] { "Mart", "Mat", "tin" }, }
};
await collection.Insert(persons);
await collection.InsertAsync(persons);
var people = collection.Where(x => x.NickNames.Contains("Jeeva") || x.NickNames.Contains("Alie")).ToList();
Assert.False(people.First().Name == persons.First().Name); // this fails because the Name field of people doesn't contains the Name value Alice
}
Expand All @@ -82,7 +84,7 @@ public async Task Test_BulkInsert_HashesWith_Expiration()
new HashPerson() { Name = "Phineas", Age = 14, IsEngineer = true, TagField = "SummerVacation" }
};

await collection.Insert(PhineasFerb, TimeSpan.FromMilliseconds(8000));
await collection.InsertAsync(PhineasFerb, TimeSpan.FromMilliseconds(8000));
var ttl = (long)_connection.Execute("PTTL", PhineasFerb[0].GetKey());
Assert.True(ttl <= 8000);
Assert.True(ttl >= 1000);
Expand All @@ -97,7 +99,7 @@ public async Task Test_BulkInsert_WithExpiration()
new Person() { Name = "Phineas", Age = 14, IsEngineer = true, TagField = "SummerVacation", NickNames = new[] { "Phineas", "Triangle Head", "Phine" } }
};

await collection.Insert(PhineasFerb, TimeSpan.FromSeconds(8));
await collection.InsertAsync(PhineasFerb, TimeSpan.FromSeconds(8));
var ttl = (long)_connection.Execute("PTTL", PhineasFerb[0].GetKey());
Assert.True(ttl <= 8000);
Assert.True(ttl >= 1000);
Expand All @@ -114,13 +116,33 @@ public async Task Test_Bulk_Insert_Del()
new Person() { Name = "Perry", Age = 5, IsEngineer = false, TagField = "Agent", Address = new Address { State = "Tri-State Area "} }
};

await collection.Insert(PhineasFerbShow);
await collection.InsertAsync(PhineasFerbShow);
var searchByState = collection.Where(x => x.Address.State == "Tri-State Area").ToList();
await collection.DeleteAsync(searchByState);
var searchByTag = collection.FindById(searchByState[0].GetKey());
Assert.Null(searchByTag);
}

[Fact]
public async Task Test_Bulk_Insert_WithWhen()
{
var collection = new RedisCollection<Person>(_connection);
var PhineasFerbShow = new List<Person>() {
new Person() { Id="1", Name = "Ferb", Age = 14, IsEngineer = true, TagField = "SummerVacation" , Address = new Address { State = "Tri-State Area"} },
new Person() {Id="2", Name = "Phineas", Age = 14, IsEngineer = true, TagField = "SummerVacation", Address = new Address { State = "Tri-State Area"} },
new Person() {Id="3", Name = "Dr.Doofenshmirtz", Age = 38, IsEngineer = true, TagField = "Villain", Address = new Address { State = "Tri-State Area"} },
new Person() {Id="4", Name = "Perry", Age = 5, IsEngineer = false, TagField = "Agent", Address = new Address { State = "Tri-State Area "} }
};

var res = await collection.InsertAsync(PhineasFerbShow, WhenKey.NotExists, TimeSpan.FromMilliseconds(10000));
Assert.True(res.All(x => x != null));
Thread.Sleep(1100);
res = await collection.InsertAsync(PhineasFerbShow, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
Assert.True(res.All(x => x == null));
res = await collection.InsertAsync(PhineasFerbShow, WhenKey.Always, TimeSpan.FromMilliseconds(6000));
Assert.True(res.All(x => x != null));
}

[Fact]
public async Task Test_Bulk_InsertAsync_DelAsync_ForHashes()
{
Expand All @@ -132,7 +154,7 @@ public async Task Test_Bulk_InsertAsync_DelAsync_ForHashes()
new HashPerson() { Name = "Perry", Age = 5, IsEngineer = false, TagField = "Agent", Address = new Address { State = "Tri-State Area "} }
};

await collection.Insert(PhineasFerbShow);
await collection.InsertAsync(PhineasFerbShow);
var searchByName = await collection.Where(x => x.Name == "Dr.Doofenshmirtz" || x.Name == "Perry").ToListAsync();
await collection.DeleteAsync(searchByName);
var searchByTag = await collection.FindByIdAsync(searchByName[0].GetKey());
Expand All @@ -149,7 +171,7 @@ public async Task Test_Bulk_UpdateAsync()
new Person() { Name = "Monkey D. Garp", Age = 70, NickNames = new[] { "Garp", "Garps", "Hero of the Navy" }, TagField = "Navy" },
new Person() { Name = "Shanks", Age = 50, NickNames = new[] { "Shanks", "Red-Hair" }, TagField = "Red-Haired Pirates" }
};
var keys = await collection.Insert(onepiece);
var keys = await collection.InsertAsync(onepiece);
var people = collection.Where(x => x.NickNames.Contains("Luffy") || x.NickNames.Contains("Shanks")).ToList();
Assert.Equal(onepiece[0].Age, people[0].Age);
people[0].Age = 25;
Expand All @@ -168,7 +190,7 @@ public async Task Test_Bulk_UpdateSync_WithHashesNumeric()
new HashPerson() { Name = "Monkey D. Garp", Age = 70, NickNames = new List<string> { "Garp", "Garps", "Hero of the Navy" }, TagField = "Navy" },
new HashPerson() { Name = "Shanks", Age = 50, NickNames = new List<string> { "Shanks", "Red-Hair" }, TagField = "Red-Haired Pirates" }
};
var keys = collection.Insert(onepiece);
var keys = collection.InsertAsync(onepiece);
var people = collection.Where(x => x.Name.Contains("Luffy") || x.Name.Contains("Shanks")).ToList();
Assert.Equal(onepiece[0].Age, people[0].Age);
people[0].Height = 20.2;
Expand All @@ -178,7 +200,6 @@ public async Task Test_Bulk_UpdateSync_WithHashesNumeric()
Assert.NotEqual(onepiece[0].Age, people[0].Age);
}


[Fact]
public async Task Test_BulkUpdate_WithEmbbedObject()
{
Expand All @@ -189,11 +210,11 @@ public async Task Test_BulkUpdate_WithEmbbedObject()
new Person() { Name = "Monkey D. Garp", Age = 70, NickNames = new[] { "Garp", "Garps", "Hero of the Navy" }, TagField = "Navy" },
new Person() { Name = "Shanks", Age = 50, NickNames = new[] { "Shanks", "Red-Hair" }, TagField = "Red-Haired Pirates" }
};
var keys = collection.Insert(onepiece);
var keys = collection.InsertAsync(onepiece);
var people = collection.Where(x => x.NickNames.Contains("Luffy") || x.NickNames.Contains("Shanks")).ToList();
people[0].Address = new Address { City = "Goa Kingdom" };
people[1].Address = new Address { City = "Goa Kingdom" };
await collection.UpdateAsync(people);
await collection.UpdateAsync(people);
Assert.Contains(people, x => x.Name == onepiece.First().Name);
}

Expand All @@ -218,7 +239,7 @@ public async Task Test_Bulk50_Records_Insert_Update_Del_Async()
}
);
}
var keys = await collection.Insert(people); // 1000 records in an avg of 200ms.
var keys = await collection.InsertAsync(people); // 1000 records in an avg of 200ms.
var listofPeople = (await collection.FindByIdsAsync(keys)).Values.ToList();
for (int i = 0; i < keys.Count; i++)
{
Expand Down Expand Up @@ -248,14 +269,14 @@ public async Task TestBulk_Insert_Update_Del_Async_WithHashes()
new HashPerson() { Name = "Perry", Age = 5, IsEngineer = false, TagField = "Agent", Address = new Address { State = "Tri-State Area "} }
};

await collection.Insert(PhineasFerbShow);
var searchByName = await collection.Where(x => x.Name == "Dr.Doofenshmirtz" || x.Name == "Perry").ToListAsync();
await collection.InsertAsync(PhineasFerbShow);
var searchByName = await collection.Where(x => x.Name == "Dr.Doofenshmirtz" || x.Name == "Perry").ToListAsync();
searchByName[0].TagField = "Vacation";
searchByName[1].DepartmentNumber = 2;
await collection.UpdateAsync(searchByName);
await collection.DeleteAsync(searchByName);
var searchByTag = await collection.FindByIdAsync(searchByName[0].GetKey());
Assert.Null(searchByTag);
}
}
}
}
}