Skip to content

Combined PR: SR-9033 handle EPOLLHUP #478

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/event/event_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ _dispatch_get_buffer_size(dispatch_muxnote_t dmn, bool writer)
return (uintptr_t)n;
}

static void
_dispatch_event_merge_hangup(dispatch_unote_t du)
{
// consumed by dux_merge_evt()
_dispatch_retain_unote_owner(du);
dispatch_unote_state_t du_state = _dispatch_unote_state(du);
du_state |= DU_STATE_NEEDS_DELETE;
du_state &= ~DU_STATE_ARMED;
_dispatch_unote_state_set(du, du_state);
uintptr_t data = 0; // EOF
os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed);
dux_merge_evt(du._du, EV_DELETE|EV_DISPATCH, data, 0);
}

static void
_dispatch_event_merge_fd(dispatch_muxnote_t dmn, uint32_t events)
{
Expand Down Expand Up @@ -583,6 +597,20 @@ _dispatch_event_merge_fd(dispatch_muxnote_t dmn, uint32_t events)
}
}

// SR-9033: EPOLLHUP is an unmaskable event which we must respond to
if (events & EPOLLHUP) {
LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_event_merge_hangup(du);
}
LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) {
dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul);
_dispatch_event_merge_hangup(du);
}
epoll_ctl(_dispatch_epfd, EPOLL_CTL_DEL, dmn->dmn_fd, NULL);
return;
}

events = _dispatch_muxnote_armed_events(dmn);
if (events) _dispatch_epoll_update(dmn, events, EPOLL_CTL_MOD);
}
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ set(DISPATCH_C_TESTS
starfish
data
io_net
io_pipe_close
select)

# Tests that usually pass, but occasionally fail.
Expand Down Expand Up @@ -194,6 +195,8 @@ foreach(test ${DISPATCH_C_TESTS})
dispatch_${test}.c)
endforeach()

set_tests_properties(dispatch_io_pipe_close PROPERTIES TIMEOUT 5)

# test dispatch API for various C/CXX language variants
add_unit_test(dispatch_c99 NO_BSD_OVERLAY SOURCES dispatch_c99.c)
add_unit_test(dispatch_plusplus SOURCES dispatch_plusplus.cpp)
Expand Down
71 changes: 71 additions & 0 deletions tests/dispatch_io_pipe_close.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2019 Apple Inc. All rights reserved.
*
* @APPLE_APACHE_LICENSE_HEADER_START@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @APPLE_APACHE_LICENSE_HEADER_END@
*/

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <limits.h>
#include <errno.h>

#include <bsdtests.h>
#include "dispatch_test.h"
#include <dispatch/dispatch.h>

int
main() {
int pipe_fds[2] = { -1, -1 };
int pipe_err = pipe(pipe_fds);
int readFD = pipe_fds[0];
int writeFD = pipe_fds[1];

dispatch_test_start(NULL);
if (pipe_err) {
test_errno("pipe", errno, 0);
test_stop();
_Exit(EXIT_FAILURE);
}

printf("readFD=%d, writeFD=%d\n", readFD, writeFD);
dispatch_queue_t q = dispatch_queue_create("q", NULL);
dispatch_io_t io = dispatch_io_create(DISPATCH_IO_STREAM, readFD, q, ^(int err) {
printf("cleanup, err=%d\n", err);
close(readFD);
printf("all done\n");
test_stop();
_Exit(EXIT_SUCCESS);
});
dispatch_io_set_low_water(io, 0);
dispatch_io_read(io, 0, UINT_MAX, q, ^(bool done, dispatch_data_t data, int err) {
printf("read: \%d, %zu, %d\n", done, data == NULL ? 0 : dispatch_data_get_size(data), err);
if (data != NULL && dispatch_data_get_size(data) > 0) {
// will only happen once
printf("closing writeFD\n");
close(writeFD);
dispatch_after(DISPATCH_TIME_NOW + 1, q, ^{
dispatch_io_close(io, 0);
});
}
});
dispatch_resume(io);
printf("writing\n");
write(writeFD, "x", 1);
printf("wrtten\n");
dispatch_main();
}