improve deadlock detection for various versions of psycopg2

Psycopg2 has changed the kind of exception that is emitted on
deadlocks between versions 2.7 and 2.8. The code was already
trying to catch both kind of errors but because the
psycopg2.errors package is unknown in 2.7 and below, the
code would throw an exception on anything but a deadlock error.

This commit wraps the deadlock handling into a context manager
to avoid code duplication and uses module imports to detect if
the new error codes are available.

Also sets the required psycopg2 version to 2.7 or bigger as
versions below are difficult to test.
This commit is contained in:
Sarah Hoffmann
2021-02-25 17:36:31 +01:00
parent 72b01148d2
commit a1f0fc1a10
3 changed files with 152 additions and 23 deletions

View File

@@ -9,8 +9,41 @@ import logging
import psycopg2
from psycopg2.extras import wait_select
# psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
# module is available and adapt the error handling accordingly.
try:
import psycopg2.errors # pylint: disable=no-name-in-module,import-error
__has_psycopg2_errors__ = True
except ModuleNotFoundError:
__has_psycopg2_errors__ = False
LOG = logging.getLogger()
class DeadlockHandler:
""" Context manager that catches deadlock exceptions and calls
the given handler function. All other exceptions are passed on
normally.
"""
def __init__(self, handler):
self.handler = handler
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
if __has_psycopg2_errors__:
if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.handler()
return True
else:
if exc_type == psycopg2.extensions.TransactionRollbackError:
if exc_value.pgcode == '40P01':
self.handler()
return True
return False
class DBConnection:
""" A single non-blocking database connection.
"""
@@ -24,15 +57,22 @@ class DBConnection:
self.cursor = None
self.connect()
def close(self):
""" Close all open connections. Does not wait for pending requests.
"""
if self.conn is not None:
self.cursor.close()
self.conn.close()
self.conn = None
def connect(self):
""" (Re)connect to the database. Creates an asynchronous connection
with JIT and parallel processing disabled. If a connection was
already open, it is closed and a new connection established.
The caller must ensure that no query is pending before reconnecting.
"""
if self.conn is not None:
self.cursor.close()
self.conn.close()
self.close()
# Use a dict to hand in the parameters because async is a reserved
# word in Python3.
@@ -50,23 +90,18 @@ class DBConnection:
WHERE name = 'max_parallel_workers_per_gather';""")
self.wait()
def _deadlock_handler(self):
LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
def wait(self):
""" Block until any pending operation is done.
"""
while True:
try:
with DeadlockHandler(self._deadlock_handler):
wait_select(self.conn)
self.current_query = None
return
except psycopg2.extensions.TransactionRollbackError as error:
if error.pgcode == '40P01':
LOG.info("Deadlock detected (params = %s), retry.",
str(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
else:
raise
except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None):
""" Send SQL query to the server. Returns immediately without
@@ -90,17 +125,9 @@ class DBConnection:
if self.current_query is None:
return True
try:
with DeadlockHandler(self._deadlock_handler):
if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None
return True
except psycopg2.extensions.TransactionRollbackError as error:
if error.pgcode == '40P01':
LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
else:
raise
except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.cursor.execute(self.current_query, self.current_params)
return False