Skip to content

[DE-592] resume AQL cursor in transaction #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions core/src/main/java/com/arangodb/ArangoDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,19 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type);

/**
* Return an cursor from the given cursor-ID if still existing
*
* @param cursorId The ID of the cursor
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
* @param options options
* @return cursor of the results
* @see <a href=
* "https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-the-next-batch-from-a-cursor">API
* Documentation</a>
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, AqlQueryOptions options);

/**
* Return an cursor from the given cursor-ID if still existing
*
Expand All @@ -327,6 +340,20 @@ public interface ArangoDatabase extends ArangoSerdeAccessor {
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId);

/**
* Return an cursor from the given cursor-ID if still existing
*
* @param cursorId The ID of the cursor
* @param type The type of the result (POJO or {@link com.arangodb.util.RawData})
* @param nextBatchId The ID of the next cursor batch (set only if cursor allows retries, see
* {@link AqlQueryOptions#allowRetry(Boolean)}
* @param options options
* @return cursor of the results
* @see <a href= "https://docs.arangodb.com/stable/develop/http-api/queries/aql-queries/#read-the-next-batch-from-a-cursor">API Documentation</a>
* @since ArangoDB 3.11
*/
<T> ArangoCursor<T> cursor(String cursorId, Class<T> type, String nextBatchId, AqlQueryOptions options);

/**
* Explain an AQL query and return information about it
*
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/com/arangodb/ArangoDatabaseAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ public interface ArangoDatabaseAsync extends ArangoSerdeAccessor {

<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type);

<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, AqlQueryOptions options);

<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId);

<T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId, AqlQueryOptions options);

/**
* Asynchronous version of {@link ArangoDatabase#explainQuery(String, Map, AqlQueryExplainOptions)}
*/
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/com/arangodb/entity/CursorEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

/**
* @return the total number of result documents available (only available if the query was executed with the count
* attribute set)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,25 @@ public <T> CompletableFuture<ArangoCursorAsync<T>> query(String query, Class<T>

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type) {
return cursor(cursorId, type, null);
return cursor(cursorId, type, null, new AqlQueryOptions());
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, AqlQueryOptions options) {
return cursor(cursorId, type, null, options);
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
return cursor(cursorId, type, nextBatchId, new AqlQueryOptions());
}

@Override
public <T> CompletableFuture<ArangoCursorAsync<T>> cursor(String cursorId, Class<T> type, String nextBatchId, AqlQueryOptions options) {
options.allowRetry(nextBatchId != null);
HostHandle hostHandle = new HostHandle();
return executorAsync()
.execute(() ->
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
.execute(() -> queryNextRequest(cursorId, options, nextBatchId),
cursorEntityDeserializer(type),
hostHandle)
.thenApply(res -> new ArangoCursorAsyncImpl<>(this, res, type, hostHandle, nextBatchId != null));
Expand Down
21 changes: 16 additions & 5 deletions core/src/main/java/com/arangodb/internal/ArangoDatabaseImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,28 @@ public <T> ArangoCursor<T> query(final String query, final Class<T> type) {

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type) {
return cursor(cursorId, type, null);
return cursor(cursorId, type, null, new AqlQueryOptions());
}

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final AqlQueryOptions options) {
return cursor(cursorId, type, null, options);
}

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId) {
final HostHandle hostHandle = new HostHandle();
final CursorEntity<T> result = executorSync().execute(
queryNextRequest(cursorId, new AqlQueryOptions(), nextBatchId),
return cursor(cursorId, type, nextBatchId, new AqlQueryOptions());
}

@Override
public <T> ArangoCursor<T> cursor(final String cursorId, final Class<T> type, final String nextBatchId, final AqlQueryOptions options) {
options.allowRetry(nextBatchId != null);
HostHandle hostHandle = new HostHandle();
CursorEntity<T> result = executorSync().execute(
queryNextRequest(cursorId, options, nextBatchId),
cursorEntityDeserializer(type),
hostHandle);
return createCursor(result, type, null, hostHandle);
return createCursor(result, type, options, hostHandle);
}

private <T> ArangoCursor<T> createCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ArangoCursorAsyncImpl<T> extends InternalArangoCursor<T> implements

private final ArangoDatabaseAsyncImpl db;
private final HostHandle hostHandle;
private final CursorEntity<T> entity;

public ArangoCursorAsyncImpl(
final ArangoDatabaseAsyncImpl db,
Expand All @@ -28,13 +29,18 @@ public ArangoCursorAsyncImpl(
super(db, db.name(), entity, type, allowRetry);
this.db = db;
this.hostHandle = hostHandle;
this.entity = entity;
}

@Override
public CompletableFuture<ArangoCursorAsync<T>> nextBatch() {
if (Boolean.TRUE.equals(hasMore())) {
return executorAsync().execute(this::queryNextRequest, db.cursorEntityDeserializer(getType()), hostHandle)
.thenApply(r -> new ArangoCursorAsyncImpl<>(db, r, getType(), hostHandle, allowRetry()));
.thenApply(r -> {
// needed because the latest batch does not return the cursor id
r.setId(entity.getId());
return new ArangoCursorAsyncImpl<>(db, r, getType(), hostHandle, allowRetry());
});
} else {
CompletableFuture<ArangoCursorAsync<T>> cf = new CompletableFuture<>();
cf.completeExceptionally(new NoSuchElementException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,26 @@ void queryCursor(ArangoDatabaseAsync db) throws ExecutionException, InterruptedE
assertThat(result).containsExactly(1, 2, 3, 4);
}

@ParameterizedTest
@MethodSource("asyncDbs")
void queryCursorInTx(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()).get();
ArangoCursorAsync<Integer> c1 = db.query("for i in 1..4 return i", Integer.class,
new AqlQueryOptions().batchSize(1).streamTransactionId(tx.getId())).get();
List<Integer> result = new ArrayList<>();
result.addAll(c1.getResult());
ArangoCursorAsync<Integer> c2 = c1.nextBatch().get();
result.addAll(c2.getResult());
ArangoCursorAsync<Integer> c3 = db.cursor(c2.getId(), Integer.class,
new AqlQueryOptions().streamTransactionId(tx.getId())).get();
result.addAll(c3.getResult());
ArangoCursorAsync<Integer> c4 = c3.nextBatch().get();
result.addAll(c4.getResult());
assertThat(c4.hasMore()).isFalse();
assertThat(result).containsExactly(1, 2, 3, 4);
db.abortStreamTransaction(tx.getId()).get();
}

@ParameterizedTest
@MethodSource("asyncDbs")
void queryCursorRetry(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
Expand All @@ -797,6 +817,28 @@ void queryCursorRetry(ArangoDatabaseAsync db) throws ExecutionException, Interru
assertThat(result).containsExactly(1, 2, 3, 4);
}

@ParameterizedTest
@MethodSource("asyncDbs")
void queryCursorRetryInTx(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
assumeTrue(isAtLeastVersion(3, 11));
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions()).get();
ArangoCursorAsync<Integer> c1 = db.query("for i in 1..4 return i", Integer.class,
new AqlQueryOptions().batchSize(1).allowRetry(true).streamTransactionId(tx.getId())).get();
List<Integer> result = new ArrayList<>();
result.addAll(c1.getResult());
ArangoCursorAsync<Integer> c2 = c1.nextBatch().get();
result.addAll(c2.getResult());
ArangoCursorAsync<Integer> c3 = db.cursor(c2.getId(), Integer.class, c2.getNextBatchId(),
new AqlQueryOptions().streamTransactionId(tx.getId())).get();
result.addAll(c3.getResult());
ArangoCursorAsync<Integer> c4 = c3.nextBatch().get();
result.addAll(c4.getResult());
c4.close();
assertThat(c4.hasMore()).isFalse();
assertThat(result).containsExactly(1, 2, 3, 4);
db.abortStreamTransaction(tx.getId()).get();
}

@ParameterizedTest
@MethodSource("asyncDbs")
void changeQueryTrackingProperties(ArangoDatabaseAsync db) throws ExecutionException, InterruptedException {
Expand Down
40 changes: 40 additions & 0 deletions test-functional/src/test/java/com/arangodb/ArangoDatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,25 @@ void queryCursor(ArangoDatabase db) {
assertThat(result).containsExactly(1, 2, 3, 4);
}

@ParameterizedTest
@MethodSource("dbs")
void queryCursorInTx(ArangoDatabase db) {
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions());
ArangoCursor<Integer> cursor = db.query("for i in 1..4 return i", Integer.class,
new AqlQueryOptions().batchSize(1).streamTransactionId(tx.getId()));
List<Integer> result = new ArrayList<>();
result.add(cursor.next());
result.add(cursor.next());
ArangoCursor<Integer> cursor2 = db.cursor(cursor.getId(), Integer.class,
new AqlQueryOptions().streamTransactionId(tx.getId())
);
result.add(cursor2.next());
result.add(cursor2.next());
assertThat(cursor2.hasNext()).isFalse();
assertThat(result).containsExactly(1, 2, 3, 4);
db.abortStreamTransaction(tx.getId());
}

@ParameterizedTest
@MethodSource("dbs")
void queryCursorRetry(ArangoDatabase db) throws IOException {
Expand All @@ -865,6 +884,27 @@ void queryCursorRetry(ArangoDatabase db) throws IOException {
assertThat(result).containsExactly(1, 2, 3, 4);
}

@ParameterizedTest
@MethodSource("dbs")
void queryCursorRetryInTx(ArangoDatabase db) throws IOException {
assumeTrue(isAtLeastVersion(3, 11));
StreamTransactionEntity tx = db.beginStreamTransaction(new StreamTransactionOptions());
ArangoCursor<Integer> cursor = db.query("for i in 1..4 return i", Integer.class,
new AqlQueryOptions().batchSize(1).allowRetry(true).streamTransactionId(tx.getId()));
List<Integer> result = new ArrayList<>();
result.add(cursor.next());
result.add(cursor.next());
ArangoCursor<Integer> cursor2 = db.cursor(cursor.getId(), Integer.class, cursor.getNextBatchId(),
new AqlQueryOptions().streamTransactionId(tx.getId())
);
result.add(cursor2.next());
result.add(cursor2.next());
cursor2.close();
assertThat(cursor2.hasNext()).isFalse();
assertThat(result).containsExactly(1, 2, 3, 4);
db.abortStreamTransaction(tx.getId());
}

@ParameterizedTest
@MethodSource("dbs")
void changeQueryTrackingProperties(ArangoDatabase db) {
Expand Down