Skip to content

Commit 088b1f7

Browse files
author
Pan
committed
Added timeout functionality to join and output reading for native clients.
1 parent 9e79ebe commit 088b1f7

File tree

8 files changed

+482
-354
lines changed

8 files changed

+482
-354
lines changed

Changelog.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
Change Log
22
============
33

4+
1.3.1
5+
++++++
6+
7+
Changes
8+
--------
9+
10+
* Added ``timeout`` optional parameter to ``join`` and ``run_command``, for reading output, on native clients.
11+
12+
Fixes
13+
------
14+
15+
* From source builds when Cython is installed with recent versions of ``ssh2-python``.
16+
417
1.3.0
518
++++++
619

pssh/native/_ssh2.c

Lines changed: 393 additions & 331 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pssh/native/_ssh2.pyx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ from ..exceptions import SessionError
3939
cdef bytes LINESEP = b'\n'
4040

4141

42-
def _read_output(Session session, read_func):
42+
def _read_output(Session session, read_func, timeout=None):
4343
cdef Py_ssize_t _size
4444
cdef bytes _data
4545
cdef bytes remainder = b""
@@ -51,8 +51,10 @@ def _read_output(Session session, read_func):
5151
_size, _data = read_func()
5252
while _size == LIBSSH2_ERROR_EAGAIN or _size > 0:
5353
if _size == LIBSSH2_ERROR_EAGAIN:
54-
_wait_select(_sock, _session, None)
54+
_wait_select(_sock, _session, timeout)
5555
_size, _data = read_func()
56+
if timeout is not None and _size == LIBSSH2_ERROR_EAGAIN:
57+
break
5658
while _size > 0:
5759
while _pos < _size:
5860
linesep = _data[:_size].find(LINESEP, _pos)

pssh/output.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,6 @@ def __repr__(self):
8383
stdout=self.stdout, stdin=self.stdin, stderr=self.stderr,
8484
exception=self.exception, linesep=linesep,
8585
exit_code=self.exit_code)
86+
87+
def __str__(self):
88+
return self.__repr__()

pssh/pssh2_client.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def __init__(self, hosts, user=None, password=None, port=None, pkey=None,
104104

105105
def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
106106
use_pty=False, host_args=None, shell=None,
107-
encoding='utf-8'):
107+
encoding='utf-8', timeout=None):
108108
"""Run command on all hosts in parallel, honoring self.pool_size,
109109
and return output dictionary.
110110
@@ -152,6 +152,10 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
152152
:param encoding: Encoding to use for output. Must be valid
153153
`Python codec <https://docs.python.org/2.7/library/codecs.html>`_
154154
:type encoding: str
155+
:param timeout: (Optional) Timeout in seconds for reading from stdout
156+
or stderr. Defaults to no timeout. Reading from stdout/stderr will
157+
timeout after this many seconds if remote output is not ready.
158+
:type timeout: int
155159
156160
:rtype: Dictionary with host as key and
157161
:py:class:`pssh.output.HostOutput` as value as per
@@ -169,24 +173,24 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
169173
string format
170174
:raises: :py:class:`KeyError` on no host argument key in arguments
171175
dict for cmd string format
172-
:raises: :py:class:`pssh.exceptions.ProxyErrors` on errors connecting
176+
:raises: :py:class:`pssh.exceptions.ProxyError` on errors connecting
173177
to proxy if a proxy host has been set.
174178
"""
175179
return BaseParallelSSHClient.run_command(
176180
self, command, stop_on_errors=stop_on_errors, host_args=host_args,
177181
user=user, shell=shell, sudo=sudo,
178-
encoding=encoding, use_pty=use_pty)
182+
encoding=encoding, use_pty=use_pty, timeout=timeout)
179183

180184
def _run_command(self, host, command, sudo=False, user=None,
181185
shell=None, use_pty=False,
182-
encoding='utf-8'):
186+
encoding='utf-8', timeout=None):
183187
"""Make SSHClient if needed, run command on host"""
184188
self._make_ssh_client(host)
185189
return self.host_clients[host].run_command(
186190
command, sudo=sudo, user=user, shell=shell,
187-
use_pty=use_pty, encoding=encoding)
191+
use_pty=use_pty, encoding=encoding, timeout=timeout)
188192

189-
def join(self, output, consume_output=False):
193+
def join(self, output, consume_output=False, timeout=None):
190194
"""Wait until all remote commands in output have finished
191195
and retrieve exit codes. Does *not* block other commands from
192196
running in parallel.
@@ -198,11 +202,17 @@ def join(self, output, consume_output=False):
198202
buffers. Output buffers will be empty after ``join`` if set
199203
to ``True``. Must be set to ``True`` to allow host logger to log
200204
output on call to ``join`` when host logger has been enabled.
201-
:type consume_output: bool"""
205+
:type consume_output: bool
206+
:param timeout: Timeout in seconds if remote command is not yet
207+
finished.
208+
:type timeout: int
209+
210+
:rtype: ``None``"""
202211
for host in output:
203212
if host not in self.host_clients or self.host_clients[host] is None:
204213
continue
205-
self.host_clients[host].wait_finished(output[host].channel)
214+
self.host_clients[host].wait_finished(output[host].channel,
215+
timeout=timeout)
206216
if consume_output:
207217
for line in output[host].stdout:
208218
pass
@@ -217,6 +227,8 @@ def _get_exit_code(self, channel):
217227
return channel.get_exit_status()
218228

219229
def _start_tunnel(self, host):
230+
if host in self._tunnels:
231+
return self._tunnels[host]
220232
tunnel = Tunnel(
221233
self.proxy_host, host, self.port, user=self.proxy_user,
222234
password=self.proxy_password, port=self.proxy_port,
@@ -284,7 +296,7 @@ def copy_file(self, local_file, remote_file, recurse=False):
284296
285297
.. note ::
286298
287-
Remote directories in `remote_file` that do not exist will be
299+
Remote directories in ``remote_file`` that do not exist will be
288300
created as long as permissions allow.
289301
290302
"""

pssh/ssh2_client.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -253,35 +253,43 @@ def execute(self, cmd, use_pty=False, channel=None):
253253
self._eagain(channel.execute, cmd)
254254
return channel
255255

256-
def read_stderr(self, channel):
256+
def read_stderr(self, channel, timeout=None):
257257
"""Read standard error buffer from channel.
258258
259259
:param channel: Channel to read output from.
260260
:type channel: :py:class:`ssh2.channel.Channel`
261261
"""
262-
return _read_output(self.session, channel.read_stderr)
262+
return _read_output(self.session, channel.read_stderr, timeout=timeout)
263263

264-
def read_output(self, channel):
264+
def read_output(self, channel, timeout=None):
265265
"""Read standard output buffer from channel.
266266
267267
:param channel: Channel to read output from.
268268
:type channel: :py:class:`ssh2.channel.Channel`
269269
"""
270-
return _read_output(self.session, channel.read)
270+
return _read_output(self.session, channel.read, timeout=timeout)
271271

272-
def wait_finished(self, channel):
272+
def wait_finished(self, channel, timeout=None):
273273
"""Wait for EOF from channel, close channel and wait for
274274
close acknowledgement.
275275
276276
Used to wait for remote command completion and be able to gather
277277
exit code.
278278
279-
:param channel: The channel to use
279+
:param channel: The channel to use.
280280
:type channel: :py:class:`ssh2.channel.Channel`
281281
"""
282282
if channel is None:
283283
return
284-
self._eagain(channel.wait_eof)
284+
# If .eof() returns EAGAIN after a select with a timeout, it means
285+
# it reached timeout without EOF and the connection should not be
286+
# closed as the command is still running.
287+
ret = channel.wait_eof()
288+
while ret == LIBSSH2_ERROR_EAGAIN:
289+
wait_select(self.session, timeout=timeout)
290+
ret = channel.wait_eof()
291+
if ret == LIBSSH2_ERROR_EAGAIN and timeout is not None:
292+
return
285293
self._eagain(channel.close)
286294
self._eagain(channel.wait_closed)
287295

@@ -317,7 +325,7 @@ def read_output_buffer(self, output_buffer, prefix=None,
317325

318326
def run_command(self, command, sudo=False, user=None,
319327
use_pty=False, shell=None,
320-
encoding='utf-8'):
328+
encoding='utf-8', timeout=None):
321329
"""Run remote command.
322330
323331
:param command: Command to run.
@@ -352,9 +360,10 @@ def run_command(self, command, sudo=False, user=None,
352360
channel = self.execute(_command, use_pty=use_pty)
353361
return channel, self.host, \
354362
self.read_output_buffer(
355-
self.read_output(channel), encoding=encoding), \
363+
self.read_output(channel, timeout=timeout),
364+
encoding=encoding), \
356365
self.read_output_buffer(
357-
self.read_stderr(channel), encoding=encoding,
366+
self.read_stderr(channel, timeout=timeout), encoding=encoding,
358367
prefix='\t[err]'), channel
359368

360369
def _make_sftp(self):

setup.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# License along with this library; if not, write to the Free Software
1414
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1515

16+
import os
1617
import platform
1718
from setuptools import setup, find_packages
1819
from platform import python_version
@@ -36,7 +37,11 @@
3637
'optimize.use_switch': True,
3738
'wraparound': False,
3839
}
39-
cython_args = {'cython_directives': cython_directives} if USING_CYTHON else {}
40+
_embedded_lib = bool(os.environ.get('EMBEDDED_LIB', 1))
41+
42+
cython_args = {'cython_directives': cython_directives,
43+
'cython_compile_time_env': {'EMBEDDED_LIB': _embedded_lib},
44+
} if USING_CYTHON else {}
4045

4146
_libs = ['ssh2'] if platform.system() != 'Windows' else [
4247
# For libssh2 OpenSSL backend on Windows.
@@ -47,6 +52,7 @@
4752

4853
ext = 'pyx' if USING_CYTHON else 'c'
4954
_comp_args = ["-O3"] if platform.system() != 'Windows' else None
55+
5056
extensions = [
5157
Extension('pssh.native._ssh2',
5258
sources=['pssh/native/_ssh2.%s' % ext],

tests/test_pssh_ssh2_client.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ def test_run_command_user_sudo(self):
11101110
self.client.join(output)
11111111
stderr = list(output[self.host].stderr)
11121112
self.assertTrue(len(stderr) > 0)
1113-
self.assertTrue(output[self.host].exit_code == 1)
1113+
self.assertEqual(output[self.host].exit_code, 1)
11141114

11151115
def test_run_command_shell(self):
11161116
output = self.client.run_command(self.cmd, shell="bash -c")
@@ -1163,6 +1163,27 @@ def test_host_no_client(self):
11631163
output = {'blah': None}
11641164
self.client.join(output)
11651165

1166+
def test_join_timeout(self):
1167+
client = ParallelSSHClient([self.host], port=self.port,
1168+
pkey=self.user_key)
1169+
output = client.run_command('sleep 2')
1170+
client.join(output, timeout=1)
1171+
self.assertFalse(output[self.host].channel.eof())
1172+
client.join(output, timeout=2)
1173+
self.assertTrue(output[self.host].channel.eof())
1174+
1175+
def test_read_timeout(self):
1176+
client = ParallelSSHClient([self.host], port=self.port,
1177+
pkey=self.user_key)
1178+
output = client.run_command('sleep 2', timeout=1)
1179+
stdout = list(output[self.host].stdout)
1180+
self.assertFalse(output[self.host].channel.eof())
1181+
self.assertEqual(len(stdout), 0)
1182+
list(output[self.host].stdout)
1183+
list(output[self.host].stdout)
1184+
client.join(output)
1185+
self.assertTrue(output[self.host].channel.eof())
1186+
11661187
## OpenSSHServer needs to run in its own thread for this test to work
11671188
## Race conditions otherwise.
11681189
#

0 commit comments

Comments
 (0)