|
| 1 | +# pylint: disable=missing-docstring |
| 2 | +import os |
| 3 | +import threading |
| 4 | +import warnings |
| 5 | +from http.server import HTTPServer, SimpleHTTPRequestHandler |
| 6 | +from ssl import get_default_verify_paths, SSLContext, PROTOCOL_TLSv1_1, PROTOCOL_TLSv1_2 |
| 7 | +from typing import Callable |
| 8 | + |
| 9 | +import pytest |
| 10 | +import urllib3 |
| 11 | +from requests.exceptions import SSLError |
| 12 | + |
| 13 | +from ibm_cloud_sdk_core.base_service import BaseService |
| 14 | +from ibm_cloud_sdk_core.authenticators import NoAuthAuthenticator |
| 15 | + |
| 16 | + |
| 17 | +# The certificate files that are used in this tests are generated by this command: |
| 18 | +# pylint: disable=line-too-long,pointless-string-statement |
| 19 | +""" |
| 20 | +openssl req -x509 -out test_ssl.crt -keyout test_ssl.key \ |
| 21 | + -newkey rsa:2048 -nodes -sha256 -days 36500 \ |
| 22 | + -subj '/CN=localhost' -extensions EXT -config <( \ |
| 23 | + printf "[dn]\nCN=localhost\n[req]\ndistinguished_name = dn\n[EXT]\nsubjectAltName=DNS:localhost\nkeyUsage=digitalSignature\nextendedKeyUsage=serverAuth") |
| 24 | +""" |
| 25 | + |
| 26 | + |
| 27 | +# Load the certificate and the key files. |
| 28 | +cert = os.path.join(os.path.dirname(__file__), '../resources/test_ssl.crt') |
| 29 | +key = os.path.join(os.path.dirname(__file__), '../resources/test_ssl.key') |
| 30 | + |
| 31 | + |
| 32 | +def _local_server(tls_version: int, port: int) -> Callable: |
| 33 | + def decorator(test_function: Callable) -> Callable: |
| 34 | + def inner(): |
| 35 | + # Disable warnings caused by the self-signed certificate. |
| 36 | + urllib3.disable_warnings() |
| 37 | + |
| 38 | + # Build the SSL context for the server. |
| 39 | + ssl_context = SSLContext(tls_version) |
| 40 | + ssl_context.load_cert_chain(certfile=cert, keyfile=key) |
| 41 | + |
| 42 | + # Create and start the server on a separate thread. |
| 43 | + server = HTTPServer(('localhost', port), SimpleHTTPRequestHandler) |
| 44 | + server.socket = ssl_context.wrap_socket(server.socket, server_side=True) |
| 45 | + t = threading.Thread(target=server.serve_forever) |
| 46 | + t.start() |
| 47 | + |
| 48 | + # We run everything in a big try-except-finally block to make sure we always |
| 49 | + # shutdown the HTTP server gracefully. |
| 50 | + try: |
| 51 | + test_function() |
| 52 | + except Exception: # pylint: disable=try-except-raise |
| 53 | + raise |
| 54 | + finally: |
| 55 | + server.shutdown() |
| 56 | + t.join() |
| 57 | + # Re-enable warnings. |
| 58 | + warnings.resetwarnings() |
| 59 | + |
| 60 | + return inner |
| 61 | + |
| 62 | + return decorator |
| 63 | + |
| 64 | + |
| 65 | +@_local_server(PROTOCOL_TLSv1_1, 3333) |
| 66 | +def test_tls_v1_1(): |
| 67 | + service = BaseService(service_url='https://localhost:3333', authenticator=NoAuthAuthenticator()) |
| 68 | + prepped = service.prepare_request('GET', url='/') |
| 69 | + # The following request should fail, because the server will try |
| 70 | + # to use TLS v1.1 but that's not allowed in our client. |
| 71 | + with pytest.raises(Exception) as exception: |
| 72 | + service.send(prepped, verify=cert) |
| 73 | + # Errors can be differ based on the Python version. |
| 74 | + assert exception.type is SSLError or exception.type is ConnectionError |
| 75 | + |
| 76 | + |
| 77 | +@_local_server(PROTOCOL_TLSv1_2, 3334) |
| 78 | +def test_tls_v1_2(): |
| 79 | + service = BaseService(service_url='https://localhost:3334', authenticator=NoAuthAuthenticator()) |
| 80 | + |
| 81 | + # First call the server with the default configuration. |
| 82 | + # It should fail due to the self-signed SSL cert. |
| 83 | + assert service.disable_ssl_verification is False |
| 84 | + prepped = service.prepare_request('GET', url='/') |
| 85 | + with pytest.raises(SSLError, match='certificate verify failed: self-signed certificate'): |
| 86 | + res = service.send(prepped) |
| 87 | + |
| 88 | + # Next configure it to validate by using our local certificate. Should raise no exception. |
| 89 | + res = service.send(prepped, verify=cert) |
| 90 | + assert res is not None |
| 91 | + |
| 92 | + # Now disable the SSL verification. The request shouldn't raise any issue. |
| 93 | + service.set_disable_ssl_verification(True) |
| 94 | + assert service.disable_ssl_verification is True |
| 95 | + prepped = service.prepare_request('GET', url='/') |
| 96 | + res = service.send(prepped) |
| 97 | + assert res is not None |
| 98 | + |
| 99 | + # Lastly, try with an external URL. |
| 100 | + # This test case is mainly here to reproduce the regression |
| 101 | + # in the `requests` package that was introduced in `2.32.3`. |
| 102 | + # More details on the issue can be found here: https://github.com/psf/requests/issues/6730 |
| 103 | + service = BaseService(service_url='https://cloud.ibm.com', authenticator=NoAuthAuthenticator()) |
| 104 | + assert service.disable_ssl_verification is False |
| 105 | + |
| 106 | + ssl_context = service.http_adapter.poolmanager.connection_pool_kw.get("ssl_context") |
| 107 | + assert ssl_context is not None |
| 108 | + # In some cases (especially in Ubuntu containers that we use for testing on Travis) |
| 109 | + # the default CA certificates are stored in a different place, so let's try to |
| 110 | + # load those before making the final decision for this test case. |
| 111 | + if len(ssl_context.get_ca_certs()) == 0: |
| 112 | + try: |
| 113 | + default_ca_path = get_default_verify_paths().capath |
| 114 | + ssl_context.load_verify_locations(os.path.join(default_ca_path, 'ca-certificates.crt')) |
| 115 | + except: |
| 116 | + # Errors are ignored, let's jump straight to the assertion. |
| 117 | + pass |
| 118 | + |
| 119 | + assert len(ssl_context.get_ca_certs()) > 0 |
| 120 | + |
| 121 | + prepped = service.prepare_request('GET', url='/status') |
| 122 | + res = service.send(prepped) |
| 123 | + assert res is not None |
0 commit comments