Skip to content

deal with exceptions in asynchronous API tasks #3855

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jan 1, 2022
112 changes: 99 additions & 13 deletions opengrok-web/src/main/java/org/opengrok/web/api/ApiTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
import jakarta.ws.rs.core.Response;
import org.opengrok.indexer.logger.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -36,7 +41,7 @@ public class ApiTask {

private static final Logger LOGGER = LoggerFactory.getLogger(ApiTask.class);

private final Runnable runnable;
private final Callable<Object> callable;

enum ApiTaskState {
INITIAL,
Expand All @@ -50,57 +55,137 @@ enum ApiTaskState {

private final Response.Status responseStatus;

private Future<Object> future;

private final Map<Class<?>, Response.Status> exceptionStatusMap = new HashMap<>();

/**
* @param path request path (for identification)
* @param runnable Runnable object
* @param callable Callable object
*/
public ApiTask(String path, Runnable runnable) {
this(path, runnable, Response.Status.OK);
public ApiTask(String path, Callable<Object> callable) {
this(path, callable, Response.Status.OK);
}

/**
* @param path request path (for identification)
* @param runnable Runnable object
* @param callable Callable object
* @param status request status to return after the runnable is done
*/
public ApiTask(String path, Runnable runnable, Response.Status status) {
this.runnable = runnable;
public ApiTask(String path, Callable<Object> callable, Response.Status status) {
this(path, callable, status, null);
}

/**
* @param path request path (for identification)
* @param callable Callable object
* @param status request status to return after the runnable is done
* @param exceptionStatusMap map of {@link Exception} to {@link Response.Status}
*/
public ApiTask(String path, Callable<Object> callable, Response.Status status,
Map<Class<?>, Response.Status> exceptionStatusMap) {
this.callable = callable;
this.uuid = UUID.randomUUID();
this.responseStatus = status;
this.path = path;
this.state = ApiTaskState.INITIAL;
if (exceptionStatusMap != null) {
this.exceptionStatusMap.putAll(exceptionStatusMap);
}
}

/**
* The UUID is randomly generated in the constructor.
* @return UUID
*/
public UUID getUuid() {
return uuid;
}

/**
* @return response status
* @return response status to be used when the task was successfully completed
*/
public Response.Status getResponseStatus() {
Response.Status getResponseStatus() {
return responseStatus;
}

/**
* Set status as submitted.
*/
public void setSubmitted() {
void setSubmitted() {
state = ApiTaskState.SUBMITTED;
}

/**
* @return whether the API task successfully completed
*/
public boolean isCompleted() {
return state.equals(ApiTaskState.COMPLETED);
}

public void setCompleted() {
void setCompleted() {
state = ApiTaskState.COMPLETED;
}

/**
* @param future Future object used for tracking the progress of the API task
*/
void setFuture(Future<Object> future) {
this.future = future;
}

/**
* @return whether the task is finished (normally or with exception)
*/
public boolean isDone() {
if (future != null) {
return future.isDone();
} else {
return false;
}
}

/**
* Provides simple Exception to status code mapping. The Exception match is exact, i.e. exception class hierarchy
* is not considered.
* @param exception Exception
* @return Response status
*/
private Response.Status mapExceptionToStatus(ExecutionException exception) {
return exceptionStatusMap.getOrDefault(exception.getCause().getClass(), Response.Status.INTERNAL_SERVER_ERROR);
}

/**
* This method assumes that the API task is finished.
* @return response object corresponding to the state of the API task
* @throws IllegalStateException if the API task is not finished
*/
public Response getResponse() {
// Avoid thread being blocked in future.get() below.
if (!isDone()) {
throw new IllegalStateException(String.format("task %s not yet done", this));
}

Object obj;
try {
obj = future.get();
} catch (InterruptedException ex) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
} catch (ExecutionException ex) {
return Response.status(mapExceptionToStatus(ex)).entity(ex.toString()).build();
}

if (obj != null) {
return Response.status(getResponseStatus()).entity(obj.toString()).build();
}

return Response.status(getResponseStatus()).build();
}

/**
* @return Runnable object that contains the work that needs to be completed for this API request
*/
public Runnable getRunnable() {
public Callable<Object> getCallable() {
synchronized (this) {
if (state.equals(ApiTaskState.SUBMITTED)) {
throw new IllegalStateException(String.format("API task %s already submitted", this));
Expand All @@ -109,9 +194,10 @@ public Runnable getRunnable() {
return () -> {
LOGGER.log(Level.FINE, "API task {0} started", this);
setSubmitted();
runnable.run();
Object ret = callable.call();
setCompleted();
LOGGER.log(Level.FINE, "API task {0} done", this);
return ret;
};
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Response submitApiTask(String name, ApiTask apiTask) {
return Response.status(Response.Status.BAD_REQUEST).build();
}

queues.get(queueName).submit(apiTask.getRunnable());
apiTask.setFuture(queues.get(queueName).submit(apiTask.getCallable()));
apiTasks.put(apiTask.getUuid(), apiTask);

return Response.status(Response.Status.ACCEPTED).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public Response set(@Context HttpServletRequest request,
new ApiTask(request.getRequestURI(), () -> {
env.applyConfig(body, reindex, CommandTimeoutType.RESTFUL);
suggesterService.refresh();
return null;
}));
}

Expand All @@ -102,6 +103,7 @@ public Response setField(@Context HttpServletRequest request,
// apply the configuration - let the environment reload the configuration if necessary
env.applyConfig(false, CommandTimeoutType.RESTFUL);
suggesterService.refresh();
return null;
}));
}

Expand All @@ -110,7 +112,11 @@ public Response setField(@Context HttpServletRequest request,
public Response reloadAuthorization(@Context HttpServletRequest request) {
return ApiTaskManager.getInstance().submitApiTask("authorization",
new ApiTask(request.getRequestURI(),
() -> env.getAuthorizationFramework().reload(), Response.Status.NO_CONTENT));
() -> {
env.getAuthorizationFramework().reload();
return null;
},
Response.Status.NO_CONTENT));
}

private Object getConfigurationValueException(String fieldName) throws WebApplicationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ public Response deleteProject(@Context HttpServletRequest request, @PathParam("p

return ApiTaskManager.getInstance().submitApiTask(PROJECTS_PATH,
new ApiTask(request.getRequestURI(),
() -> deleteProjectWorkHorse(projectName, project), Response.Status.NO_CONTENT));
() -> {
deleteProjectWorkHorse(projectName, project);
return null;
},
Response.Status.NO_CONTENT));
}

private void deleteProjectWorkHorse(String projectName, Project project) {
Expand Down Expand Up @@ -214,7 +218,11 @@ public Response deleteProjectData(@Context HttpServletRequest request,
disableProject(projectName);

return ApiTaskManager.getInstance().submitApiTask(PROJECTS_PATH,
new ApiTask(request.getRequestURI(), () -> deleteProjectDataWorkHorse(projectName)));
new ApiTask(request.getRequestURI(),
() -> {
deleteProjectDataWorkHorse(projectName);
return null;
}));
}

private void deleteProjectDataWorkHorse(String projectName) {
Expand Down Expand Up @@ -249,7 +257,11 @@ public Response deleteHistoryCache(@Context HttpServletRequest request,
final String projectName = Laundromat.launderInput(projectNameParam);

return ApiTaskManager.getInstance().submitApiTask(PROJECTS_PATH,
new ApiTask(request.getRequestURI(), () -> deleteHistoryCacheWorkHorse(projectName)));
new ApiTask(request.getRequestURI(),
() -> {
deleteHistoryCacheWorkHorse(projectName);
return null;
}));
}

private void deleteHistoryCacheWorkHorse(String projectName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public Response getStatus(@PathParam("uuid") String uuid) {
return Response.status(Response.Status.NOT_FOUND).build();
}

if (apiTask.isCompleted()) {
return Response.status(apiTask.getResponseStatus()).build();
if (apiTask.isDone()) {
return apiTask.getResponse();
} else {
return Response.status(Response.Status.ACCEPTED).build();
}
Expand All @@ -70,8 +70,8 @@ public Response delete(@PathParam("uuid") String uuid) {
return Response.status(Response.Status.NOT_FOUND).build();
}

if (!apiTask.isCompleted()) {
LOGGER.log(Level.WARNING, "API task {0} not yet completed", apiTask);
if (!apiTask.isDone()) {
LOGGER.log(Level.WARNING, "API task {0} not yet done", apiTask);
return Response.status(Response.Status.BAD_REQUEST).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import jakarta.ws.rs.core.Response;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand All @@ -49,7 +52,8 @@ void testQueueName() {
assertEquals(name.substring(1), ApiTaskManager.getQueueName(name));
}

private void doNothing() {
private Object doNothing() {
return null;
}

@Test
Expand Down Expand Up @@ -77,6 +81,54 @@ void testTaskSubmitDelete() {
assertNull(apiTaskManager.getApiTask(uuidString));
}

@Test
void taskSubmitCallableWithException() {
ApiTaskManager apiTaskManager = ApiTaskManager.getInstance();
String name = "exception";
apiTaskManager.addPool(name, 1);
ApiTask apiTask = new ApiTask("foo",
() -> {
throw new Exception("foo");
});
apiTaskManager.submitApiTask(name, apiTask);
await().atMost(3, TimeUnit.SECONDS).until(apiTask::isDone);
assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), apiTask.getResponse().getStatus());
}

@Test
void taskSubmitCallableWithExceptionMapping() {
ApiTaskManager apiTaskManager = ApiTaskManager.getInstance();
String name = "exceptionMap";
apiTaskManager.addPool(name, 1);
final String exceptionText = "exception text";
ApiTask apiTask = new ApiTask("foo",
() -> {
throw new IllegalStateException(exceptionText);
},
Response.Status.NO_CONTENT,
Map.of(IllegalStateException.class, Response.Status.NOT_ACCEPTABLE));
apiTaskManager.submitApiTask(name, apiTask);
await().atMost(3, TimeUnit.SECONDS).until(apiTask::isDone);
Response response = apiTask.getResponse();
assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(), response.getStatus());
assertTrue(response.getEntity().toString().contains(exceptionText));
}

@Test
void testCallable() {
ApiTaskManager apiTaskManager = ApiTaskManager.getInstance();
String name = "payload";
apiTaskManager.addPool(name, 1);
final String payloadText = "payload text";
ApiTask apiTask = new ApiTask("payload", () -> payloadText);
apiTaskManager.submitApiTask(name, apiTask);
await().atMost(3, TimeUnit.SECONDS).until(apiTask::isDone);
Response response = apiTask.getResponse();
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
assertNotNull(response.getEntity());
assertTrue(response.getEntity().toString().contains(payloadText));
}

@Test
void testTaskInvalidUuid() {
ApiTaskManager apiTaskManager = ApiTaskManager.getInstance();
Expand Down
Loading