Skip to content

Move invalidation of caches to ActionQueue #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/NHibernate.Test/SecondLevelCacheTest/InvalidationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ protected override void Configure(Configuration configuration)
public void InvalidatesEntities()
{
var cache = Substitute.For<UpdateTimestampsCache>(Sfi.Settings, new Dictionary<string, string>());
((SessionFactoryImpl) (Sfi as DebugSessionFactory).ActualFactory).SetPropertyUsingReflection(x => x.UpdateTimestampsCache, cache);
((SessionFactoryImpl) (Sfi as DebugSessionFactory).ActualFactory).SetPropertyUsingReflection(
x => x.UpdateTimestampsCache,
cache);

var items = new List<Item>();
using (ISession session = OpenSession())
Expand All @@ -40,9 +42,10 @@ public void InvalidatesEntities()
{
foreach (var i in Enumerable.Range(1, 10))
{
var item = new Item { Id = i };
var item = new Item {Id = i};
session.Save(item);
}

tx.Commit();
}

Expand All @@ -53,6 +56,7 @@ public void InvalidatesEntities()
var item = session.Get<Item>(i);
item.Name = item.Id.ToString();
}

tx.Commit();
}

Expand All @@ -63,13 +67,14 @@ public void InvalidatesEntities()
var item = session.Get<Item>(i);
session.Delete(item);
}

tx.Commit();
}
}
//Should receive one preinvalidation and one invalidation per commit
cache.Received(3).PreInvalidate(Arg.Is<object[]>(x => x.Length==1 && (string)x[0] == "Item"));
cache.Received(3).Invalidate(Arg.Is<object[]>(x => x.Length == 1 && (string) x[0] == "Item"));

//Should receive one preinvalidation and one invalidation per commit
cache.Received(3).PreInvalidate(Arg.Is<IReadOnlyCollection<object>>(x => x.Count == 1 && (string) x.First() == "Item"));
cache.Received(3).Invalidate(Arg.Is<IReadOnlyCollection<object>>(x => x.Count == 1 && (string) x.First() == "Item"));
}

public void CleanUp()
Expand Down
34 changes: 27 additions & 7 deletions src/NHibernate/Async/Cache/UpdateTimestampsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,57 @@ public Task ClearAsync(CancellationToken cancellationToken)
return updateTimestamps.ClearAsync(cancellationToken);
}

public virtual Task PreInvalidateAsync(object[] spaces, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<object>(cancellationToken);
}
return PreInvalidateAsync((IReadOnlyList<object>) spaces, cancellationToken);
}

[MethodImpl()]
public virtual async Task PreInvalidateAsync(object[] spaces, CancellationToken cancellationToken)
public virtual async Task PreInvalidateAsync(IReadOnlyCollection<object> spaces, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
using (await _preInvalidate.LockAsync())
{
//TODO: to handle concurrent writes correctly, this should return a Lock to the client
long ts = updateTimestamps.NextTimestamp() + updateTimestamps.Timeout;
for (int i = 0; i < spaces.Length; i++)
foreach (var space in spaces)
{
await (updateTimestamps.PutAsync(spaces[i], ts, cancellationToken)).ConfigureAwait(false);
await (updateTimestamps.PutAsync(space, ts, cancellationToken)).ConfigureAwait(false);
}

//TODO: return new Lock(ts);
}

//TODO: return new Lock(ts);
}

/// <summary></summary>
public Task InvalidateAsync(object[] spaces, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<object>(cancellationToken);
}
return InvalidateAsync((IReadOnlyList<object>) spaces, cancellationToken);
}

[MethodImpl()]
public virtual async Task InvalidateAsync(object[] spaces, CancellationToken cancellationToken)
public virtual async Task InvalidateAsync(IReadOnlyCollection<object> spaces, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
using (await _invalidate.LockAsync())
{
//TODO: to handle concurrent writes correctly, the client should pass in a Lock
long ts = updateTimestamps.NextTimestamp();
//TODO: if lock.getTimestamp().equals(ts)
for (int i = 0; i < spaces.Length; i++)
foreach (var space in spaces)
{
log.Debug(string.Format("Invalidating space [{0}]", spaces[i]));
await (updateTimestamps.PutAsync(spaces[i], ts, cancellationToken)).ConfigureAwait(false);
log.Debug(string.Format("Invalidating space [{0}]", space));
await (updateTimestamps.PutAsync(space, ts, cancellationToken)).ConfigureAwait(false);
}
}
}
Expand Down
89 changes: 41 additions & 48 deletions src/NHibernate/Async/Engine/ActionQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ private async Task ExecuteActionsAsync(IList list, CancellationToken cancellatio
}
list.Clear();
await (session.Batcher.ExecuteBatchAsync(cancellationToken)).ConfigureAwait(false);

}

private async Task AfterExecutionsAsync(CancellationToken cancellationToken)
private Task PreInvalidateCachesAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (session.Factory.Settings.IsQueryCacheEnabled)
if (cancellationToken.IsCancellationRequested)
{
var spaces = executedSpaces.ToArray();
afterTransactionProcesses.AddSpacesToInvalidate(spaces);
await (session.Factory.UpdateTimestampsCache.PreInvalidateAsync(spaces, cancellationToken)).ConfigureAwait(false);
return Task.FromCanceled<object>(cancellationToken);
}
try
{
if (session.Factory.Settings.IsQueryCacheEnabled)
{
return session.Factory.UpdateTimestampsCache.PreInvalidateAsync(executedSpaces, cancellationToken);
}
return Task.CompletedTask;
}
catch (Exception ex)
{
return Task.FromException<object>(ex);
}
executedSpaces.Clear();
}

public async Task ExecuteAsync(IExecutable executable, CancellationToken cancellationToken)
Expand All @@ -62,23 +69,23 @@ public async Task ExecuteAsync(IExecutable executable, CancellationToken cancell
}
finally
{
await (AfterExecutionsAsync(cancellationToken)).ConfigureAwait(false);
await (PreInvalidateCachesAsync(cancellationToken)).ConfigureAwait(false);
}
}

private async Task InnerExecuteAsync(IExecutable executable, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (executable.PropertySpaces != null)
{
executedSpaces.UnionWith(executable.PropertySpaces);
}
try
{
await (executable.ExecuteAsync(cancellationToken)).ConfigureAwait(false);
}
finally
{
if (executable.PropertySpaces != null)
{
executedSpaces.UnionWith(executable.PropertySpaces);
}
RegisterCleanupActions(executable);
}
}
Expand All @@ -96,7 +103,7 @@ public async Task ExecuteInsertsAsync(CancellationToken cancellationToken)
}
finally
{
await (AfterExecutionsAsync(cancellationToken)).ConfigureAwait(false);
await (PreInvalidateCachesAsync(cancellationToken)).ConfigureAwait(false);
}
}

Expand All @@ -118,11 +125,11 @@ public async Task ExecuteActionsAsync(CancellationToken cancellationToken)
}
finally
{
await (AfterExecutionsAsync(cancellationToken)).ConfigureAwait(false);
await (PreInvalidateCachesAsync(cancellationToken)).ConfigureAwait(false);
}
}

private async Task PrepareActionsAsync(IList queue, CancellationToken cancellationToken)
private static async Task PrepareActionsAsync(IList queue, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IExecutable executable in queue)
Expand All @@ -140,7 +147,7 @@ public async Task PrepareActionsAsync(CancellationToken cancellationToken)
await (PrepareActionsAsync(collectionUpdates, cancellationToken)).ConfigureAwait(false);
await (PrepareActionsAsync(collectionCreations, cancellationToken)).ConfigureAwait(false);
}

/// <summary>
/// Performs cleanup of any held cache softlocks.
/// </summary>
Expand All @@ -152,41 +159,27 @@ public Task AfterTransactionCompletionAsync(bool success, CancellationToken canc
{
return Task.FromCanceled<object>(cancellationToken);
}
return afterTransactionProcesses.AfterTransactionCompletionAsync(success, cancellationToken);
try
{
afterTransactionProcesses.AfterTransactionCompletion(success);

return InvalidateCachesAsync(cancellationToken);
}
catch (Exception ex)
{
return Task.FromException<object>(ex);
}
}
private partial class AfterTransactionCompletionProcessQueue

private async Task InvalidateCachesAsync(CancellationToken cancellationToken)
{

public async Task AfterTransactionCompletionAsync(bool success, CancellationToken cancellationToken)
cancellationToken.ThrowIfCancellationRequested();
if (session.Factory.Settings.IsQueryCacheEnabled)
{
cancellationToken.ThrowIfCancellationRequested();
int size = processes.Count;

for (int i = 0; i < size; i++)
{
try
{
AfterTransactionCompletionProcessDelegate process = processes[i];
process(success);
}
catch (CacheException e)
{
log.Error( "could not release a cache lock", e);
// continue loop
}
catch (Exception e)
{
throw new AssertionFailure("Unable to perform AfterTransactionCompletion callback", e);
}
}
processes.Clear();

if (session.Factory.Settings.IsQueryCacheEnabled)
{
await (session.Factory.UpdateTimestampsCache.InvalidateAsync(querySpacesToInvalidate.ToArray(), cancellationToken)).ConfigureAwait(false);
}
querySpacesToInvalidate.Clear();
await (session.Factory.UpdateTimestampsCache.InvalidateAsync(executedSpaces, cancellationToken)).ConfigureAwait(false);
}

executedSpaces.Clear();
}
}
}
25 changes: 18 additions & 7 deletions src/NHibernate/Cache/UpdateTimestampsCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,40 @@ public UpdateTimestampsCache(Settings settings, IDictionary<string, string> prop
updateTimestamps = settings.CacheProvider.BuildCache(regionName, props);
}

public void PreInvalidate(object[] spaces)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make obsolete?

{
PreInvalidate((IReadOnlyList<object>) spaces);
}

[MethodImpl(MethodImplOptions.Synchronized)]
public virtual void PreInvalidate(object[] spaces)
public virtual void PreInvalidate(IReadOnlyCollection<object> spaces)
{
//TODO: to handle concurrent writes correctly, this should return a Lock to the client
long ts = updateTimestamps.NextTimestamp() + updateTimestamps.Timeout;
for (int i = 0; i < spaces.Length; i++)
foreach (var space in spaces)
{
updateTimestamps.Put(spaces[i], ts);
updateTimestamps.Put(space, ts);
}

//TODO: return new Lock(ts);
}

/// <summary></summary>
public void Invalidate(object[] spaces)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make obsolete?

{
Invalidate((IReadOnlyList<object>) spaces);
}

[MethodImpl(MethodImplOptions.Synchronized)]
public virtual void Invalidate(object[] spaces)
public virtual void Invalidate(IReadOnlyCollection<object> spaces)
{
//TODO: to handle concurrent writes correctly, the client should pass in a Lock
long ts = updateTimestamps.NextTimestamp();
//TODO: if lock.getTimestamp().equals(ts)
for (int i = 0; i < spaces.Length; i++)
foreach (var space in spaces)
{
log.Debug(string.Format("Invalidating space [{0}]", spaces[i]));
updateTimestamps.Put(spaces[i], ts);
log.Debug(string.Format("Invalidating space [{0}]", space));
updateTimestamps.Put(space, ts);
}
}

Expand Down
Loading