Skip to content

Commit f7ced5b

Browse files
vkryachkorlazo
andauthored
Limited concurrency/pausable executors (#4459)
* Prototype: limited pausable executors * Update * Add copyrights * fix race * Apply suggestions from code review Co-authored-by: Rodrigo Lazo <[email protected]> * Address review comments. * fix --------- Co-authored-by: Rodrigo Lazo <[email protected]>
1 parent 5c11b8a commit f7ced5b

13 files changed

+769
-1
lines changed

firebase-common/src/main/java/com/google/firebase/concurrent/ExecutorsRegistrar.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class ExecutorsRegistrar implements ComponentRegistrar {
6464
Process.THREAD_PRIORITY_BACKGROUND
6565
+ Process.THREAD_PRIORITY_LESS_FAVORABLE))));
6666

67-
private static final Lazy<ScheduledExecutorService> SCHEDULER =
67+
static final Lazy<ScheduledExecutorService> SCHEDULER =
6868
new Lazy<>(
6969
() ->
7070
Executors.newSingleThreadScheduledExecutor(

firebase-common/src/main/java/com/google/firebase/concurrent/FirebaseExecutors.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package com.google.firebase.concurrent;
1616

1717
import java.util.concurrent.Executor;
18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.ScheduledExecutorService;
1820

1921
/** Provides commonly useful executors. */
2022
public class FirebaseExecutors {
@@ -33,6 +35,94 @@ public static Executor newSequentialExecutor(Executor delegate) {
3335
return new SequentialExecutor(delegate);
3436
}
3537

38+
/**
39+
* Returns an {@link Executor} that limits the number of running tasks at a given time.
40+
*
41+
* <p>The executor uses the {@code delegate} in order to {@link Executor#execute(Runnable)
42+
* execute} each task, and does not create any threads of its own.
43+
*
44+
* @param delegate {@link Executor} used to execute tasks
45+
* @param concurrency max number of tasks executing concurrently
46+
*/
47+
public static Executor newLimitedConcurrencyExecutor(Executor delegate, int concurrency) {
48+
return new LimitedConcurrencyExecutor(delegate, concurrency);
49+
}
50+
51+
/**
52+
* Returns a {@link ExecutorService} that limits the number of running tasks at a given time.
53+
*
54+
* <p>The executor uses delegate in order to {@link Executor#execute(Runnable) execute} each task,
55+
* and does not create any threads of its own.
56+
*
57+
* @param delegate {@link ExecutorService} used to execute tasks
58+
* @param concurrency max number of tasks executing concurrently
59+
*/
60+
public static ExecutorService newLimitedConcurrencyExecutorService(
61+
ExecutorService delegate, int concurrency) {
62+
return new LimitedConcurrencyExecutorService(delegate, concurrency);
63+
}
64+
65+
/**
66+
* Returns a {@link ScheduledExecutorService} that limits the number of running tasks at a given
67+
* time.
68+
*
69+
* <p>The executor uses delegate in order to {@link Executor#execute(Runnable) execute} each task,
70+
* and does not create any threads of its own.
71+
*
72+
* @param delegate {@link ExecutorService} used to execute tasks
73+
* @param concurrency max number of tasks executing concurrently
74+
*/
75+
public static ScheduledExecutorService newLimitedConcurrencyScheduledExecutorService(
76+
ExecutorService delegate, int concurrency) {
77+
return new DelegatingScheduledExecutorService(
78+
newLimitedConcurrencyExecutorService(delegate, concurrency),
79+
ExecutorsRegistrar.SCHEDULER.get());
80+
}
81+
82+
/**
83+
* Returns a {@link PausableExecutor }.
84+
*
85+
* <p>The executor does not create any threads of its own and instead delegates to the {@code
86+
* delegate} executor.
87+
*
88+
* <p>While {@link PausableExecutor#pause() paused}, the executor queues tasks which are executed
89+
* when the executor is {@link PausableExecutor#resume() resumed}, tasks that are already being
90+
* executed will not be paused and will run to completion.
91+
*/
92+
public static PausableExecutor newPausableExecutor(Executor delegate) {
93+
return new PausableExecutorImpl(false, delegate);
94+
}
95+
96+
/**
97+
* Returns a {@link PausableExecutorService }.
98+
*
99+
* <p>The executor does not create any threads of its own and instead delegates to the {@code
100+
* delegate} executor.
101+
*
102+
* <p>While {@link PausableExecutor#pause() paused}, the executor queues tasks which are executed
103+
* when the executor is {@link PausableExecutor#resume() resumed}, tasks that are already being
104+
* executed will not be paused and will run to completion.
105+
*/
106+
public static PausableExecutorService newPausableExecutorService(ExecutorService delegate) {
107+
return new PausableExecutorServiceImpl(false, delegate);
108+
}
109+
110+
/**
111+
* Returns a {@link PausableScheduledExecutorService }.
112+
*
113+
* <p>The executor does not create any threads of its own and instead delegates to the {@code
114+
* delegate} executor.
115+
*
116+
* <p>While {@link PausableExecutor#pause() paused}, the executor queues tasks which are executed
117+
* when the executor is {@link PausableExecutor#resume() resumed}, tasks that are already being
118+
* executed will not be paused and will run to completion.
119+
*/
120+
public static PausableScheduledExecutorService newPausableScheduledExecutorService(
121+
ScheduledExecutorService delegate) {
122+
return new PausableScheduledExecutorServiceImpl(
123+
newPausableExecutorService(delegate), ExecutorsRegistrar.SCHEDULER.get());
124+
}
125+
36126
/** Returns a direct executor. */
37127
public static Executor directExecutor() {
38128
return DirectExecutor.INSTANCE;
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.concurrent;
16+
17+
import com.google.firebase.components.Preconditions;
18+
import java.util.concurrent.Executor;
19+
import java.util.concurrent.LinkedBlockingQueue;
20+
import java.util.concurrent.Semaphore;
21+
22+
/**
23+
* An {@link Executor}that limits the number of concurrent tasks executing at a given time.
24+
*
25+
* <p>Delegates actual execution to the {@code delegate} executor and does not create threads of it
26+
* own.
27+
*
28+
* <p>The executor is fair: has FIFO semantics for submitted tasks.
29+
*/
30+
class LimitedConcurrencyExecutor implements Executor {
31+
32+
private final Executor delegate;
33+
34+
private final Semaphore semaphore;
35+
36+
private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
37+
38+
LimitedConcurrencyExecutor(Executor delegate, int concurrency) {
39+
Preconditions.checkArgument(concurrency > 0, "concurrency must be positive.");
40+
this.delegate = delegate;
41+
semaphore = new Semaphore(concurrency, true);
42+
}
43+
44+
@Override
45+
public void execute(Runnable command) {
46+
queue.offer(command);
47+
maybeEnqueueNext();
48+
}
49+
50+
private void maybeEnqueueNext() {
51+
while (semaphore.tryAcquire()) {
52+
Runnable next = queue.poll();
53+
if (next != null) {
54+
delegate.execute(decorate(next));
55+
} else {
56+
semaphore.release();
57+
break;
58+
}
59+
}
60+
}
61+
62+
private Runnable decorate(Runnable command) {
63+
return () -> {
64+
try {
65+
command.run();
66+
} finally {
67+
semaphore.release();
68+
maybeEnqueueNext();
69+
}
70+
};
71+
}
72+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.concurrent;
16+
17+
import java.util.Collection;
18+
import java.util.List;
19+
import java.util.concurrent.Callable;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Future;
23+
import java.util.concurrent.FutureTask;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
27+
final class LimitedConcurrencyExecutorService extends LimitedConcurrencyExecutor
28+
implements ExecutorService {
29+
private final ExecutorService delegate;
30+
31+
LimitedConcurrencyExecutorService(ExecutorService delegate, int concurrency) {
32+
super(delegate, concurrency);
33+
this.delegate = delegate;
34+
}
35+
36+
@Override
37+
public void shutdown() {
38+
throw new UnsupportedOperationException("Shutting down is not allowed.");
39+
}
40+
41+
@Override
42+
public List<Runnable> shutdownNow() {
43+
throw new UnsupportedOperationException("Shutting down is not allowed.");
44+
}
45+
46+
@Override
47+
public boolean isShutdown() {
48+
return delegate.isShutdown();
49+
}
50+
51+
@Override
52+
public boolean isTerminated() {
53+
return delegate.isTerminated();
54+
}
55+
56+
@Override
57+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
58+
return delegate.awaitTermination(timeout, unit);
59+
}
60+
61+
@Override
62+
public <T> Future<T> submit(Callable<T> task) {
63+
FutureTask<T> ft = new FutureTask<>(task);
64+
execute(ft);
65+
return ft;
66+
}
67+
68+
@Override
69+
public <T> Future<T> submit(Runnable task, T result) {
70+
return submit(
71+
() -> {
72+
task.run();
73+
return result;
74+
});
75+
}
76+
77+
@Override
78+
public Future<?> submit(Runnable task) {
79+
return submit(
80+
() -> {
81+
task.run();
82+
return null;
83+
});
84+
}
85+
86+
@Override
87+
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
88+
throws InterruptedException {
89+
return delegate.invokeAll(tasks);
90+
}
91+
92+
@Override
93+
public <T> List<Future<T>> invokeAll(
94+
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
95+
throws InterruptedException {
96+
return delegate.invokeAll(tasks, timeout, unit);
97+
}
98+
99+
@Override
100+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
101+
throws ExecutionException, InterruptedException {
102+
return delegate.invokeAny(tasks);
103+
}
104+
105+
@Override
106+
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
107+
throws ExecutionException, InterruptedException, TimeoutException {
108+
return delegate.invokeAny(tasks, timeout, unit);
109+
}
110+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.concurrent;
16+
17+
import java.util.concurrent.Executor;
18+
19+
/**
20+
* An {@link Executor} that can be paused/resumed.
21+
*
22+
* <p>When the executor is {@link #pause() paused}, tasks get queued and execute when {@link
23+
* #resume()} is called.
24+
*/
25+
public interface PausableExecutor extends Executor {
26+
27+
void pause();
28+
29+
void resume();
30+
31+
boolean isPaused();
32+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.concurrent;
16+
17+
import androidx.annotation.VisibleForTesting;
18+
import java.util.concurrent.Executor;
19+
import java.util.concurrent.LinkedBlockingQueue;
20+
21+
final class PausableExecutorImpl implements PausableExecutor {
22+
private volatile boolean paused;
23+
private final Executor delegate;
24+
25+
@VisibleForTesting final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
26+
27+
PausableExecutorImpl(boolean paused, Executor delegate) {
28+
this.paused = paused;
29+
this.delegate = delegate;
30+
}
31+
32+
@Override
33+
public void execute(Runnable command) {
34+
queue.offer(command);
35+
maybeEnqueueNext();
36+
}
37+
38+
private void maybeEnqueueNext() {
39+
if (paused) {
40+
return;
41+
}
42+
Runnable next = queue.poll();
43+
while (next != null) {
44+
delegate.execute(next);
45+
if (!paused) {
46+
next = queue.poll();
47+
} else {
48+
next = null;
49+
}
50+
}
51+
}
52+
53+
@Override
54+
public void pause() {
55+
paused = true;
56+
}
57+
58+
@Override
59+
public void resume() {
60+
paused = false;
61+
maybeEnqueueNext();
62+
}
63+
64+
@Override
65+
public boolean isPaused() {
66+
return paused;
67+
}
68+
}

0 commit comments

Comments
 (0)