Skip to content

Commit cffc07f

Browse files
authored
xds: add File-watcher certificate provider (#7590)
1 parent d154aa3 commit cffc07f

File tree

4 files changed

+956
-0
lines changed

4 files changed

+956
-0
lines changed
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds.internal.certprovider;
18+
19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import io.grpc.InternalLogId;
23+
import io.grpc.Status;
24+
import io.grpc.SynchronizationContext;
25+
import io.grpc.internal.TimeProvider;
26+
import io.grpc.xds.internal.sds.trust.CertificateUtils;
27+
28+
import java.io.ByteArrayInputStream;
29+
import java.nio.file.Files;
30+
import java.nio.file.Path;
31+
import java.nio.file.Paths;
32+
import java.nio.file.attribute.FileTime;
33+
import java.security.PrivateKey;
34+
import java.security.cert.X509Certificate;
35+
import java.util.Arrays;
36+
import java.util.concurrent.ScheduledExecutorService;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.logging.Level;
39+
import java.util.logging.Logger;
40+
41+
// TODO(sanjaypujare): abstract out common functionality into an an abstract superclass
42+
/** Implementation of {@link CertificateProvider} for file watching cert provider. */
43+
final class FileWatcherCertificateProvider extends CertificateProvider {
44+
private static final Logger logger =
45+
Logger.getLogger(FileWatcherCertificateProvider.class.getName());
46+
47+
private final SynchronizationContext syncContext;
48+
private final ScheduledExecutorService scheduledExecutorService;
49+
private final TimeProvider timeProvider;
50+
private final Path certFile;
51+
private final Path keyFile;
52+
private final Path trustFile;
53+
private final long refreshIntervalInSeconds;
54+
@VisibleForTesting SynchronizationContext.ScheduledHandle scheduledHandle;
55+
private FileTime lastModifiedTimeCert;
56+
private FileTime lastModifiedTimeKey;
57+
private FileTime lastModifiedTimeRoot;
58+
59+
FileWatcherCertificateProvider(
60+
DistributorWatcher watcher,
61+
boolean notifyCertUpdates,
62+
String certFile,
63+
String keyFile,
64+
String trustFile,
65+
long refreshIntervalInSeconds,
66+
ScheduledExecutorService scheduledExecutorService,
67+
TimeProvider timeProvider) {
68+
super(watcher, notifyCertUpdates);
69+
this.scheduledExecutorService =
70+
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
71+
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
72+
this.certFile = Paths.get(checkNotNull(certFile, "certFile"));
73+
this.keyFile = Paths.get(checkNotNull(keyFile, "keyFile"));
74+
this.trustFile = Paths.get(checkNotNull(trustFile, "trustFile"));
75+
this.refreshIntervalInSeconds = refreshIntervalInSeconds;
76+
this.syncContext = createSynchronizationContext(certFile);
77+
}
78+
79+
private SynchronizationContext createSynchronizationContext(String details) {
80+
final InternalLogId logId =
81+
InternalLogId.allocate("DynamicReloadingCertificateProvider", details);
82+
return new SynchronizationContext(
83+
new Thread.UncaughtExceptionHandler() {
84+
private boolean panicMode;
85+
86+
@Override
87+
public void uncaughtException(Thread t, Throwable e) {
88+
logger.log(
89+
Level.SEVERE,
90+
"[" + logId + "] Uncaught exception in the SynchronizationContext. Panic!",
91+
e);
92+
panic(e);
93+
}
94+
95+
void panic(final Throwable t) {
96+
if (panicMode) {
97+
// Preserve the first panic information
98+
return;
99+
}
100+
panicMode = true;
101+
close();
102+
}
103+
});
104+
}
105+
106+
@Override
107+
public void start() {
108+
scheduleNextRefreshCertificate(/* delayInSeconds= */0);
109+
}
110+
111+
@Override
112+
public void close() {
113+
if (scheduledHandle != null) {
114+
scheduledHandle.cancel();
115+
scheduledHandle = null;
116+
}
117+
getWatcher().close();
118+
}
119+
120+
private void scheduleNextRefreshCertificate(long delayInSeconds) {
121+
RefreshCertificateTask runnable = new RefreshCertificateTask();
122+
scheduledHandle =
123+
syncContext.schedule(runnable, delayInSeconds, TimeUnit.SECONDS, scheduledExecutorService);
124+
}
125+
126+
@VisibleForTesting
127+
void checkAndReloadCertificates() {
128+
try {
129+
try {
130+
FileTime currentCertTime = Files.getLastModifiedTime(certFile);
131+
FileTime currentKeyTime = Files.getLastModifiedTime(keyFile);
132+
if (!currentCertTime.equals(lastModifiedTimeCert)
133+
&& !currentKeyTime.equals(lastModifiedTimeKey)) {
134+
byte[] certFileContents = Files.readAllBytes(certFile);
135+
byte[] keyFileContents = Files.readAllBytes(keyFile);
136+
FileTime currentCertTime2 = Files.getLastModifiedTime(certFile);
137+
FileTime currentKeyTime2 = Files.getLastModifiedTime(keyFile);
138+
if (!currentCertTime2.equals(currentCertTime)) {
139+
return;
140+
}
141+
if (!currentKeyTime2.equals(currentKeyTime)) {
142+
return;
143+
}
144+
try (ByteArrayInputStream certStream = new ByteArrayInputStream(certFileContents);
145+
ByteArrayInputStream keyStream = new ByteArrayInputStream(keyFileContents)) {
146+
PrivateKey privateKey = CertificateUtils.getPrivateKey(keyStream);
147+
X509Certificate[] certs = CertificateUtils.toX509Certificates(certStream);
148+
getWatcher().updateCertificate(privateKey, Arrays.asList(certs));
149+
}
150+
lastModifiedTimeCert = currentCertTime;
151+
lastModifiedTimeKey = currentKeyTime;
152+
}
153+
} catch (Throwable t) {
154+
generateErrorIfCurrentCertExpired(t);
155+
}
156+
try {
157+
FileTime currentRootTime = Files.getLastModifiedTime(trustFile);
158+
if (currentRootTime.equals(lastModifiedTimeRoot)) {
159+
return;
160+
}
161+
byte[] rootFileContents = Files.readAllBytes(trustFile);
162+
FileTime currentRootTime2 = Files.getLastModifiedTime(trustFile);
163+
if (!currentRootTime2.equals(currentRootTime)) {
164+
return;
165+
}
166+
try (ByteArrayInputStream rootStream = new ByteArrayInputStream(rootFileContents)) {
167+
X509Certificate[] caCerts = CertificateUtils.toX509Certificates(rootStream);
168+
getWatcher().updateTrustedRoots(Arrays.asList(caCerts));
169+
}
170+
lastModifiedTimeRoot = currentRootTime;
171+
} catch (Throwable t) {
172+
getWatcher().onError(Status.fromThrowable(t));
173+
}
174+
} finally {
175+
scheduleNextRefreshCertificate(refreshIntervalInSeconds);
176+
}
177+
}
178+
179+
private void generateErrorIfCurrentCertExpired(Throwable t) {
180+
X509Certificate currentCert = getWatcher().getLastIdentityCert();
181+
if (currentCert != null) {
182+
long delaySeconds = computeDelaySecondsToCertExpiry(currentCert);
183+
if (delaySeconds > refreshIntervalInSeconds) {
184+
logger.log(Level.FINER, "reload certificate error", t);
185+
return;
186+
}
187+
// The current cert is going to expire in less than {@link refreshIntervalInSeconds}
188+
// Clear the current cert and notify our watchers thru {@code onError}
189+
getWatcher().clearValues();
190+
}
191+
getWatcher().onError(Status.fromThrowable(t));
192+
}
193+
194+
@SuppressWarnings("JdkObsolete")
195+
private long computeDelaySecondsToCertExpiry(X509Certificate lastCert) {
196+
checkNotNull(lastCert, "lastCert");
197+
return TimeUnit.NANOSECONDS.toSeconds(
198+
TimeUnit.MILLISECONDS.toNanos(lastCert.getNotAfter().getTime())
199+
- timeProvider.currentTimeNanos());
200+
}
201+
202+
@VisibleForTesting
203+
class RefreshCertificateTask implements Runnable {
204+
@Override
205+
public void run() {
206+
checkAndReloadCertificates();
207+
}
208+
}
209+
210+
abstract static class Factory {
211+
private static final Factory DEFAULT_INSTANCE =
212+
new Factory() {
213+
@Override
214+
FileWatcherCertificateProvider create(
215+
DistributorWatcher watcher,
216+
boolean notifyCertUpdates,
217+
String certFile,
218+
String keyFile,
219+
String trustFile,
220+
long refreshIntervalInSeconds,
221+
ScheduledExecutorService scheduledExecutorService,
222+
TimeProvider timeProvider) {
223+
return new FileWatcherCertificateProvider(
224+
watcher,
225+
notifyCertUpdates,
226+
certFile,
227+
keyFile,
228+
trustFile,
229+
refreshIntervalInSeconds,
230+
scheduledExecutorService,
231+
timeProvider);
232+
}
233+
};
234+
235+
static Factory getInstance() {
236+
return DEFAULT_INSTANCE;
237+
}
238+
239+
abstract FileWatcherCertificateProvider create(
240+
DistributorWatcher watcher,
241+
boolean notifyCertUpdates,
242+
String certFile,
243+
String keyFile,
244+
String trustFile,
245+
long refreshIntervalInSeconds,
246+
ScheduledExecutorService scheduledExecutorService,
247+
TimeProvider timeProvider);
248+
}
249+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds.internal.certprovider;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
22+
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
24+
import com.google.protobuf.Duration;
25+
import com.google.protobuf.util.Durations;
26+
import io.grpc.internal.JsonUtil;
27+
import io.grpc.internal.TimeProvider;
28+
import java.text.ParseException;
29+
import java.util.Map;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
33+
/**
34+
* Provider of {@link FileWatcherCertificateProvider}s.
35+
*/
36+
final class FileWatcherCertificateProviderProvider implements CertificateProviderProvider {
37+
38+
private static final String CERT_FILE_KEY = "certificate_file";
39+
private static final String KEY_FILE_KEY = "private_key_file";
40+
private static final String ROOT_FILE_KEY = "ca_certificate_file";
41+
private static final String REFRESH_INTERVAL_KEY = "refresh_interval";
42+
43+
@VisibleForTesting static final long REFRESH_INTERVAL_DEFAULT = 600L;
44+
45+
46+
static final String FILE_WATCHER_PROVIDER_NAME = "file_watcher";
47+
48+
static {
49+
CertificateProviderRegistry.getInstance()
50+
.register(
51+
new FileWatcherCertificateProviderProvider(
52+
FileWatcherCertificateProvider.Factory.getInstance(),
53+
ScheduledExecutorServiceFactory.DEFAULT_INSTANCE,
54+
TimeProvider.SYSTEM_TIME_PROVIDER));
55+
}
56+
57+
final FileWatcherCertificateProvider.Factory fileWatcherCertificateProviderFactory;
58+
private final ScheduledExecutorServiceFactory scheduledExecutorServiceFactory;
59+
private final TimeProvider timeProvider;
60+
61+
@VisibleForTesting
62+
FileWatcherCertificateProviderProvider(
63+
FileWatcherCertificateProvider.Factory fileWatcherCertificateProviderFactory,
64+
ScheduledExecutorServiceFactory scheduledExecutorServiceFactory,
65+
TimeProvider timeProvider) {
66+
this.fileWatcherCertificateProviderFactory = fileWatcherCertificateProviderFactory;
67+
this.scheduledExecutorServiceFactory = scheduledExecutorServiceFactory;
68+
this.timeProvider = timeProvider;
69+
}
70+
71+
@Override
72+
public String getName() {
73+
return FILE_WATCHER_PROVIDER_NAME;
74+
}
75+
76+
@Override
77+
public CertificateProvider createCertificateProvider(
78+
Object config, CertificateProvider.DistributorWatcher watcher, boolean notifyCertUpdates) {
79+
80+
Config configObj = validateAndTranslateConfig(config);
81+
return fileWatcherCertificateProviderFactory.create(
82+
watcher,
83+
notifyCertUpdates,
84+
configObj.certFile,
85+
configObj.keyFile,
86+
configObj.rootFile,
87+
configObj.refrehInterval,
88+
scheduledExecutorServiceFactory.create(),
89+
timeProvider);
90+
}
91+
92+
private static String checkForNullAndGet(Map<String, ?> map, String key) {
93+
return checkNotNull(JsonUtil.getString(map, key), "'" + key + "' is required in the config");
94+
}
95+
96+
private static Config validateAndTranslateConfig(Object config) {
97+
checkArgument(config instanceof Map, "Only Map supported for config");
98+
@SuppressWarnings("unchecked") Map<String, ?> map = (Map<String, ?>)config;
99+
100+
Config configObj = new Config();
101+
configObj.certFile = checkForNullAndGet(map, CERT_FILE_KEY);
102+
configObj.keyFile = checkForNullAndGet(map, KEY_FILE_KEY);
103+
configObj.rootFile = checkForNullAndGet(map, ROOT_FILE_KEY);
104+
String refreshIntervalString = JsonUtil.getString(map, REFRESH_INTERVAL_KEY);
105+
if (refreshIntervalString != null) {
106+
try {
107+
Duration duration = Durations.parse(refreshIntervalString);
108+
configObj.refrehInterval = duration.getSeconds();
109+
} catch (ParseException e) {
110+
throw new IllegalArgumentException(e);
111+
}
112+
}
113+
if (configObj.refrehInterval == null) {
114+
configObj.refrehInterval = REFRESH_INTERVAL_DEFAULT;
115+
}
116+
return configObj;
117+
}
118+
119+
abstract static class ScheduledExecutorServiceFactory {
120+
121+
private static final ScheduledExecutorServiceFactory DEFAULT_INSTANCE =
122+
new ScheduledExecutorServiceFactory() {
123+
124+
@Override
125+
ScheduledExecutorService create() {
126+
return Executors.newSingleThreadScheduledExecutor(
127+
new ThreadFactoryBuilder()
128+
.setNameFormat("fileWatcher" + "-%d")
129+
.setDaemon(true)
130+
.build());
131+
}
132+
};
133+
134+
abstract ScheduledExecutorService create();
135+
}
136+
137+
/** POJO class for storing various config values. */
138+
@VisibleForTesting
139+
static class Config {
140+
String certFile;
141+
String keyFile;
142+
String rootFile;
143+
Long refrehInterval;
144+
}
145+
}

0 commit comments

Comments
 (0)