Skip to content

Commit 07fe599

Browse files
author
Pan
committed
Added native clients proxying/tunnelling implementation, tests, exception.
1 parent 0e1e5c7 commit 07fe599

File tree

9 files changed

+285
-31
lines changed

9 files changed

+285
-31
lines changed

Changelog.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
Change Log
22
============
33

4+
1.3.0
5+
++++++
6+
7+
Changes
8+
---------
9+
10+
* Native clients proxy implementation
11+
* Native clients connection and authentication retry mechanism
12+
13+
Proxy/tunnelling implementation is experimental - please report any issues.
14+
415
1.2.1
516
++++++
617

pssh/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,7 @@ class SFTPError(Exception):
5757
class SFTPIOError(SFTPError):
5858
"""Raised on SFTP IO errors"""
5959
pass
60+
61+
62+
class ProxyError(Exception):
63+
"""Raised on proxy errors"""

pssh/pssh2_client.py

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1717

1818
import logging
19+
from gevent import sleep
1920

2021
from .base_pssh import BaseParallelSSHClient
2122
from .constants import DEFAULT_RETRIES, RETRY_DELAY
2223
from .ssh2_client import SSHClient
24+
from .exceptions import ProxyError
25+
from .tunnel import Tunnel
2326

2427

2528
logger = logging.getLogger(__name__)
@@ -30,7 +33,9 @@ class ParallelSSHClient(BaseParallelSSHClient):
3033

3134
def __init__(self, hosts, user=None, password=None, port=None, pkey=None,
3235
num_retries=DEFAULT_RETRIES, timeout=None, pool_size=10,
33-
allow_agent=True, host_config=None, retry_delay=RETRY_DELAY):
36+
allow_agent=True, host_config=None, retry_delay=RETRY_DELAY,
37+
proxy_host=None, proxy_port=22,
38+
proxy_user=None, proxy_password=None, proxy_pkey=None):
3439
"""
3540
:param hosts: Hosts to connect to
3641
:type hosts: list(str)
@@ -64,23 +69,47 @@ def __init__(self, hosts, user=None, password=None, port=None, pkey=None,
6469
not all hosts use the same configuration.
6570
:type host_config: dict
6671
:param allow_agent: (Optional) set to False to disable connecting to
67-
the system's SSH agent
72+
the system's SSH agent.
6873
:type allow_agent: bool
74+
:param proxy_host: (Optional) SSH host to tunnel connection through
75+
so that SSH clients connect to host via client -> proxy_host -> host
76+
:type proxy_host: str
77+
:param proxy_port: (Optional) SSH port to use to login to proxy host if
78+
set. Defaults to 22.
79+
:type proxy_port: int
80+
:param proxy_user: (Optional) User to login to ``proxy_host`` as.
81+
Defaults to logged in user.
82+
:type proxy_user: str
83+
:param proxy_password: (Optional) Password to login to ``proxy_host``
84+
with. Defaults to no password.
85+
:type proxy_password: str
86+
:param proxy_pkey: (Optional) Private key file to be used for
87+
authentication with ``proxy_host``. Defaults to available keys from
88+
SSHAgent and user's SSH identities.
89+
:type proxy_pkey: Private key file path to use. Note that the public
90+
key file pair *must* also exist in the same location with name
91+
``<pkey>.pub``.
6992
"""
7093
BaseParallelSSHClient.__init__(
7194
self, hosts, user=user, password=password, port=port, pkey=pkey,
7295
allow_agent=allow_agent, num_retries=num_retries,
7396
timeout=timeout, pool_size=pool_size,
7497
host_config=host_config, retry_delay=retry_delay)
98+
self.proxy_host = proxy_host
99+
self.proxy_port = proxy_port
100+
self.proxy_pkey = proxy_pkey
101+
self.proxy_user = proxy_user
102+
self.proxy_password = proxy_password
103+
self._tunnels = {}
75104

76105
def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
77106
use_pty=False, host_args=None, shell=None,
78107
encoding='utf-8'):
79108
"""Run command on all hosts in parallel, honoring self.pool_size,
80109
and return output dictionary.
81110
82-
This function will block until all commands have been successfully
83-
received by remote servers and then return immediately.
111+
This function will block until all commands have been received
112+
by remote servers and then return immediately.
84113
85114
More explicitly, function will return after connection and
86115
authentication establishment and after commands have been accepted by
@@ -139,7 +168,10 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
139168
:raises: :py:class:`TypeError` on not enough host arguments for cmd
140169
string format
141170
:raises: :py:class:`KeyError` on no host argument key in arguments
142-
dict for cmd string format"""
171+
dict for cmd string format
172+
:raises: :py:class:`pssh.exceptions.ProxyErrors` on errors connecting
173+
to proxy if a proxy host has been set.
174+
"""
143175
return BaseParallelSSHClient.run_command(
144176
self, command, stop_on_errors=stop_on_errors, host_args=host_args,
145177
user=user, shell=shell, sudo=sudo,
@@ -184,11 +216,34 @@ def _get_exit_code(self, channel):
184216
return
185217
return channel.get_exit_status()
186218

219+
def _start_tunnel(self, host):
220+
tunnel = Tunnel(
221+
self.proxy_host, host, self.port, user=self.proxy_user,
222+
password=self.proxy_password, port=self.proxy_port,
223+
pkey=self.proxy_pkey, num_retries=self.num_retries,
224+
timeout=self.timeout, retry_delay=self.retry_delay,
225+
allow_agent=self.allow_agent)
226+
tunnel.daemon = True
227+
tunnel.start()
228+
self._tunnels[host] = tunnel
229+
while not tunnel.tunnel_open.is_set():
230+
logger.debug("Waiting for tunnel to become active")
231+
sleep(.1)
232+
if not tunnel.is_alive():
233+
msg = "Proxy authentication failed"
234+
logger.error(msg)
235+
raise ProxyError(msg)
236+
return tunnel
237+
187238
def _make_ssh_client(self, host):
188239
if host not in self.host_clients or self.host_clients[host] is None:
240+
if self.proxy_host is not None:
241+
tunnel = self._start_tunnel(host)
189242
_user, _port, _password, _pkey = self._get_host_config_values(host)
243+
_host = host if self.proxy_host is None else '127.0.0.1'
244+
_port = _port if self.proxy_host is None else tunnel.listen_port
190245
self.host_clients[host] = SSHClient(
191-
host, user=_user, password=_password, port=_port, pkey=_pkey,
246+
_host, user=_user, password=_password, port=_port, pkey=_pkey,
192247
num_retries=self.num_retries, timeout=self.timeout,
193248
allow_agent=self.allow_agent, retry_delay=self.retry_delay)
194249

pssh/pssh_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ def run_command(self, command, sudo=False, user=None, stop_on_errors=True,
128128
"""Run command on all hosts in parallel, honoring self.pool_size,
129129
and return output buffers.
130130
131-
This function will block until all commands have been *sent* to remote
132-
servers and then return immediately
131+
This function will block until all commands have been received
132+
by remote servers and then return immediately.
133133
134134
More explicitly, function will return after connection and
135-
authentication establishment and after commands have been sent to
135+
authentication establishment and after commands have been received by
136136
successfully established SSH channels.
137137
138138
Any connection and/or authentication exceptions will be raised here

pssh/ssh2_client.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@
2525
WIN_PLATFORM = False
2626
from socket import gaierror as sock_gaierror, error as sock_error
2727

28-
from gevent import sleep, get_hub
29-
from gevent import socket
28+
from gevent import sleep, socket, get_hub
3029
from gevent.hub import Hub
3130
from ssh2.error_codes import LIBSSH2_ERROR_EAGAIN
3231
from ssh2.exceptions import AuthenticationError, AgentError, \
@@ -89,63 +88,77 @@ def __init__(self, host,
8988
the system's SSH agent
9089
:type allow_agent: bool
9190
"""
92-
# proxy_host=None, proxy_port=22, proxy_user=None,
93-
# proxy_password=None, proxy_pkey=None,
9491
self.host = host
9592
self.user = user if user else None
96-
if self.user is None and WIN_PLATFORM is False:
93+
if self.user is None and not WIN_PLATFORM:
9794
self.user = pwd.getpwuid(os.geteuid()).pw_name
98-
elif self.user is None and WIN_PLATFORM is True:
95+
elif self.user is None and WIN_PLATFORM:
9996
raise ValueError("Must provide user parameter on Windows")
10097
self.password = password
10198
self.port = port if port else 22
10299
self.pkey = pkey
103100
self.num_retries = num_retries
104-
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
101+
self.sock = None
105102
self.timeout = timeout * 1000 if timeout else None
106103
self.retry_delay = retry_delay
107104
self.allow_agent = allow_agent
108-
self._connect()
105+
self.session = None
106+
self._connect(self.host, self.port)
109107
THREAD_POOL.apply(self._init)
110108

111-
def _init(self):
109+
def _connect_init_retry(self, retries):
110+
retries += 1
111+
self.session = None
112+
if not self.sock.closed:
113+
self.sock.close()
114+
sleep(self.retry_delay)
115+
self._connect(self.host, self.port, retries=retries)
116+
return self._init(retries=retries)
117+
118+
def _init(self, retries=1):
112119
self.session = Session()
113120
if self.timeout:
114121
self.session.set_timeout(self.timeout)
115122
try:
116123
self.session.handshake(self.sock)
117124
except SessionHandshakeError as ex:
125+
while retries < self.num_retries:
126+
return self._connect_init_retry(retries)
118127
msg = "Error connecting to host %s:%s - %s"
119128
raise SessionError(msg, self.host, self.port, ex)
120129
try:
121130
self.auth()
122131
except AuthenticationError as ex:
132+
while retries < self.num_retries:
133+
return self._connect_init_retry(retries)
123134
msg = "Authentication error while connecting to %s:%s - %s"
124135
raise AuthenticationException(msg, self.host, self.port, ex)
125136
self.session.set_blocking(0)
126137

127-
def _connect(self, retries=1):
138+
def _connect(self, host, port, retries=1):
139+
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
140+
logger.debug("Connecting to %s:%s", host, port)
128141
try:
129-
self.sock.connect((self.host, self.port))
142+
self.sock.connect((host, port))
130143
except sock_gaierror as ex:
131144
logger.error("Could not resolve host '%s' - retry %s/%s",
132-
self.host, retries, self.num_retries)
145+
host, retries, self.num_retries)
133146
while retries < self.num_retries:
134147
sleep(self.retry_delay)
135-
return self._connect(retries=retries+1)
148+
return self._connect(host, port, retries=retries+1)
136149
raise UnknownHostException("Unknown host %s - %s - retry %s/%s",
137-
self.host, str(ex.args[1]), retries,
150+
host, str(ex.args[1]), retries,
138151
self.num_retries)
139152
except sock_error as ex:
140153
logger.error("Error connecting to host '%s:%s' - retry %s/%s",
141-
self.host, self.port, retries, self.num_retries)
154+
host, port, retries, self.num_retries)
142155
while retries < self.num_retries:
143156
sleep(self.retry_delay)
144-
return self._connect(retries=retries+1)
157+
return self._connect(host, port, retries=retries+1)
145158
error_type = ex.args[1] if len(ex.args) > 1 else ex.args[0]
146159
raise ConnectionErrorException(
147160
"Error connecting to host '%s:%s' - %s - retry %s/%s",
148-
self.host, self.port, str(error_type), retries,
161+
host, port, str(error_type), retries,
149162
self.num_retries,)
150163

151164
def _pkey_auth(self):

0 commit comments

Comments
 (0)