Skip to content

Commit 8dd49eb

Browse files
committed
Fixed #113 - a race condition that could result in duplicate events to be emitted on reconnect
1 parent f608aa8 commit 8dd49eb

File tree

4 files changed

+236
-111
lines changed

4 files changed

+236
-111
lines changed

changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22
All notable changes to this project will be documented in this file.
33
This project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-20
6+
7+
### Fixed
8+
- A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)).
9+
510
## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31
611

712
### Fixed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Lines changed: 120 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public X509Certificate[] getAcceptedIssuers() {
140140
private SocketFactory socketFactory;
141141
private SSLSocketFactory sslSocketFactory;
142142

143-
private PacketChannel channel;
143+
private volatile PacketChannel channel;
144144
private volatile boolean connected;
145145

146146
private ThreadFactory threadFactory;
@@ -150,9 +150,8 @@ public X509Certificate[] getAcceptedIssuers() {
150150
private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3);
151151

152152
private volatile ExecutorService keepAliveThreadExecutor;
153-
private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6);
154153

155-
private final Lock shutdownLock = new ReentrantLock();
154+
private final Lock connectLock = new ReentrantLock();
156155

157156
/**
158157
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
@@ -397,69 +396,84 @@ public void setThreadFactory(ThreadFactory threadFactory) {
397396
* @throws IOException if anything goes wrong while trying to connect
398397
*/
399398
public void connect() throws IOException {
400-
if (connected) {
399+
if (!connectLock.tryLock()) {
401400
throw new IllegalStateException("BinaryLogClient is already connected");
402401
}
403-
GreetingPacket greetingPacket;
402+
boolean notifyWhenDisconnected = false;
404403
try {
405404
try {
406-
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
407-
socket.connect(new InetSocketAddress(hostname, port));
408-
channel = new PacketChannel(socket);
409-
if (channel.getInputStream().peek() == -1) {
410-
throw new EOFException();
405+
channel = openChannel();
406+
GreetingPacket greetingPacket = receiveGreeting();
407+
authenticate(greetingPacket);
408+
connectionId = greetingPacket.getThreadId();
409+
if (binlogFilename == null) {
410+
fetchBinlogFilenameAndPosition();
411411
}
412+
if (binlogPosition < 4) {
413+
if (logger.isLoggable(Level.WARNING)) {
414+
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
415+
}
416+
binlogPosition = 4;
417+
}
418+
ChecksumType checksumType = fetchBinlogChecksum();
419+
if (checksumType != ChecksumType.NONE) {
420+
confirmSupportOfChecksum(checksumType);
421+
}
422+
requestBinaryLogStream();
412423
} catch (IOException e) {
413-
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
414-
". Please make sure it's running.", e);
424+
disconnectChannel();
425+
throw e;
415426
}
416-
greetingPacket = receiveGreeting();
417-
authenticate(greetingPacket);
418-
if (binlogFilename == null) {
419-
fetchBinlogFilenameAndPosition();
420-
}
421-
if (binlogPosition < 4) {
422-
if (logger.isLoggable(Level.WARNING)) {
423-
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
427+
connected = true;
428+
notifyWhenDisconnected = true;
429+
if (logger.isLoggable(Level.INFO)) {
430+
String position;
431+
synchronized (gtidSetAccessLock) {
432+
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
424433
}
425-
binlogPosition = 4;
434+
logger.info("Connected to " + hostname + ":" + port + " at " + position +
435+
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
426436
}
427-
ChecksumType checksumType = fetchBinlogChecksum();
428-
if (checksumType != ChecksumType.NONE) {
429-
confirmSupportOfChecksum(checksumType);
437+
synchronized (lifecycleListeners) {
438+
for (LifecycleListener lifecycleListener : lifecycleListeners) {
439+
lifecycleListener.onConnect(this);
440+
}
430441
}
431-
requestBinaryLogStream();
432-
} catch (IOException e) {
433-
if (channel != null && channel.isOpen()) {
434-
channel.close();
442+
if (keepAlive && !isKeepAliveThreadRunning()) {
443+
spawnKeepAliveThread();
435444
}
436-
throw e;
437-
}
438-
connected = true;
439-
connectionId = greetingPacket.getThreadId();
440-
if (logger.isLoggable(Level.INFO)) {
441-
String position;
445+
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
442446
synchronized (gtidSetAccessLock) {
443-
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
447+
if (gtidSet != null) {
448+
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
449+
}
444450
}
445-
logger.info("Connected to " + hostname + ":" + port + " at " + position +
446-
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
447-
}
448-
synchronized (lifecycleListeners) {
449-
for (LifecycleListener lifecycleListener : lifecycleListeners) {
450-
lifecycleListener.onConnect(this);
451+
listenForEventPackets();
452+
} finally {
453+
connectLock.unlock();
454+
if (notifyWhenDisconnected) {
455+
synchronized (lifecycleListeners) {
456+
for (LifecycleListener lifecycleListener : lifecycleListeners) {
457+
lifecycleListener.onDisconnect(this);
458+
}
459+
}
451460
}
452461
}
453-
if (keepAlive && !isKeepAliveThreadRunning()) {
454-
spawnKeepAliveThread();
455-
}
456-
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
457-
synchronized (gtidSetAccessLock) {
458-
if (gtidSet != null) {
459-
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
462+
}
463+
464+
private PacketChannel openChannel() throws IOException {
465+
try {
466+
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
467+
socket.connect(new InetSocketAddress(hostname, port));
468+
PacketChannel channel = new PacketChannel(socket);
469+
if (channel.getInputStream().peek() == -1) {
470+
throw new EOFException();
460471
}
472+
return channel;
473+
} catch (IOException e) {
474+
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
475+
". Please make sure it's running.", e);
461476
}
462-
listenForEventPackets();
463477
}
464478

465479
private GreetingPacket receiveGreeting() throws IOException {
@@ -540,51 +554,46 @@ private void authenticate(GreetingPacket greetingPacket) throws IOException {
540554
}
541555

542556
private void spawnKeepAliveThread() {
543-
keepAliveThreadExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
557+
final ExecutorService threadExecutor =
558+
Executors.newSingleThreadExecutor(new ThreadFactory() {
544559

545-
@Override
546-
public Thread newThread(Runnable runnable) {
547-
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
548-
}
549-
});
550-
keepAliveThreadExecutor.submit(new Runnable() {
560+
@Override
561+
public Thread newThread(Runnable runnable) {
562+
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
563+
}
564+
});
565+
threadExecutor.submit(new Runnable() {
551566
@Override
552567
public void run() {
553-
while (true) {
568+
while (!threadExecutor.isShutdown()) {
554569
try {
555570
Thread.sleep(keepAliveInterval);
556571
} catch (InterruptedException e) {
557572
// expected in case of disconnect
558573
}
559-
shutdownLock.lock();
574+
if (threadExecutor.isShutdown()) {
575+
return;
576+
}
560577
try {
561-
if (keepAliveThreadExecutor.isShutdown()) {
562-
return;
578+
channel.write(new PingCommand());
579+
} catch (IOException e) {
580+
if (logger.isLoggable(Level.INFO)) {
581+
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
563582
}
564583
try {
565-
channel.write(new PingCommand());
566-
} catch (IOException e) {
567-
if (logger.isLoggable(Level.INFO)) {
568-
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
569-
}
570-
try {
571-
if (isConnected()) {
572-
disconnectChannel();
573-
}
574-
connect(keepAliveConnectTimeout);
575-
} catch (Exception ce) {
576-
if (logger.isLoggable(Level.WARNING)) {
577-
logger.warning("Failed to restore connection to " + hostname + ":" + port +
578-
". Next attempt in " + keepAliveInterval + "ms");
579-
}
584+
terminateConnect();
585+
connect(keepAliveConnectTimeout);
586+
} catch (Exception ce) {
587+
if (logger.isLoggable(Level.WARNING)) {
588+
logger.warning("Failed to restore connection to " + hostname + ":" + port +
589+
". Next attempt in " + keepAliveInterval + "ms");
580590
}
581591
}
582-
} finally {
583-
shutdownLock.unlock();
584592
}
585593
}
586594
}
587595
});
596+
keepAliveThreadExecutor = threadExecutor;
588597
}
589598

590599
private Thread newNamedThread(Runnable runnable, String threadName) {
@@ -895,7 +904,7 @@ public void registerLifecycleListener(LifecycleListener lifecycleListener) {
895904
/**
896905
* Unregister all lifecycle listener of specific type.
897906
*/
898-
public synchronized void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
907+
public void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
899908
synchronized (lifecycleListeners) {
900909
Iterator<LifecycleListener> iterator = lifecycleListeners.iterator();
901910
while (iterator.hasNext()) {
@@ -910,7 +919,7 @@ public synchronized void unregisterLifecycleListener(Class<? extends LifecycleLi
910919
/**
911920
* Unregister single lifecycle listener.
912921
*/
913-
public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) {
922+
public void unregisterLifecycleListener(LifecycleListener eventListener) {
914923
synchronized (lifecycleListeners) {
915924
lifecycleListeners.remove(eventListener);
916925
}
@@ -922,48 +931,49 @@ public synchronized void unregisterLifecycleListener(LifecycleListener eventList
922931
* As the result following {@link #connect()} resumes client from where it left off.
923932
*/
924933
public void disconnect() throws IOException {
925-
shutdownLock.lock();
926-
try {
927-
if (isKeepAliveThreadRunning()) {
928-
keepAliveThreadExecutor.shutdownNow();
929-
}
930-
disconnectChannel();
931-
} finally {
932-
shutdownLock.unlock();
934+
terminateKeepAliveThread();
935+
terminateConnect();
936+
}
937+
938+
private void terminateKeepAliveThread() {
939+
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
940+
if (keepAliveThreadExecutor == null) {
941+
return;
933942
}
934-
if (isKeepAliveThreadRunning()) {
935-
waitForKeepAliveThreadToBeTerminated();
943+
keepAliveThreadExecutor.shutdownNow();
944+
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
945+
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
946+
// ignore
936947
}
937948
}
938949

939-
private void waitForKeepAliveThreadToBeTerminated() {
940-
boolean terminated = false;
950+
private static boolean awaitTerminationInterruptibly(ExecutorService executorService, long timeout, TimeUnit unit) {
941951
try {
942-
terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout,
943-
TimeUnit.MILLISECONDS);
952+
return executorService.awaitTermination(timeout, unit);
944953
} catch (InterruptedException e) {
945-
if (logger.isLoggable(Level.WARNING)) {
946-
logger.log(Level.WARNING, e.getMessage());
947-
}
954+
return false;
948955
}
949-
if (!terminated) {
950-
throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " +
951-
keepAliveThreadShutdownTimeout + "ms");
956+
}
957+
958+
private void terminateConnect() throws IOException {
959+
do {
960+
disconnectChannel();
961+
} while (!tryLockInterruptibly(connectLock, 1000, TimeUnit.MILLISECONDS));
962+
connectLock.unlock();
963+
}
964+
965+
private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit) {
966+
try {
967+
return lock.tryLock(time, unit);
968+
} catch (InterruptedException e) {
969+
return false;
952970
}
953971
}
954972

955973
private void disconnectChannel() throws IOException {
956-
try {
957-
connected = false;
958-
if (channel != null && channel.isOpen()) {
959-
channel.close();
960-
}
961-
} finally {
962-
synchronized (lifecycleListeners) {
963-
for (LifecycleListener lifecycleListener : lifecycleListeners) {
964-
lifecycleListener.onDisconnect(this);
965-
}
966-
}
974+
connected = false;
975+
if (channel != null && channel.isOpen()) {
976+
channel.close();
967977
}
968978
}
969979

0 commit comments

Comments
 (0)