Skip to content

Commit 93b69a9

Browse files
authored
Make file watcher startable on any node (#101572)
The file settings service must only be started on the master node. However, the abstract super class for watching any file should be usable on any node type. This commit reworks the abstract file watcher to work on any node by moving the cluster state based logic to detect the master node directly into the file settings service. Additionally, if the file watcher cannot be started because the grandparent directory of the watched file does not exist, a warning is emitted. relates ES-6783
1 parent 553d2d4 commit 93b69a9

File tree

4 files changed

+95
-88
lines changed

4 files changed

+95
-88
lines changed

server/src/main/java/org/elasticsearch/common/file/AbstractFileWatchingService.java

Lines changed: 9 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,6 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13-
import org.elasticsearch.cluster.ClusterChangedEvent;
14-
import org.elasticsearch.cluster.ClusterState;
15-
import org.elasticsearch.cluster.ClusterStateListener;
16-
import org.elasticsearch.cluster.node.DiscoveryNode;
17-
import org.elasticsearch.cluster.service.ClusterService;
1813
import org.elasticsearch.common.Randomness;
1914
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2015
import org.elasticsearch.reservedstate.service.FileChangedListener;
@@ -27,8 +22,6 @@
2722
import java.nio.file.WatchKey;
2823
import java.nio.file.WatchService;
2924
import java.nio.file.attribute.BasicFileAttributes;
30-
import java.nio.file.attribute.FileTime;
31-
import java.time.Instant;
3225
import java.util.List;
3326
import java.util.concurrent.CopyOnWriteArrayList;
3427
import java.util.concurrent.ExecutionException;
@@ -52,11 +45,10 @@
5245
* An implementation class should override {@link #processFileChanges()} to define
5346
* the correct behavior.</p>
5447
*/
55-
public abstract class AbstractFileWatchingService extends AbstractLifecycleComponent implements ClusterStateListener {
48+
public abstract class AbstractFileWatchingService extends AbstractLifecycleComponent {
5649

5750
private static final Logger logger = LogManager.getLogger(AbstractFileWatchingService.class);
5851
private static final int REGISTER_RETRY_COUNT = 5;
59-
private final ClusterService clusterService;
6052
private final Path watchedFileDir;
6153
private final Path watchedFile;
6254
private final List<FileChangedListener> eventListeners;
@@ -65,10 +57,8 @@ public abstract class AbstractFileWatchingService extends AbstractLifecycleCompo
6557
private FileUpdateState fileUpdateState;
6658
private WatchKey settingsDirWatchKey;
6759
private WatchKey configDirWatchKey;
68-
private volatile boolean active = false;
6960

70-
public AbstractFileWatchingService(ClusterService clusterService, Path watchedFile) {
71-
this.clusterService = clusterService;
61+
public AbstractFileWatchingService(Path watchedFile) {
7262
this.watchedFile = watchedFile;
7363
this.watchedFileDir = watchedFile.getParent();
7464
this.eventListeners = new CopyOnWriteArrayList<>();
@@ -84,40 +74,6 @@ public AbstractFileWatchingService(ClusterService clusterService, Path watchedFi
8474
*/
8575
protected abstract void processFileChanges() throws InterruptedException, ExecutionException, IOException;
8676

87-
/**
88-
* There may be an indication in cluster state that the file we are watching
89-
* should be re-processed: for example, after cluster state has been restored
90-
* from a snapshot. By default, we do nothing, but this method should be overridden
91-
* if different behavior is desired.
92-
* @param clusterState State of the cluster
93-
* @return false, by default
94-
*/
95-
protected boolean shouldRefreshFileState(ClusterState clusterState) {
96-
return false;
97-
}
98-
99-
/**
100-
* 'Touches' the settings file so the file watcher will re-processes it.
101-
* <p>
102-
* The file processing is asynchronous, the cluster state or the file must be already updated such that
103-
* the version information in the file is newer than what's already saved as processed in the
104-
* cluster state.
105-
*
106-
* For snapshot restores we first must restore the snapshot and then force a refresh, since the cluster state
107-
* metadata version must be reset to 0 and saved in the cluster state.
108-
*/
109-
private void refreshExistingFileStateIfNeeded(ClusterState clusterState) {
110-
if (watching()) {
111-
if (shouldRefreshFileState(clusterState) && Files.exists(watchedFile())) {
112-
try {
113-
Files.setLastModifiedTime(watchedFile(), FileTime.from(Instant.now()));
114-
} catch (IOException e) {
115-
logger.warn("encountered I/O error trying to update file settings timestamp", e);
116-
}
117-
}
118-
}
119-
}
120-
12177
public final void addFileChangedListener(FileChangedListener listener) {
12278
eventListeners.add(listener);
12379
}
@@ -131,33 +87,12 @@ public final Path watchedFile() {
13187
}
13288

13389
@Override
134-
public final void clusterChanged(ClusterChangedEvent event) {
135-
ClusterState clusterState = event.state();
136-
if (clusterState.nodes().isLocalNodeElectedMaster()) {
137-
startWatcher(clusterState);
138-
} else if (event.previousState().nodes().isLocalNodeElectedMaster()) {
139-
stopWatcher();
140-
}
90+
protected void doStart() {
91+
startWatcher();
14192
}
14293

14394
@Override
144-
protected final void doStart() {
145-
// We start the file watcher when we know we are master from a cluster state change notification.
146-
// We need the additional active flag, since cluster state can change after we've shutdown the service
147-
// causing the watcher to start again.
148-
this.active = Files.exists(watchedFileDir().getParent());
149-
if (active == false) {
150-
// we don't have a config directory, we can't possibly launch the file settings service
151-
return;
152-
}
153-
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
154-
clusterService.addListener(this);
155-
}
156-
}
157-
158-
@Override
159-
protected final void doStop() {
160-
this.active = false;
95+
protected void doStop() {
16196
logger.debug("Stopping file watching service");
16297
stopWatcher();
16398
}
@@ -184,10 +119,9 @@ final boolean watchedFileChanged(Path path) throws IOException {
184119
return (previousUpdateState == null || previousUpdateState.equals(fileUpdateState) == false);
185120
}
186121

187-
private synchronized void startWatcher(ClusterState clusterState) {
188-
if (watching() || active == false) {
189-
refreshExistingFileStateIfNeeded(clusterState);
190-
122+
protected final synchronized void startWatcher() {
123+
if (Files.exists(watchedFileDir.getParent()) == false) {
124+
logger.warn("File watcher for [{}] cannot start because grandparent directory does not exist", watchedFile);
191125
return;
192126
}
193127

@@ -295,7 +229,7 @@ protected final void watcherThread() {
295229
}
296230
}
297231

298-
final synchronized void stopWatcher() {
232+
protected final synchronized void stopWatcher() {
299233
if (watching()) {
300234
logger.debug("stopping watcher ...");
301235
// make sure watch service is closed whatever

server/src/main/java/org/elasticsearch/reservedstate/service/FileSettingsService.java

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.cluster.ClusterChangedEvent;
1415
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.ClusterStateListener;
1517
import org.elasticsearch.cluster.metadata.Metadata;
1618
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
19+
import org.elasticsearch.cluster.node.DiscoveryNode;
1720
import org.elasticsearch.cluster.service.ClusterService;
1821
import org.elasticsearch.common.file.AbstractFileWatchingService;
1922
import org.elasticsearch.env.Environment;
@@ -22,6 +25,8 @@
2225
import java.io.BufferedInputStream;
2326
import java.io.IOException;
2427
import java.nio.file.Files;
28+
import java.nio.file.attribute.FileTime;
29+
import java.time.Instant;
2530
import java.util.concurrent.ExecutionException;
2631

2732
import static org.elasticsearch.xcontent.XContentType.JSON;
@@ -37,14 +42,16 @@
3742
* the service as a listener to cluster state changes, so that we can enable the file watcher thread when this
3843
* node becomes a master node.
3944
*/
40-
public class FileSettingsService extends AbstractFileWatchingService {
45+
public class FileSettingsService extends AbstractFileWatchingService implements ClusterStateListener {
4146

4247
private static final Logger logger = LogManager.getLogger(FileSettingsService.class);
4348

4449
public static final String SETTINGS_FILE_NAME = "settings.json";
4550
public static final String NAMESPACE = "file_settings";
4651
public static final String OPERATOR_DIRECTORY = "operator";
52+
private final ClusterService clusterService;
4753
private final ReservedClusterStateService stateService;
54+
private volatile boolean active = false;
4855

4956
/**
5057
* Constructs the {@link FileSettingsService}
@@ -54,10 +61,70 @@ public class FileSettingsService extends AbstractFileWatchingService {
5461
* @param environment we need the environment to pull the location of the config and operator directories
5562
*/
5663
public FileSettingsService(ClusterService clusterService, ReservedClusterStateService stateService, Environment environment) {
57-
super(clusterService, environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY).resolve(SETTINGS_FILE_NAME));
64+
super(environment.configFile().toAbsolutePath().resolve(OPERATOR_DIRECTORY).resolve(SETTINGS_FILE_NAME));
65+
this.clusterService = clusterService;
5866
this.stateService = stateService;
5967
}
6068

69+
@Override
70+
protected void doStart() {
71+
// We start the file watcher when we know we are master from a cluster state change notification.
72+
// We need the additional active flag, since cluster state can change after we've shutdown the service
73+
// causing the watcher to start again.
74+
this.active = Files.exists(watchedFileDir().getParent());
75+
if (active == false) {
76+
// we don't have a config directory, we can't possibly launch the file settings service
77+
return;
78+
}
79+
if (DiscoveryNode.isMasterNode(clusterService.getSettings())) {
80+
clusterService.addListener(this);
81+
}
82+
}
83+
84+
@Override
85+
protected void doStop() {
86+
this.active = false;
87+
super.doStop();
88+
}
89+
90+
@Override
91+
public final void clusterChanged(ClusterChangedEvent event) {
92+
ClusterState clusterState = event.state();
93+
if (clusterState.nodes().isLocalNodeElectedMaster()) {
94+
synchronized (this) {
95+
if (watching() || active == false) {
96+
refreshExistingFileStateIfNeeded(clusterState);
97+
return;
98+
}
99+
startWatcher();
100+
}
101+
} else if (event.previousState().nodes().isLocalNodeElectedMaster()) {
102+
stopWatcher();
103+
}
104+
}
105+
106+
/**
107+
* 'Touches' the settings file so the file watcher will re-processes it.
108+
* <p>
109+
* The file processing is asynchronous, the cluster state or the file must be already updated such that
110+
* the version information in the file is newer than what's already saved as processed in the
111+
* cluster state.
112+
*
113+
* For snapshot restores we first must restore the snapshot and then force a refresh, since the cluster state
114+
* metadata version must be reset to 0 and saved in the cluster state.
115+
*/
116+
private void refreshExistingFileStateIfNeeded(ClusterState clusterState) {
117+
if (watching()) {
118+
if (shouldRefreshFileState(clusterState) && Files.exists(watchedFile())) {
119+
try {
120+
Files.setLastModifiedTime(watchedFile(), FileTime.from(Instant.now()));
121+
} catch (IOException e) {
122+
logger.warn("encountered I/O error trying to update file settings timestamp", e);
123+
}
124+
}
125+
}
126+
}
127+
61128
/**
62129
* Used by snapshot restore service {@link org.elasticsearch.snapshots.RestoreService} to prepare the reserved
63130
* state of the snapshot for the current cluster.
@@ -95,8 +162,7 @@ public void handleSnapshotRestore(ClusterState clusterState, Metadata.Builder md
95162
* @param clusterState State of the cluster
96163
* @return true if file settings metadata version is exactly 0, false otherwise.
97164
*/
98-
@Override
99-
protected boolean shouldRefreshFileState(ClusterState clusterState) {
165+
private boolean shouldRefreshFileState(ClusterState clusterState) {
100166
// We check if the version was reset to 0, and force an update if a file exists. This can happen in situations
101167
// like snapshot restores.
102168
ReservedStateMetadata fileSettingsMetadata = clusterState.metadata().reservedStateMetadata().get(NAMESPACE);

server/src/test/java/org/elasticsearch/common/file/AbstractFileWatchingServiceTests.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.elasticsearch.common.file;
1010

11-
import org.elasticsearch.cluster.ClusterChangedEvent;
1211
import org.elasticsearch.cluster.ClusterName;
1312
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -58,13 +57,13 @@ class TestFileWatchingService extends AbstractFileWatchingService {
5857

5958
private final CountDownLatch countDownLatch;
6059

61-
TestFileWatchingService(ClusterService clusterService, Path watchedFile) {
62-
super(clusterService, watchedFile);
60+
TestFileWatchingService(Path watchedFile) {
61+
super(watchedFile);
6362
this.countDownLatch = null;
6463
}
6564

66-
TestFileWatchingService(ClusterService clusterService, Path watchedFile, CountDownLatch countDownLatch) {
67-
super(clusterService, watchedFile);
65+
TestFileWatchingService(Path watchedFile, CountDownLatch countDownLatch) {
66+
super(watchedFile);
6867
this.countDownLatch = countDownLatch;
6968
}
7069

@@ -99,7 +98,7 @@ public void setUp() throws Exception {
9998

10099
Files.createDirectories(env.configFile());
101100

102-
fileWatchingService = new TestFileWatchingService(clusterService, getWatchedFilePath(env));
101+
fileWatchingService = new TestFileWatchingService(getWatchedFilePath(env));
103102
}
104103

105104
@After
@@ -110,7 +109,6 @@ public void tearDown() throws Exception {
110109

111110
public void testStartStop() {
112111
fileWatchingService.start();
113-
fileWatchingService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
114112
assertTrue(fileWatchingService.watching());
115113
fileWatchingService.stop();
116114
assertFalse(fileWatchingService.watching());
@@ -143,10 +141,9 @@ public void testWatchedFile() throws Exception {
143141
public void testCallsProcessing() throws Exception {
144142
CountDownLatch processFileLatch = new CountDownLatch(1);
145143

146-
AbstractFileWatchingService service = new TestFileWatchingService(clusterService, getWatchedFilePath(env), processFileLatch);
144+
AbstractFileWatchingService service = new TestFileWatchingService(getWatchedFilePath(env), processFileLatch);
147145

148146
service.start();
149-
service.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
150147
assertTrue(service.watching());
151148

152149
Files.createDirectories(service.watchedFileDir());

server/src/test/java/org/elasticsearch/reservedstate/service/FileSettingsServiceTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,16 @@ public void tearDown() throws Exception {
112112
threadpool.shutdownNow();
113113
}
114114

115+
public void testStartStop() {
116+
fileSettingsService.start();
117+
assertFalse(fileSettingsService.watching());
118+
fileSettingsService.clusterChanged(new ClusterChangedEvent("test", clusterService.state(), ClusterState.EMPTY_STATE));
119+
assertTrue(fileSettingsService.watching());
120+
fileSettingsService.stop();
121+
assertFalse(fileSettingsService.watching());
122+
fileSettingsService.close();
123+
}
124+
115125
public void testOperatorDirName() {
116126
Path operatorPath = fileSettingsService.watchedFileDir();
117127
assertTrue(operatorPath.startsWith(env.configFile()));

0 commit comments

Comments
 (0)