18
18
import static org .neo4j .cypherdsl .core .Cypher .asterisk ;
19
19
import static org .neo4j .cypherdsl .core .Cypher .parameter ;
20
20
21
- import org .neo4j .cypherdsl .core .Cypher ;
22
21
import reactor .core .publisher .Flux ;
23
22
import reactor .core .publisher .Mono ;
24
23
import reactor .util .function .Tuple2 ;
36
35
import org .apache .commons .logging .LogFactory ;
37
36
import org .apiguardian .api .API ;
38
37
import org .neo4j .cypherdsl .core .Condition ;
38
+ import org .neo4j .cypherdsl .core .Cypher ;
39
39
import org .neo4j .cypherdsl .core .Functions ;
40
40
import org .neo4j .cypherdsl .core .Statement ;
41
41
import org .neo4j .cypherdsl .core .renderer .Renderer ;
@@ -223,7 +223,8 @@ public <T> Mono<T> save(T instance) {
223
223
private <T > Mono <T > saveImpl (T instance , @ Nullable String inDatabase ) {
224
224
225
225
Neo4jPersistentEntity entityMetaData = neo4jMappingContext .getPersistentEntity (instance .getClass ());
226
- return Mono .just (instance ).flatMap (eventSupport ::maybeCallBeforeBind )
226
+ return Mono .just (entityMetaData .isNew (instance ))
227
+ .flatMap (isNewEntity -> Mono .just (instance ).flatMap (eventSupport ::maybeCallBeforeBind )
227
228
.flatMap (entity -> determineDynamicLabels (entity , entityMetaData , inDatabase )).flatMap (t -> {
228
229
T entity = t .getT1 ();
229
230
DynamicLabels dynamicLabels = t .getT2 ();
@@ -240,17 +241,19 @@ private <T> Mono<T> saveImpl(T instance, @Nullable String inDatabase) {
240
241
}));
241
242
242
243
if (!entityMetaData .isUsingInternalIds ()) {
243
- return idMono .then (processRelations (entityMetaData , entity , inDatabase )).thenReturn (entity );
244
+ return idMono .then (processRelations (entityMetaData , entity , isNewEntity , inDatabase ))
245
+ .thenReturn (entity );
244
246
} else {
245
247
return idMono .map (internalId -> {
246
248
PersistentPropertyAccessor <T > propertyAccessor = entityMetaData .getPropertyAccessor (entity );
247
249
propertyAccessor .setProperty (entityMetaData .getRequiredIdProperty (), internalId );
248
250
249
251
return propertyAccessor .getBean ();
250
252
}).flatMap (
251
- savedEntity -> processRelations (entityMetaData , savedEntity , inDatabase ).thenReturn (savedEntity ));
253
+ savedEntity -> processRelations (entityMetaData , savedEntity , isNewEntity , inDatabase )
254
+ .thenReturn (savedEntity ));
252
255
}
253
- });
256
+ })) ;
254
257
}
255
258
256
259
private <T > Mono <Tuple2 <T , DynamicLabels >> determineDynamicLabels (T entityToBeSaved ,
@@ -302,12 +305,12 @@ public <T> Flux<T> saveAll(Iterable<T> instances) {
302
305
}
303
306
304
307
Function <T , Map <String , Object >> binderFunction = neo4jMappingContext .getRequiredBinderFunctionFor (domainClass );
308
+ String isNewIndicatorKey = "isNewIndicator" ;
305
309
return getDatabaseName ().flatMapMany (databaseName -> Flux .fromIterable (entities )
306
310
.flatMap (eventSupport ::maybeCallBeforeBind ).collectList ().flatMapMany (entitiesToBeSaved -> Mono .defer (() -> {
307
311
// Defer the actual save statement until the previous flux completes
308
312
List <Map <String , Object >> boundedEntityList = entitiesToBeSaved .stream ().map (binderFunction )
309
313
.collect (Collectors .toList ());
310
-
311
314
return neo4jClient
312
315
.query (() -> renderer .render (cypherGenerator .prepareSaveOfMultipleInstancesOf (entityMetaData )))
313
316
.in (databaseName .getValue ()).bind (boundedEntityList ).to (Constants .NAME_OF_ENTITY_LIST_PARAM ).run ();
@@ -317,7 +320,23 @@ public <T> Flux<T> saveAll(Iterable<T> instances) {
317
320
"Created %d and deleted %d nodes, created %d and deleted %d relationships and set %d properties." ,
318
321
counters .nodesCreated (), counters .nodesDeleted (), counters .relationshipsCreated (),
319
322
counters .relationshipsDeleted (), counters .propertiesSet ()));
320
- }).thenMany (Flux .fromIterable (entitiesToBeSaved ))));
323
+ }).thenMany (
324
+ Flux .deferContextual (ctx -> {
325
+ List <Boolean > isNewIndicator = ctx .get (isNewIndicatorKey );
326
+ return Flux .fromIterable (entitiesToBeSaved )
327
+ .index ()
328
+ .flatMap (t -> {
329
+ T entityToBeSaved = t .getT2 ();
330
+ boolean isNew = isNewIndicator .get (Math .toIntExact (t .getT1 ()));
331
+ return processRelations (entityMetaData , entityToBeSaved , isNew ,
332
+ databaseName .getValue ())
333
+ .then (Mono .just (entityToBeSaved ));
334
+ }
335
+ );
336
+ })
337
+ )))
338
+ .contextWrite (ctx -> ctx .put (isNewIndicatorKey , entities .stream ()
339
+ .map (entity -> entityMetaData .isNew (entity )).collect (Collectors .toList ())));
321
340
}
322
341
323
342
@ Override
@@ -414,14 +433,14 @@ private <T> Mono<ExecutableQuery<T>> createExecutableQuery(Class<T> domainType,
414
433
}
415
434
416
435
private Mono <Void > processRelations (Neo4jPersistentEntity <?> neo4jPersistentEntity , Object parentObject ,
417
- @ Nullable String inDatabase ) {
436
+ boolean isParentObjectNew , @ Nullable String inDatabase ) {
418
437
419
- return processNestedRelations (neo4jPersistentEntity , parentObject , inDatabase ,
438
+ return processNestedRelations (neo4jPersistentEntity , parentObject , isParentObjectNew , inDatabase ,
420
439
new NestedRelationshipProcessingStateMachine ());
421
440
}
422
441
423
442
private Mono <Void > processNestedRelations (Neo4jPersistentEntity <?> neo4jPersistentEntity , Object parentObject ,
424
- @ Nullable String inDatabase , NestedRelationshipProcessingStateMachine stateMachine ) {
443
+ boolean isParentObjectNew , @ Nullable String inDatabase , NestedRelationshipProcessingStateMachine stateMachine ) {
425
444
426
445
return Mono .defer (() -> {
427
446
PersistentPropertyAccessor <?> propertyAccessor = neo4jPersistentEntity .getPropertyAccessor (parentObject );
@@ -448,7 +467,7 @@ private Mono<Void> processNestedRelations(Neo4jPersistentEntity<?> neo4jPersiste
448
467
449
468
// remove all relationships before creating all new if the entity is not new
450
469
// this avoids the usage of cache but might have significant impact on overall performance
451
- if (!neo4jPersistentEntity . isNew ( parentObject ) ) {
470
+ if (!isParentObjectNew ) {
452
471
Neo4jPersistentEntity <?> previouslyRelatedPersistentEntity = neo4jMappingContext
453
472
.getPersistentEntity (relationshipContext .getAssociationTargetType ());
454
473
@@ -475,7 +494,8 @@ private Mono<Void> processNestedRelations(Neo4jPersistentEntity<?> neo4jPersiste
475
494
.flatMap (valueToBeSaved -> {
476
495
Neo4jPersistentEntity <?> targetNodeDescription = neo4jMappingContext
477
496
.getPersistentEntity (valueToBeSavedPreEvt .getClass ());
478
- return saveRelatedNode (valueToBeSaved , relationshipContext .getAssociationTargetType (),
497
+ return Mono .just (targetNodeDescription .isNew (valueToBeSaved )).flatMap (isNew ->
498
+ saveRelatedNode (valueToBeSaved , relationshipContext .getAssociationTargetType (),
479
499
targetNodeDescription , inDatabase ).flatMap (relatedInternalId -> {
480
500
481
501
// if an internal id is used this must get set to link this entity in the next iteration
@@ -498,11 +518,12 @@ private Mono<Void> processNestedRelations(Neo4jPersistentEntity<?> neo4jPersiste
498
518
499
519
if (processState != ProcessState .PROCESSED_ALL_VALUES ) {
500
520
return relationshipCreationMonoNested .checkpoint ().then (
501
- processNestedRelations (targetNodeDescription , valueToBeSaved , inDatabase , stateMachine ));
521
+ processNestedRelations (targetNodeDescription , valueToBeSaved ,
522
+ isNew , inDatabase , stateMachine ));
502
523
} else {
503
524
return relationshipCreationMonoNested .checkpoint ().then ();
504
525
}
505
- }).checkpoint ();
526
+ }).checkpoint ()) ;
506
527
});
507
528
relationshipCreationMonos .add (createRelationship );
508
529
}
0 commit comments