Skip to content

Commit 0a69b97

Browse files
committed
Refactor session handling in networking-mlnx
Due upstream changes in neutron to provide compatibility with SQLAlchemy 2.0. [1].[2], and because session.autocommit=False will be the only value accepted, refactor all session handling using context manager [1] https://opendev.org/openstack/neutron-lib/commit/ef1e8abb89141db39e62eaa1cbdb73aca1ceb647 [2] https://opendev.org/openstack/neutron/commit/88fb5416f7169bd3542bca6af0192f935c8cc18e Change-Id: I8e020a14b681d6e655b225ab764e332e59c13de7
1 parent 6427035 commit 0a69b97

File tree

8 files changed

+237
-212
lines changed

8 files changed

+237
-212
lines changed

networking_mlnx/db/db.py

Lines changed: 95 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
2727

2828

29-
def check_for_pending_or_processing_ops(session, object_uuid, operation=None):
30-
q = session.query(sdn_journal_db.SdnJournal).filter(
29+
@db_api.CONTEXT_READER
30+
def check_for_pending_or_processing_ops(context, object_uuid, operation=None):
31+
q = context.session.query(sdn_journal_db.SdnJournal).filter(
3132
or_(sdn_journal_db.SdnJournal.state == sdn_const.PENDING,
3233
sdn_journal_db.SdnJournal.state == sdn_const.PROCESSING),
3334
sdn_journal_db.SdnJournal.object_uuid == object_uuid)
@@ -36,11 +37,12 @@ def check_for_pending_or_processing_ops(session, object_uuid, operation=None):
3637
q = q.filter(sdn_journal_db.SdnJournal.operation.in_(operation))
3738
else:
3839
q = q.filter(sdn_journal_db.SdnJournal.operation == operation)
39-
return session.query(q.exists()).scalar()
40+
return context.session.query(q.exists()).scalar()
4041

4142

42-
def check_for_pending_delete_ops_with_parent(session, object_type, parent_id):
43-
rows = session.query(sdn_journal_db.SdnJournal).filter(
43+
@db_api.CONTEXT_READER
44+
def check_for_pending_delete_ops_with_parent(context, object_type, parent_id):
45+
rows = context.session.query(sdn_journal_db.SdnJournal).filter(
4446
or_(sdn_journal_db.SdnJournal.state == sdn_const.PENDING,
4547
sdn_journal_db.SdnJournal.state == sdn_const.PROCESSING),
4648
sdn_journal_db.SdnJournal.object_type == object_type,
@@ -54,123 +56,137 @@ def check_for_pending_delete_ops_with_parent(session, object_type, parent_id):
5456
return False
5557

5658

57-
def check_for_older_ops(session, row):
58-
q = session.query(sdn_journal_db.SdnJournal).filter(
59+
@db_api.CONTEXT_READER
60+
def check_for_older_ops(context, row):
61+
q = context.session.query(sdn_journal_db.SdnJournal).filter(
5962
or_(sdn_journal_db.SdnJournal.state == sdn_const.PENDING,
6063
sdn_journal_db.SdnJournal.state == sdn_const.PROCESSING),
6164
sdn_journal_db.SdnJournal.object_uuid == row.object_uuid,
6265
sdn_journal_db.SdnJournal.created_at < row.created_at,
6366
sdn_journal_db.SdnJournal.id != row.id)
64-
return session.query(q.exists()).scalar()
67+
return context.session.query(q.exists()).scalar()
6568

6669

67-
def get_all_db_rows(session):
68-
return session.query(sdn_journal_db.SdnJournal).all()
70+
@db_api.CONTEXT_READER
71+
def get_all_db_rows(context):
72+
return context.session.query(sdn_journal_db.SdnJournal).all()
6973

7074

71-
def get_all_db_rows_by_state(session, state):
72-
return session.query(sdn_journal_db.SdnJournal).filter_by(
75+
@db_api.CONTEXT_READER
76+
def get_all_db_rows_by_state(context, state):
77+
return context.session.query(sdn_journal_db.SdnJournal).filter_by(
7378
state=state).all()
7479

7580

81+
def _get_row_with_lock(session):
82+
row = session.query(sdn_journal_db.SdnJournal).filter_by(
83+
state=sdn_const.PENDING).order_by(
84+
asc(sdn_journal_db.SdnJournal.last_retried)).with_for_update(
85+
).first()
86+
return row
87+
88+
7689
# Retry deadlock exception for Galera DB.
7790
# If two (or more) different threads call this method at the same time, they
7891
# might both succeed in changing the same row to pending, but at least one
7992
# of them will get a deadlock from Galera and will have to retry the operation.
8093
@db_api.retry_db_errors
81-
def get_oldest_pending_db_row_with_lock(session):
82-
with session.begin():
83-
row = session.query(sdn_journal_db.SdnJournal).filter_by(
84-
state=sdn_const.PENDING).order_by(
85-
asc(sdn_journal_db.SdnJournal.last_retried)).with_for_update(
86-
).first()
87-
if row:
88-
update_db_row_state(session, row, sdn_const.PROCESSING)
89-
94+
@db_api.CONTEXT_WRITER
95+
def get_oldest_pending_db_row_with_lock(context):
96+
row = _get_row_with_lock(context.session)
97+
if row:
98+
_update_db_row_state(context.session, row, sdn_const.PROCESSING)
9099
return row
91100

92101

93102
@db_api.retry_db_errors
94-
def get_all_monitoring_db_row_by_oldest(session):
95-
with session.begin():
96-
rows = session.query(sdn_journal_db.SdnJournal).filter_by(
97-
state=sdn_const.MONITORING).order_by(
98-
asc(sdn_journal_db.SdnJournal.last_retried)).all()
103+
@db_api.CONTEXT_READER
104+
def get_all_monitoring_db_row_by_oldest(context):
105+
rows = context.session.query(sdn_journal_db.SdnJournal).filter_by(
106+
state=sdn_const.MONITORING).order_by(
107+
asc(sdn_journal_db.SdnJournal.last_retried)).all()
99108
return rows
100109

101110

102111
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
103-
def update_db_row_state(session, row, state):
112+
@db_api.CONTEXT_WRITER
113+
def update_db_row_state(context, row, state):
114+
_update_db_row_state(context.session, row, state)
115+
116+
117+
def _update_db_row_state(session, row, state):
104118
row.state = state
105119
session.merge(row)
106-
session.flush()
107120

108121

109122
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
110-
def update_db_row_job_id(session, row, job_id):
123+
@db_api.CONTEXT_WRITER
124+
def update_db_row_job_id(context, row, job_id):
111125
row.job_id = job_id
112-
session.merge(row)
113-
session.flush()
126+
context.session.merge(row)
114127

115128

116-
def update_pending_db_row_retry(session, row, retry_count):
129+
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
130+
@db_api.CONTEXT_WRITER
131+
def update_pending_db_row_retry(context, row, retry_count):
117132
if row.retry_count >= retry_count and retry_count != -1:
118-
update_db_row_state(session, row, sdn_const.FAILED)
133+
_update_db_row_state(context.session, row, sdn_const.FAILED)
119134
else:
120135
row.retry_count += 1
121-
update_db_row_state(session, row, sdn_const.PENDING)
136+
_update_db_row_state(context.session, row, sdn_const.PENDING)
122137

123138

124139
# This function is currently not used.
125140
# Deleted resources are marked as 'deleted' in the database.
126141
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
127-
def delete_row(session, row=None, row_id=None):
142+
@db_api.CONTEXT_WRITER
143+
def delete_row(context, row=None, row_id=None):
128144
if row_id:
129-
row = session.query(sdn_journal_db.SdnJournal).filter_by(
145+
row = context.session.query(sdn_journal_db.SdnJournal).filter_by(
130146
id=row_id).one()
131147
if row:
132-
session.delete(row)
133-
session.flush()
148+
context.session.delete(row)
134149

135150

136151
@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES)
137-
def create_pending_row(session, object_type, object_uuid,
152+
@db_api.CONTEXT_WRITER
153+
def create_pending_row(context, object_type, object_uuid,
138154
operation, data):
139155
data = jsonutils.dumps(data)
140156
row = sdn_journal_db.SdnJournal(object_type=object_type,
141157
object_uuid=object_uuid,
142158
operation=operation, data=data,
143159
created_at=func.now(),
144160
state=sdn_const.PENDING)
145-
session.add(row)
146-
# Keep session flush for unit tests. NOOP for L2/L3 events since calls are
147-
# made inside database session transaction with subtransactions=True.
148-
session.flush()
161+
context.session.add(row)
149162

150163

151-
@db_api.retry_db_errors
152164
def _update_maintenance_state(session, expected_state, state):
153-
with session.begin():
154-
row = session.query(sdn_maintenance_db.SdnMaintenance).filter_by(
155-
state=expected_state).with_for_update().one_or_none()
156-
if row is None:
157-
return False
165+
row = session.query(sdn_maintenance_db.SdnMaintenance).filter_by(
166+
state=expected_state).with_for_update().one_or_none()
167+
if row is None:
168+
return False
158169

159-
row.state = state
160-
return True
170+
row.state = state
171+
return True
161172

162173

163-
def lock_maintenance(session):
164-
return _update_maintenance_state(session, sdn_const.PENDING,
174+
@db_api.retry_db_errors
175+
@db_api.CONTEXT_WRITER
176+
def lock_maintenance(context):
177+
return _update_maintenance_state(context.session, sdn_const.PENDING,
165178
sdn_const.PROCESSING)
166179

167180

168-
def unlock_maintenance(session):
169-
return _update_maintenance_state(session, sdn_const.PROCESSING,
181+
@db_api.retry_db_errors
182+
@db_api.CONTEXT_WRITER
183+
def unlock_maintenance(context):
184+
return _update_maintenance_state(context.session, sdn_const.PROCESSING,
170185
sdn_const.PENDING)
171186

172187

173-
def update_maintenance_operation(session, operation=None):
188+
@db_api.CONTEXT_WRITER
189+
def update_maintenance_operation(context, operation=None):
174190
"""Update the current maintenance operation details.
175191
176192
The function assumes the lock is held, so it mustn't be run outside of a
@@ -180,28 +196,28 @@ def update_maintenance_operation(session, operation=None):
180196
if operation:
181197
op_text = operation.__name__
182198

183-
with session.begin():
184-
row = session.query(sdn_maintenance_db.SdnMaintenance).one_or_none()
185-
row.processing_operation = op_text
186-
187-
188-
def delete_rows_by_state_and_time(session, state, time_delta):
189-
with session.begin():
190-
now = session.execute(func.now()).scalar()
191-
session.query(sdn_journal_db.SdnJournal).filter(
192-
sdn_journal_db.SdnJournal.state == state,
193-
sdn_journal_db.SdnJournal.last_retried < now - time_delta).delete(
194-
synchronize_session=False)
195-
session.expire_all()
196-
197-
198-
def reset_processing_rows(session, max_timedelta):
199-
with session.begin():
200-
now = session.execute(func.now()).scalar()
201-
max_timedelta = datetime.timedelta(seconds=max_timedelta)
202-
rows = session.query(sdn_journal_db.SdnJournal).filter(
203-
sdn_journal_db.SdnJournal.last_retried < now - max_timedelta,
204-
sdn_journal_db.SdnJournal.state == sdn_const.PROCESSING,
205-
).update({'state': sdn_const.PENDING})
199+
row = context.session.query(
200+
sdn_maintenance_db.SdnMaintenance).one_or_none()
201+
row.processing_operation = op_text
202+
203+
204+
@db_api.CONTEXT_WRITER
205+
def delete_rows_by_state_and_time(context, state, time_delta):
206+
now = context.session.execute(func.now()).scalar()
207+
context.session.query(sdn_journal_db.SdnJournal).filter(
208+
sdn_journal_db.SdnJournal.state == state,
209+
sdn_journal_db.SdnJournal.last_retried < now - time_delta).delete(
210+
synchronize_session=False)
211+
context.session.expire_all()
212+
213+
214+
@db_api.CONTEXT_WRITER
215+
def reset_processing_rows(context, max_timedelta):
216+
now = context.session.execute(func.now()).scalar()
217+
max_timedelta = datetime.timedelta(seconds=max_timedelta)
218+
rows = context.session.query(sdn_journal_db.SdnJournal).filter(
219+
sdn_journal_db.SdnJournal.last_retried < now - max_timedelta,
220+
sdn_journal_db.SdnJournal.state == sdn_const.PROCESSING,
221+
).update({'state': sdn_const.PENDING})
206222

207223
return rows

networking_mlnx/journal/dependency_validations.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
2020

2121

22-
def _is_valid_operation(session, row):
22+
def _is_valid_operation(context, row):
2323
# Check if there are older updates in the queue
24-
if db.check_for_older_ops(session, row):
24+
if db.check_for_older_ops(context, row):
2525
return False
2626
return True
2727

2828

29-
def validate_network_operation(session, row):
29+
def validate_network_operation(context, row):
3030
"""Validate the network operation based on dependencies.
3131
3232
Validate network operation depending on whether it's dependencies
@@ -36,19 +36,19 @@ def validate_network_operation(session, row):
3636
# Check for any pending or processing create or update
3737
# ops on this uuid itself
3838
if db.check_for_pending_or_processing_ops(
39-
session, row.object_uuid, [sdn_const.PUT,
39+
context, row.object_uuid, [sdn_const.PUT,
4040
sdn_const.POST]):
4141
return False
4242
if db.check_for_pending_delete_ops_with_parent(
43-
session, sdn_const.PORT, row.object_uuid):
43+
context, sdn_const.PORT, row.object_uuid):
4444
return False
4545
elif (row.operation == sdn_const.PUT and
46-
not _is_valid_operation(session, row)):
46+
not _is_valid_operation(context, row)):
4747
return False
4848
return True
4949

5050

51-
def validate_port_operation(session, row):
51+
def validate_port_operation(context, row):
5252
"""Validate port operation based on dependencies.
5353
5454
Validate port operation depending on whether it's dependencies
@@ -59,10 +59,10 @@ def validate_port_operation(session, row):
5959
network_id = network_dict['network_id']
6060
# Check for pending or processing network operations
6161
ops = db.check_for_pending_or_processing_ops(
62-
session, network_id, [sdn_const.POST])
62+
context, network_id, [sdn_const.POST])
6363
if ops:
6464
return False
65-
return _is_valid_operation(session, row)
65+
return _is_valid_operation(context, row)
6666

6767

6868
_VALIDATION_MAP = {
@@ -71,13 +71,13 @@ def validate_port_operation(session, row):
7171
}
7272

7373

74-
def validate(session, row):
74+
def validate(context, row):
7575
"""Validate resource dependency in journaled operations.
7676
77-
:param session: db session
77+
:param context: context manager
7878
:param row: entry in journal entry to be validated
7979
"""
80-
return _VALIDATION_MAP[row.object_type](session, row)
80+
return _VALIDATION_MAP[row.object_type](context, row)
8181

8282

8383
def register_validator(object_type, validator):

0 commit comments

Comments
 (0)