Skip to content

Commit 97be804

Browse files
authored
Fixing Parallel Update Issue (#295)
1 parent c1461c1 commit 97be804

File tree

8 files changed

+90
-35
lines changed

8 files changed

+90
-35
lines changed

src/Redis.OM/Modeling/RedisCollectionStateManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ internal IDictionary<string, IList<IObjectDiff>> DetectDifferences()
149149
var res = new Dictionary<string, IList<IObjectDiff>>();
150150
if (DocumentAttribute.StorageType == StorageType.Json)
151151
{
152-
foreach (var key in Snapshot.Keys)
152+
foreach (var key in Snapshot.Keys.ToArray())
153153
{
154154
if (Data.ContainsKey(key))
155155
{

src/Redis.OM/Redis.OM.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
<RootNamespace>Redis.OM</RootNamespace>
77
<Nullable>enable</Nullable>
88
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
9-
<PackageVersion>0.4.1</PackageVersion>
10-
<Version>0.4.1</Version>
11-
<PackageReleaseNotes>https://github.com/redis/redis-om-dotnet/releases/tag/v0.4.1</PackageReleaseNotes>
9+
<PackageVersion>0.4.2</PackageVersion>
10+
<Version>0.4.2</Version>
11+
<PackageReleaseNotes>https://github.com/redis/redis-om-dotnet/releases/tag/v0.4.2</PackageReleaseNotes>
1212
<Description>Object Mapping and More for Redis</Description>
1313
<Title>Redis OM</Title>
1414
<Authors>Steve Lorello</Authors>

src/Redis.OM/RedisCommands.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -652,9 +652,9 @@ public static async Task<IDictionary<string, string>> HGetAllAsync(this IRedisCo
652652
/// <exception cref="ArgumentException">Thrown if the script cannot be resolved either the script is empty or the script name has not been encountered.</exception>
653653
public static async Task<int?> CreateAndEvalAsync(this IRedisConnection connection, string scriptName, string[] keys, string[] argv, string fullScript = "")
654654
{
655-
if (!Scripts.ShaCollection.ContainsKey(scriptName))
655+
string sha;
656+
if (!Scripts.ShaCollection.TryGetValue(scriptName, out sha))
656657
{
657-
string sha;
658658
if (Scripts.ScriptCollection.ContainsKey(scriptName))
659659
{
660660
sha = await connection.ExecuteAsync("SCRIPT", "LOAD", Scripts.ScriptCollection[scriptName]);
@@ -673,7 +673,7 @@ public static async Task<IDictionary<string, string>> HGetAllAsync(this IRedisCo
673673

674674
var args = new List<string>
675675
{
676-
Scripts.ShaCollection[scriptName],
676+
sha,
677677
keys.Count().ToString(),
678678
};
679679
args.AddRange(keys);

src/Redis.OM/RedisObjectHandler.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,37 @@ internal static string GetKey(this object obj)
160160
return sb.ToString();
161161
}
162162

163+
/// <summary>
164+
/// Attempts to pull the key out of the object, returns false if it fails.
165+
/// </summary>
166+
/// <param name="obj">The object to pull the key out of.</param>
167+
/// <param name="key">The key out param.</param>
168+
/// <returns>True of a key was parsed, false if not.</returns>
169+
internal static bool TryGetKey(this object obj, out string? key)
170+
{
171+
key = null;
172+
var type = obj.GetType();
173+
var documentAttribute = type.GetCustomAttribute(typeof(DocumentAttribute)) as DocumentAttribute;
174+
175+
if (documentAttribute == null)
176+
{
177+
return false;
178+
}
179+
180+
var id = obj.GetId();
181+
if (string.IsNullOrEmpty(id))
182+
{
183+
return false;
184+
}
185+
186+
var sb = new StringBuilder();
187+
sb.Append(GetKeyPrefix(type));
188+
sb.Append(":");
189+
sb.Append(id);
190+
key = sb.ToString();
191+
return true;
192+
}
193+
163194
/// <summary>
164195
/// Generates the key prefix for the given type and id.
165196
/// </summary>

src/Redis.OM/Scripts.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System.Collections.Concurrent;
2+
using System.Collections.Generic;
23

34
namespace Redis.OM
45
{
@@ -165,6 +166,6 @@ local second_op
165166
/// <summary>
166167
/// Gets or sets collection of SHAs.
167168
/// </summary>
168-
internal static Dictionary<string, string> ShaCollection { get; set; } = new ();
169+
internal static ConcurrentDictionary<string, string> ShaCollection { get; set; } = new ();
169170
}
170171
}

src/Redis.OM/Searching/RedisCollection.cs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,13 @@ public async Task UpdateAsync(T item)
195195
/// <inheritdoc />
196196
public async ValueTask UpdateAsync(IEnumerable<T> items)
197197
{
198-
var tasks = items.Select(UpdateAsync);
198+
var tasks = items.Select(UpdateAsyncNoSave);
199199

200200
await Task.WhenAll(tasks);
201+
foreach (var kvp in tasks.Select(x => x.Result))
202+
{
203+
SaveToStateManager(kvp.Key, kvp.Value);
204+
}
201205
}
202206

203207
/// <inheritdoc />
@@ -518,16 +522,17 @@ public T Single(Expression<Func<T, bool>> expression)
518522
var tasks = new Dictionary<string, Task<T?>>();
519523
foreach (var id in ids.Distinct())
520524
{
521-
tasks.Add(id, FindByIdAsync(id));
525+
tasks.Add(id, FindByIdAsyncNoSave(id));
522526
}
523527

524528
await Task.WhenAll(tasks.Values);
525529
var result = tasks.ToDictionary(x => x.Key, x => x.Value.Result);
526530
foreach (var res in result)
527531
{
528-
if (res.Value != null)
532+
string? key;
533+
if (res.Value != null && res.Value.TryGetKey(out key) && key != null)
529534
{
530-
SaveToStateManager(res.Value.GetKey(), res.Value);
535+
SaveToStateManager((string)key, res.Value);
531536
}
532537
}
533538

@@ -726,6 +731,40 @@ private static MethodInfo GetMethodInfo<T1, T2>(Func<T1, T2> f, T1 unused)
726731
return f.Method;
727732
}
728733

734+
private Task<T?> FindByIdAsyncNoSave(string id)
735+
{
736+
var prefix = typeof(T).GetKeyPrefix();
737+
string key = id.Contains(prefix) ? id : $"{prefix}:{id}";
738+
return _connection.GetAsync<T>(key).AsTask();
739+
}
740+
741+
private async Task<KeyValuePair<string, T>> UpdateAsyncNoSave(T item)
742+
{
743+
var key = item.GetKey();
744+
IList<IObjectDiff>? diff;
745+
var diffConstructed = StateManager.TryDetectDifferencesSingle(key, item, out diff);
746+
if (diffConstructed)
747+
{
748+
if (diff!.Any())
749+
{
750+
var args = new List<string>();
751+
var scriptName = diff!.First().Script;
752+
foreach (var update in diff!)
753+
{
754+
args.AddRange(update.SerializeScriptArgs());
755+
}
756+
757+
await _connection.CreateAndEvalAsync(scriptName, new[] { key }, args.ToArray());
758+
}
759+
}
760+
else
761+
{
762+
await _connection.UnlinkAndSetAsync(key, item, StateManager.DocumentAttribute.StorageType);
763+
}
764+
765+
return new KeyValuePair<string, T>(key, item);
766+
}
767+
729768
private void Initialize(RedisQueryProvider provider, Expression? expression, Expression<Func<T, bool>>? booleanExpression)
730769
{
731770
if (expression != null && !typeof(IQueryable<T>).IsAssignableFrom(expression.Type))

test/Redis.OM.Unit.Tests/CoreTests.cs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -255,17 +255,13 @@ public async Task SimpleHashInsertWhenAsync()
255255

256256
await connection.UnlinkAsync(key);
257257
await collection.InsertAsync(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
258-
var expiration = (long)await connection.ExecuteAsync("PTTL", key);
259-
Assert.True(expiration>4000);
260258
await Task.Delay(1000);
261259
res = await collection.InsertAsync(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
262260
Assert.Null(res);
263-
expiration = (long)await connection.ExecuteAsync("PTTL", key);
264-
Assert.True(expiration < 4000);
261+
var expiration = (long)await connection.ExecuteAsync("PTTL", key);
262+
Assert.True(expiration < 4000, $"Expiration is {expiration}");
265263
res = await collection.InsertAsync(obj, WhenKey.Exists, TimeSpan.FromMilliseconds(5000));
266-
expiration = (long)await connection.ExecuteAsync("PTTL", key);
267264
Assert.NotNull(res);
268-
Assert.True(expiration > 4000);
269265

270266
await connection.UnlinkAsync(key);
271267
res = await collection.InsertAsync(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
@@ -303,17 +299,13 @@ public void SimpleHashInsertWhen()
303299

304300
connection.Unlink(key);
305301
collection.Insert(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
306-
var expiration = (long)connection.Execute("PTTL", key);
307-
Assert.True(expiration>4000);
308302
Thread.Sleep(1100);
309303
res = collection.Insert(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
310304
Assert.Null(res);
311-
expiration = (long)connection.Execute("PTTL", key);
305+
var expiration = (long)connection.Execute("PTTL", key);
312306
Assert.True(expiration < 4000);
313307
res = collection.Insert(obj, WhenKey.Exists, TimeSpan.FromMilliseconds(5000));
314-
expiration = (long)connection.Execute("PTTL", key);
315308
Assert.NotNull(res);
316-
Assert.True(expiration > 4000);
317309

318310
connection.Unlink(key);
319311
res = collection.Insert(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
@@ -350,14 +342,12 @@ public async Task SimpleJsonInsertWhenAsync()
350342
var k2 = await collection.InsertAsync(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
351343
Assert.NotNull(k2);
352344
Assert.Equal(key, k2);
353-
var expiration = (long)await connection.ExecuteAsync("PTTL", key);
354345
Assert.Equal(key.Split(":")[1], obj.Id);
355-
Assert.True(expiration>4000);
356346
await Task.Delay(1000);
357347
Assert.True(connection.Execute("EXISTS", key) == 1, $"Expected: {key} to exist, it did not.");
358348
res = await collection.InsertAsync(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
359349
Assert.Null(res);
360-
expiration = (long)await connection.ExecuteAsync("PTTL", key);
350+
var expiration = (long)await connection.ExecuteAsync("PTTL", key);
361351
Assert.True(expiration < 4000);
362352
res = await collection.InsertAsync(obj, WhenKey.Exists, TimeSpan.FromMilliseconds(5000));
363353
expiration = (long)await connection.ExecuteAsync("PTTL", key);
@@ -402,21 +392,15 @@ public void SimpleJsonInsertWhen()
402392

403393
connection.Unlink(key);
404394
collection.Insert(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
405-
var expiration = (long)connection.Execute("PTTL", key);
406-
Assert.True(expiration>4000);
407395
Thread.Sleep(1100);
408396
res = collection.Insert(obj, WhenKey.NotExists, TimeSpan.FromMilliseconds(5000));
409397
Assert.Null(res);
410-
expiration = (long)connection.Execute("PTTL", key);
398+
var expiration = (long)connection.Execute("PTTL", key);
411399
Assert.True(expiration < 4000, $"Expiration: {expiration}");
412400
res = collection.Insert(obj, WhenKey.Exists, TimeSpan.FromMilliseconds(5000));
413-
expiration = (long)connection.Execute("PTTL", key);
414401
Assert.NotNull(res);
415-
Assert.True(expiration > 4000);
416402
res = collection.Insert(obj, WhenKey.Always, TimeSpan.FromMilliseconds(6000));
417-
expiration = (long)connection.Execute("PTTL", key);
418403
Assert.NotNull(res);
419-
Assert.True(expiration>5000);
420404
res = collection.Insert(obj, WhenKey.Always);
421405
expiration = (long)connection.Execute("PTTL", key);
422406
Assert.NotNull(res);

test/Redis.OM.Unit.Tests/RediSearchTests/BulkOperationsTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public async Task Test_Bulk_UpdateSync_WithHashesNumeric()
174174
people[0].Height = 20.2;
175175
people[0].Age = 25;
176176
people[1].Age = 52;
177-
await collection.UpdateAsync(people);
177+
await collection.UpdateAsync(people);
178178
Assert.NotEqual(onepiece[0].Age, people[0].Age);
179179
}
180180

0 commit comments

Comments
 (0)