Skip to content

[#247] PortManager__Generic uses lock-dirs for reserved ports #255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions testgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from .utils import \
reserve_port, \
release_port, \
bound_ports, \
get_bin_path, \
get_pg_config, \
get_pg_version
Expand All @@ -51,6 +50,7 @@
from .config import testgres_config

from .operations.os_ops import OsOperations, ConnectionParams
from .operations.os_ops import OsLockObj
from .operations.local_ops import LocalOperations
from .operations.remote_ops import RemoteOperations

Expand All @@ -64,7 +64,8 @@
"XLogMethod", "IsolationLevel", "NodeStatus", "ProcessType", "DumpFormat",
"PostgresNode", "NodeApp",
"PortManager",
"reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version",
"reserve_port", "release_port", "get_bin_path", "get_pg_config", "get_pg_version",
"First", "Any",
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams"
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams",
"OsLockObj",
]
4 changes: 4 additions & 0 deletions testgres/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
TMP_CACHE = 'tgsc_'
TMP_BACKUP = 'tgsb_'

TMP_TESTGRES = "testgres"

TMP_TESTGRES_PORTS = TMP_TESTGRES + "/ports"

# path to control file
XLOG_CONTROL_FILE = "global/pg_control"

Expand Down
94 changes: 86 additions & 8 deletions testgres/impl/port_manager__generic.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,142 @@
from ..operations.os_ops import OsOperations
from ..operations.os_ops import OsLockObj

from ..port_manager import PortManager
from ..exceptions import PortForException
from .. import consts

import os
import threading
import random
import typing
import logging


class PortManager__Generic(PortManager):
C_MIN_PORT_NUMBER = 1024
C_MAX_PORT_NUMBER = 65535

_os_ops: OsOperations
_guard: object
# TODO: is there better to use bitmap fot _available_ports?
_available_ports: typing.Set[int]
_reserved_ports: typing.Set[int]
_reserved_ports: typing.Dict[int, OsLockObj]

_lock_dir: str

def __init__(self, os_ops: OsOperations):
assert __class__.C_MIN_PORT_NUMBER <= __class__.C_MAX_PORT_NUMBER

assert os_ops is not None
assert isinstance(os_ops, OsOperations)
self._os_ops = os_ops
self._guard = threading.Lock()
self._available_ports: typing.Set[int] = set(range(1024, 65535))
self._reserved_ports: typing.Set[int] = set()

self._available_ports = set(
range(__class__.C_MIN_PORT_NUMBER, __class__.C_MAX_PORT_NUMBER + 1)
)
assert len(self._available_ports) == (
(__class__.C_MAX_PORT_NUMBER - __class__.C_MIN_PORT_NUMBER) + 1
)

self._reserved_ports = dict()
self._lock_dir = None

def reserve_port(self) -> int:
assert self._guard is not None
assert type(self._available_ports) == set # noqa: E721t
assert type(self._reserved_ports) == set # noqa: E721
assert type(self._reserved_ports) == dict # noqa: E721
assert isinstance(self._os_ops, OsOperations)

with self._guard:
if self._lock_dir is None:
temp_dir = self._os_ops.get_tempdir()
assert type(temp_dir) == str # noqa: E721
lock_dir = os.path.join(temp_dir, consts.TMP_TESTGRES_PORTS)
assert type(lock_dir) == str # noqa: E721
self._os_ops.makedirs(lock_dir)
self._lock_dir = lock_dir

assert self._lock_dir is not None
assert type(self._lock_dir) == str # noqa: E721

t = tuple(self._available_ports)
assert len(t) == len(self._available_ports)
sampled_ports = random.sample(t, min(len(t), 100))
t = None

for port in sampled_ports:
assert type(port) == int # noqa: E721
assert not (port in self._reserved_ports)
assert port in self._available_ports

assert port >= __class__.C_MIN_PORT_NUMBER
assert port <= __class__.C_MAX_PORT_NUMBER

if not self._os_ops.is_port_free(port):
continue

self._reserved_ports.add(port)
self._available_ports.discard(port)
try:
lock_path = self.helper__make_lock_path(port)
lock_obj = self._os_ops.create_lock_fs_obj(lock_path) # raise
except: # noqa: 722
continue

assert isinstance(lock_obj, OsLockObj)
assert self._os_ops.path_exists(lock_path)

try:
self._reserved_ports[port] = lock_obj
except: # noqa: 722
assert not (port in self._reserved_ports)
lock_obj.release()
raise

assert port in self._reserved_ports
self._available_ports.discard(port)
assert not (port in self._available_ports)
__class__.helper__send_debug_msg("Port {} is reserved.".format(port))
return port

raise PortForException("Can't select a port.")

def release_port(self, number: int) -> None:
assert type(number) == int # noqa: E721
assert number >= __class__.C_MIN_PORT_NUMBER
assert number <= __class__.C_MAX_PORT_NUMBER

assert self._guard is not None
assert type(self._reserved_ports) == set # noqa: E721
assert type(self._reserved_ports) == dict # noqa: E721

with self._guard:
assert number in self._reserved_ports
assert not (number in self._available_ports)
self._available_ports.add(number)
self._reserved_ports.discard(number)
lock_obj = self._reserved_ports.pop(number)
assert not (number in self._reserved_ports)
assert number in self._available_ports
assert isinstance(lock_obj, OsLockObj)
lock_obj.release()

__class__.helper__send_debug_msg("Port {} is released.", number)
return

@staticmethod
def helper__send_debug_msg(msg_template: str, *args) -> None:
assert msg_template is not None
assert str is not None
assert type(msg_template) == str # noqa: E721
assert type(args) == tuple # noqa: E721
assert msg_template != ""
s = "[port manager] "
s += msg_template.format(*args)
logging.debug(s)

def helper__make_lock_path(self, port_number: int) -> str:
assert type(port_number) == int # noqa: E721
# You have to call the reserve_port at first!
assert type(self._lock_dir) == str # noqa: E721

result = os.path.join(self._lock_dir, str(port_number) + ".lock")
assert type(result) == str # noqa: E721
return result
22 changes: 22 additions & 0 deletions testgres/operations/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ..exceptions import ExecUtilException
from ..exceptions import InvalidOperationException
from .os_ops import ConnectionParams, OsOperations, get_default_encoding
from .os_ops import OsLockObj
from .raise_error import RaiseError
from .helpers import Helpers

Expand All @@ -28,6 +29,23 @@
CMD_TIMEOUT_SEC = 60


class LocalOsLockFsObj(OsLockObj):
_path: str

def __init__(self, path: str):
assert type(path) == str # noqa: str
self._path = path
os.mkdir(path) # throw
assert os.path.exists(path)
self._path = path

def release(self) -> None:
assert type(self._path) == str # noqa: str
assert os.path.exists(self._path)
os.rmdir(self._path)
self._path = None


class LocalOperations(OsOperations):
sm_single_instance: OsOperations = None
sm_single_instance_guard = threading.Lock()
Expand Down Expand Up @@ -535,3 +553,7 @@ def get_tempdir(self) -> str:
assert type(r) == str # noqa: E721
assert os.path.exists(r)
return r

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
return LocalOsLockFsObj(path)
9 changes: 9 additions & 0 deletions testgres/operations/os_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ def get_default_encoding():
return locale.getencoding() or 'UTF-8'


class OsLockObj:
def release(self) -> None:
raise NotImplementedError()


class OsOperations:
def __init__(self, username=None):
self.ssh_key = None
Expand Down Expand Up @@ -133,3 +138,7 @@ def is_port_free(self, number: int):

def get_tempdir(self) -> str:
raise NotImplementedError()

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
raise NotImplementedError()
32 changes: 32 additions & 0 deletions testgres/operations/remote_ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import getpass
import os
import platform
Expand All @@ -10,6 +12,7 @@
from ..exceptions import ExecUtilException
from ..exceptions import InvalidOperationException
from .os_ops import OsOperations, ConnectionParams, get_default_encoding
from .os_ops import OsLockObj
from .raise_error import RaiseError
from .helpers import Helpers

Expand Down Expand Up @@ -40,6 +43,31 @@ def cmdline(self):
return cmdline.split()


class RemoteOsLockFsObj(OsLockObj):
_os_ops: RemoteOperations
_path: str

def __init__(self, os_ops: RemoteOperations, path: str):
assert isinstance(os_ops, RemoteOperations)
assert type(path) == str # noqa: str

os_ops.makedir(path) # throw
assert os_ops.path_exists(path)

self._os_ops = os_ops
self._path = path

def release(self) -> None:
assert type(self._path) == str # noqa: str
assert isinstance(self._os_ops, RemoteOperations)
assert self._os_ops.path_exists(self._path)

self._os_ops.rmdir(self._path) # throw

self._path = None
self._os_ops = None


class RemoteOperations(OsOperations):
def __init__(self, conn_params: ConnectionParams):
if not platform.system().lower() == "linux":
Expand Down Expand Up @@ -687,6 +715,10 @@ def get_tempdir(self) -> str:
assert type(temp_dir) == str # noqa: E721
return temp_dir

def create_lock_fs_obj(self, path: str) -> OsLockObj:
assert type(path) == str # noqa: E721
return RemoteOsLockFsObj(self, path)

@staticmethod
def _is_port_free__process_0(error: str) -> bool:
assert type(error) == str # noqa: E721
Expand Down
5 changes: 1 addition & 4 deletions testgres/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
#
_old_port_manager = PortManager__Generic(LocalOperations.get_single_instance())

# ports used by nodes
bound_ports = _old_port_manager._reserved_ports


# re-export version type
class PgVer(Version):
Expand All @@ -46,7 +43,7 @@ def __init__(self, version: str) -> None:

def internal__reserve_port():
"""
Generate a new port and add it to 'bound_ports'.
Reserve a port.
"""
return _old_port_manager.reserve_port()

Expand Down
35 changes: 35 additions & 0 deletions tests/test_os_ops_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from testgres import InvalidOperationException
from testgres import ExecUtilException
from testgres.operations.os_ops import OsLockObj

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import Future as ThreadFuture
Expand Down Expand Up @@ -1113,3 +1114,37 @@ class tadWorkerData:

logging.info("Test is finished! Total error count is {}.".format(nErrors))
return

def test_create_lock_fs_obj(self, os_ops: OsOperations):
assert isinstance(os_ops, OsOperations)

tmp = os_ops.mkdtemp()
assert type(tmp) == str # noqa: E721
assert os_ops.path_exists(tmp)
logging.info("tmp dir is [{}]".format(tmp))

p1 = os.path.join(tmp, "a.lock")
obj1 = os_ops.create_lock_fs_obj(p1)
assert obj1 is not None
assert isinstance(obj1, OsLockObj)
assert os_ops.path_exists(tmp)
assert os_ops.path_exists(p1)

while True:
try:
os_ops.create_lock_fs_obj(p1)
except Exception as e:
logging.info("OK. We got the error ({}): {}".format(type(e).__name__, e))
break
raise Exception("We wait the exception!")

assert isinstance(obj1, OsLockObj)
assert os_ops.path_exists(tmp)
assert os_ops.path_exists(p1)

obj1.release()
assert not os_ops.path_exists(p1)

assert os_ops.path_exists(tmp)
os_ops.rmdir(tmp)
assert not os_ops.path_exists(tmp)
Loading