forked from hans/Nominatim
organise python tests in subdirectories
The directories follow the same structure as the modules in nominatim/.
This commit is contained in:
107
test/python/db/test_async_connection.py
Normal file
107
test/python/db/test_async_connection.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""
|
||||
Tests for function providing a non-blocking query interface towards PostgreSQL.
|
||||
"""
|
||||
from contextlib import closing
|
||||
import concurrent.futures
|
||||
|
||||
import pytest
|
||||
import psycopg2
|
||||
|
||||
from nominatim.db.async_connection import DBConnection, DeadlockHandler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def conn(temp_db):
|
||||
with closing(DBConnection('dbname=' + temp_db)) as connection:
|
||||
yield connection
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def simple_conns(temp_db):
|
||||
conn1 = psycopg2.connect('dbname=' + temp_db)
|
||||
conn2 = psycopg2.connect('dbname=' + temp_db)
|
||||
|
||||
yield conn1.cursor(), conn2.cursor()
|
||||
|
||||
conn1.close()
|
||||
conn2.close()
|
||||
|
||||
|
||||
def test_simple_query(conn, temp_db_conn):
|
||||
conn.connect()
|
||||
|
||||
conn.perform('CREATE TABLE foo (id INT)')
|
||||
conn.wait()
|
||||
|
||||
temp_db_conn.table_exists('foo')
|
||||
|
||||
|
||||
def test_wait_for_query(conn):
|
||||
conn.connect()
|
||||
|
||||
conn.perform('SELECT pg_sleep(1)')
|
||||
|
||||
assert not conn.is_done()
|
||||
|
||||
conn.wait()
|
||||
|
||||
|
||||
def test_bad_query(conn):
|
||||
conn.connect()
|
||||
|
||||
conn.perform('SELECT efasfjsea')
|
||||
|
||||
with pytest.raises(psycopg2.ProgrammingError):
|
||||
conn.wait()
|
||||
|
||||
|
||||
def test_bad_query_ignore(temp_db):
|
||||
with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
|
||||
conn.connect()
|
||||
|
||||
conn.perform('SELECT efasfjsea')
|
||||
|
||||
conn.wait()
|
||||
|
||||
|
||||
def exec_with_deadlock(cur, sql, detector):
|
||||
with DeadlockHandler(lambda *args: detector.append(1)):
|
||||
cur.execute(sql)
|
||||
|
||||
|
||||
def test_deadlock(simple_conns):
|
||||
cur1, cur2 = simple_conns
|
||||
|
||||
cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
|
||||
INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
|
||||
cur1.connection.commit()
|
||||
|
||||
cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
|
||||
cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
|
||||
|
||||
# This is the tricky part of the test. The first SQL command runs into
|
||||
# a lock and blocks, so we have to run it in a separate thread. When the
|
||||
# second deadlocking SQL statement is issued, Postgresql will abort one of
|
||||
# the two transactions that cause the deadlock. There is no way to tell
|
||||
# which one of the two. Therefore wrap both in a DeadlockHandler and
|
||||
# expect that exactly one of the two triggers.
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
deadlock_check = []
|
||||
try:
|
||||
future = executor.submit(exec_with_deadlock, cur2,
|
||||
"UPDATE t1 SET t = 'y' WHERE id = 1",
|
||||
deadlock_check)
|
||||
|
||||
while not future.running():
|
||||
pass
|
||||
|
||||
|
||||
exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
|
||||
deadlock_check)
|
||||
finally:
|
||||
# Whatever happens, make sure the deadlock gets resolved.
|
||||
cur1.connection.rollback()
|
||||
|
||||
future.result()
|
||||
|
||||
assert len(deadlock_check) == 1
|
||||
105
test/python/db/test_connection.py
Normal file
105
test/python/db/test_connection.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""
|
||||
Tests for specialised conenction and cursor classes.
|
||||
"""
|
||||
import pytest
|
||||
import psycopg2
|
||||
|
||||
from nominatim.db.connection import connect, get_pg_env
|
||||
|
||||
@pytest.fixture
|
||||
def db(dsn):
|
||||
with connect(dsn) as conn:
|
||||
yield conn
|
||||
|
||||
|
||||
def test_connection_table_exists(db, table_factory):
|
||||
assert not db.table_exists('foobar')
|
||||
|
||||
table_factory('foobar')
|
||||
|
||||
assert db.table_exists('foobar')
|
||||
|
||||
|
||||
def test_connection_index_exists(db, table_factory, temp_db_cursor):
|
||||
assert not db.index_exists('some_index')
|
||||
|
||||
table_factory('foobar')
|
||||
temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')
|
||||
|
||||
assert db.index_exists('some_index')
|
||||
assert db.index_exists('some_index', table='foobar')
|
||||
assert not db.index_exists('some_index', table='bar')
|
||||
|
||||
|
||||
def test_drop_table_existing(db, table_factory):
|
||||
table_factory('dummy')
|
||||
assert db.table_exists('dummy')
|
||||
|
||||
db.drop_table('dummy')
|
||||
assert not db.table_exists('dummy')
|
||||
|
||||
|
||||
def test_drop_table_non_existsing(db):
|
||||
db.drop_table('dfkjgjriogjigjgjrdghehtre')
|
||||
|
||||
|
||||
def test_drop_table_non_existing_force(db):
|
||||
with pytest.raises(psycopg2.ProgrammingError, match='.*does not exist.*'):
|
||||
db.drop_table('dfkjgjriogjigjgjrdghehtre', if_exists=False)
|
||||
|
||||
def test_connection_server_version_tuple(db):
|
||||
ver = db.server_version_tuple()
|
||||
|
||||
assert isinstance(ver, tuple)
|
||||
assert len(ver) == 2
|
||||
assert ver[0] > 8
|
||||
|
||||
|
||||
def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
|
||||
ver = db.postgis_version_tuple()
|
||||
|
||||
assert isinstance(ver, tuple)
|
||||
assert len(ver) == 2
|
||||
assert ver[0] >= 2
|
||||
|
||||
|
||||
def test_cursor_scalar(db, table_factory):
|
||||
table_factory('dummy')
|
||||
|
||||
with db.cursor() as cur:
|
||||
assert cur.scalar('SELECT count(*) FROM dummy') == 0
|
||||
|
||||
|
||||
def test_cursor_scalar_many_rows(db):
|
||||
with db.cursor() as cur:
|
||||
with pytest.raises(RuntimeError):
|
||||
cur.scalar('SELECT * FROM pg_tables')
|
||||
|
||||
|
||||
def test_cursor_scalar_no_rows(db, table_factory):
|
||||
table_factory('dummy')
|
||||
|
||||
with db.cursor() as cur:
|
||||
with pytest.raises(RuntimeError):
|
||||
cur.scalar('SELECT id FROM dummy')
|
||||
|
||||
|
||||
def test_get_pg_env_add_variable(monkeypatch):
|
||||
monkeypatch.delenv('PGPASSWORD', raising=False)
|
||||
env = get_pg_env('user=fooF')
|
||||
|
||||
assert env['PGUSER'] == 'fooF'
|
||||
assert 'PGPASSWORD' not in env
|
||||
|
||||
|
||||
def test_get_pg_env_overwrite_variable(monkeypatch):
|
||||
monkeypatch.setenv('PGUSER', 'some default')
|
||||
env = get_pg_env('user=overwriter')
|
||||
|
||||
assert env['PGUSER'] == 'overwriter'
|
||||
|
||||
|
||||
def test_get_pg_env_ignore_unknown():
|
||||
env = get_pg_env('client_encoding=stuff', base_env={})
|
||||
|
||||
assert env == {}
|
||||
41
test/python/db/test_properties.py
Normal file
41
test/python/db/test_properties.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""
|
||||
Tests for property table manpulation.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from nominatim.db import properties
|
||||
|
||||
@pytest.fixture
|
||||
def property_factory(property_table, temp_db_cursor):
|
||||
""" A function fixture that adds a property into the property table.
|
||||
"""
|
||||
def _add_property(name, value):
|
||||
temp_db_cursor.execute("INSERT INTO nominatim_properties VALUES(%s, %s)",
|
||||
(name, value))
|
||||
|
||||
return _add_property
|
||||
|
||||
|
||||
def test_get_property_existing(property_factory, temp_db_conn):
|
||||
property_factory('foo', 'bar')
|
||||
|
||||
assert properties.get_property(temp_db_conn, 'foo') == 'bar'
|
||||
|
||||
|
||||
def test_get_property_unknown(property_factory, temp_db_conn):
|
||||
property_factory('other', 'bar')
|
||||
|
||||
assert properties.get_property(temp_db_conn, 'foo') is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("prefill", (True, False))
|
||||
def test_set_property_new(property_factory, temp_db_conn, temp_db_cursor, prefill):
|
||||
if prefill:
|
||||
property_factory('something', 'bar')
|
||||
|
||||
properties.set_property(temp_db_conn, 'something', 'else')
|
||||
|
||||
assert temp_db_cursor.scalar("""SELECT value FROM nominatim_properties
|
||||
WHERE property = 'something'""") == 'else'
|
||||
|
||||
assert properties.get_property(temp_db_conn, 'something') == 'else'
|
||||
50
test/python/db/test_sql_preprocessor.py
Normal file
50
test/python/db/test_sql_preprocessor.py
Normal file
@@ -0,0 +1,50 @@
|
||||
"""
|
||||
Tests for SQL preprocessing.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from nominatim.db.sql_preprocessor import SQLPreprocessor
|
||||
|
||||
@pytest.fixture
|
||||
def sql_factory(tmp_path):
|
||||
def _mk_sql(sql_body):
|
||||
(tmp_path / 'test.sql').write_text("""
|
||||
CREATE OR REPLACE FUNCTION test() RETURNS TEXT
|
||||
AS $$
|
||||
BEGIN
|
||||
{}
|
||||
END;
|
||||
$$ LANGUAGE plpgsql IMMUTABLE;""".format(sql_body))
|
||||
return 'test.sql'
|
||||
|
||||
return _mk_sql
|
||||
|
||||
@pytest.mark.parametrize("expr,ret", [
|
||||
("'a'", 'a'),
|
||||
("'{{db.partitions|join}}'", '012'),
|
||||
("{% if 'country_name' in db.tables %}'yes'{% else %}'no'{% endif %}", "yes"),
|
||||
("{% if 'xxx' in db.tables %}'yes'{% else %}'no'{% endif %}", "no"),
|
||||
("'{{db.tablespace.address_data}}'", ""),
|
||||
("'{{db.tablespace.search_data}}'", 'TABLESPACE "dsearch"'),
|
||||
("'{{db.tablespace.address_index}}'", 'TABLESPACE "iaddress"'),
|
||||
("'{{db.tablespace.aux_data}}'", 'TABLESPACE "daux"')
|
||||
])
|
||||
def test_load_file_simple(sql_preprocessor_cfg, sql_factory,
|
||||
temp_db_conn, temp_db_cursor, monkeypatch,
|
||||
expr, ret):
|
||||
monkeypatch.setenv('NOMINATIM_TABLESPACE_SEARCH_DATA', 'dsearch')
|
||||
monkeypatch.setenv('NOMINATIM_TABLESPACE_ADDRESS_INDEX', 'iaddress')
|
||||
monkeypatch.setenv('NOMINATIM_TABLESPACE_AUX_DATA', 'daux')
|
||||
sqlfile = sql_factory("RETURN {};".format(expr))
|
||||
|
||||
SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg).run_sql_file(temp_db_conn, sqlfile)
|
||||
|
||||
assert temp_db_cursor.scalar('SELECT test()') == ret
|
||||
|
||||
|
||||
def test_load_file_with_params(sql_preprocessor, sql_factory, temp_db_conn, temp_db_cursor):
|
||||
sqlfile = sql_factory("RETURN '{{ foo }} {{ bar }}';")
|
||||
|
||||
sql_preprocessor.run_sql_file(temp_db_conn, sqlfile, bar='XX', foo='ZZ')
|
||||
|
||||
assert temp_db_cursor.scalar('SELECT test()') == 'ZZ XX'
|
||||
135
test/python/db/test_status.py
Normal file
135
test/python/db/test_status.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""
|
||||
Tests for status table manipulation.
|
||||
"""
|
||||
import datetime as dt
|
||||
|
||||
import pytest
|
||||
|
||||
import nominatim.db.status
|
||||
from nominatim.errors import UsageError
|
||||
|
||||
OSM_NODE_DATA = """\
|
||||
<osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
|
||||
<node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
|
||||
</node>
|
||||
</osm>
|
||||
"""
|
||||
|
||||
def iso_date(date):
|
||||
return dt.datetime.strptime(date, nominatim.db.status.ISODATE_FORMAT)\
|
||||
.replace(tzinfo=dt.timezone.utc)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_status_table(status_table):
|
||||
pass
|
||||
|
||||
|
||||
def test_compute_database_date_place_empty(place_table, temp_db_conn):
|
||||
with pytest.raises(UsageError):
|
||||
nominatim.db.status.compute_database_date(temp_db_conn)
|
||||
|
||||
|
||||
def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
|
||||
place_row(osm_type='N', osm_id=45673)
|
||||
|
||||
requested_url = []
|
||||
def mock_url(url):
|
||||
requested_url.append(url)
|
||||
return OSM_NODE_DATA
|
||||
|
||||
monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
|
||||
|
||||
date = nominatim.db.status.compute_database_date(temp_db_conn)
|
||||
|
||||
assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
|
||||
assert date == iso_date('2006-01-27T22:09:10')
|
||||
|
||||
|
||||
def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn):
|
||||
place_row(osm_type='N', osm_id=45673)
|
||||
|
||||
requested_url = []
|
||||
def mock_url(url):
|
||||
requested_url.append(url)
|
||||
return '<osm version="0.6" generator="OpenStre'
|
||||
|
||||
monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
|
||||
|
||||
with pytest.raises(UsageError):
|
||||
nominatim.db.status.compute_database_date(temp_db_conn)
|
||||
|
||||
|
||||
def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date)
|
||||
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
|
||||
{(date, None, True)}
|
||||
|
||||
|
||||
def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date)
|
||||
|
||||
assert temp_db_cursor.table_rows('import_status') == 1
|
||||
|
||||
date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
|
||||
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
|
||||
{(date, 456, False)}
|
||||
|
||||
|
||||
def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date)
|
||||
|
||||
assert temp_db_cursor.table_rows('import_status') == 1
|
||||
|
||||
nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
|
||||
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
|
||||
{(date, 456, False)}
|
||||
|
||||
|
||||
def test_get_status_empty_table(temp_db_conn):
|
||||
assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
|
||||
|
||||
|
||||
def test_get_status_success(temp_db_conn):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
|
||||
|
||||
assert nominatim.db.status.get_status(temp_db_conn) == \
|
||||
(date, 667, False)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("old_state", [True, False])
|
||||
@pytest.mark.parametrize("new_state", [True, False])
|
||||
def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
|
||||
nominatim.db.status.set_indexed(temp_db_conn, new_state)
|
||||
|
||||
assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
|
||||
|
||||
|
||||
def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
|
||||
nominatim.db.status.set_indexed(temp_db_conn, True)
|
||||
|
||||
assert temp_db_cursor.table_rows("import_status") == 0
|
||||
|
||||
|
||||
def test_log_status(temp_db_conn, temp_db_cursor):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
start = dt.datetime.now() - dt.timedelta(hours=1)
|
||||
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
|
||||
nominatim.db.status.log_status(temp_db_conn, start, 'index')
|
||||
|
||||
temp_db_conn.commit()
|
||||
|
||||
assert temp_db_cursor.table_rows("import_osmosis_log") == 1
|
||||
assert temp_db_cursor.scalar("SELECT batchseq FROM import_osmosis_log") == 56
|
||||
assert temp_db_cursor.scalar("SELECT event FROM import_osmosis_log") == 'index'
|
||||
154
test/python/db/test_utils.py
Normal file
154
test/python/db/test_utils.py
Normal file
@@ -0,0 +1,154 @@
|
||||
"""
|
||||
Tests for DB utility functions in db.utils
|
||||
"""
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
import nominatim.db.utils as db_utils
|
||||
from nominatim.errors import UsageError
|
||||
|
||||
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile)
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM test') == {(56, )}
|
||||
|
||||
def test_execute_file_bad_file(dsn, tmp_path):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
db_utils.execute_file(dsn, tmp_path / 'test2.sql')
|
||||
|
||||
|
||||
def test_execute_file_bad_sql(dsn, tmp_path):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('CREATE STABLE test (id INT)')
|
||||
|
||||
with pytest.raises(UsageError):
|
||||
db_utils.execute_file(dsn, tmpfile)
|
||||
|
||||
|
||||
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('CREATE STABLE test (id INT)')
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile, ignore_errors=True)
|
||||
|
||||
|
||||
def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('INSERT INTO test VALUES(4)')
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile, pre_code='CREATE TABLE test (id INT)')
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM test') == {(4, )}
|
||||
|
||||
|
||||
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
|
||||
tmpfile = tmp_path / 'test.sql'
|
||||
tmpfile.write_text('CREATE TABLE test (id INT)')
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile, post_code='INSERT INTO test VALUES(23)')
|
||||
|
||||
assert temp_db_cursor.row_set('SELECT * FROM test') == {(23, )}
|
||||
|
||||
|
||||
class TestCopyBuffer:
|
||||
TABLE_NAME = 'copytable'
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_test_table(self, table_factory):
|
||||
table_factory(self.TABLE_NAME, 'colA INT, colB TEXT')
|
||||
|
||||
|
||||
def table_rows(self, cursor):
|
||||
return cursor.row_set('SELECT * FROM ' + self.TABLE_NAME)
|
||||
|
||||
|
||||
def test_copybuffer_empty(self):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.copy_out(None, "dummy")
|
||||
|
||||
|
||||
def test_all_columns(self, temp_db_cursor):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.add(3, 'hum')
|
||||
buf.add(None, 'f\\t')
|
||||
|
||||
buf.copy_out(temp_db_cursor, self.TABLE_NAME)
|
||||
|
||||
assert self.table_rows(temp_db_cursor) == {(3, 'hum'), (None, 'f\\t')}
|
||||
|
||||
|
||||
def test_selected_columns(self, temp_db_cursor):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.add('foo')
|
||||
|
||||
buf.copy_out(temp_db_cursor, self.TABLE_NAME,
|
||||
columns=['colB'])
|
||||
|
||||
assert self.table_rows(temp_db_cursor) == {(None, 'foo')}
|
||||
|
||||
|
||||
def test_reordered_columns(self, temp_db_cursor):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.add('one', 1)
|
||||
buf.add(' two ', 2)
|
||||
|
||||
buf.copy_out(temp_db_cursor, self.TABLE_NAME,
|
||||
columns=['colB', 'colA'])
|
||||
|
||||
assert self.table_rows(temp_db_cursor) == {(1, 'one'), (2, ' two ')}
|
||||
|
||||
|
||||
def test_special_characters(self, temp_db_cursor):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.add('foo\tbar')
|
||||
buf.add('sun\nson')
|
||||
buf.add('\\N')
|
||||
|
||||
buf.copy_out(temp_db_cursor, self.TABLE_NAME,
|
||||
columns=['colB'])
|
||||
|
||||
assert self.table_rows(temp_db_cursor) == {(None, 'foo\tbar'),
|
||||
(None, 'sun\nson'),
|
||||
(None, '\\N')}
|
||||
|
||||
|
||||
|
||||
class TestCopyBufferJson:
|
||||
TABLE_NAME = 'copytable'
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_test_table(self, table_factory):
|
||||
table_factory(self.TABLE_NAME, 'colA INT, colB JSONB')
|
||||
|
||||
|
||||
def table_rows(self, cursor):
|
||||
cursor.execute('SELECT * FROM ' + self.TABLE_NAME)
|
||||
results = {k: v for k,v in cursor}
|
||||
|
||||
assert len(results) == cursor.rowcount
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def test_json_object(self, temp_db_cursor):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.add(1, json.dumps({'test': 'value', 'number': 1}))
|
||||
|
||||
buf.copy_out(temp_db_cursor, self.TABLE_NAME)
|
||||
|
||||
assert self.table_rows(temp_db_cursor) == \
|
||||
{1: {'test': 'value', 'number': 1}}
|
||||
|
||||
|
||||
def test_json_object_special_chras(self, temp_db_cursor):
|
||||
with db_utils.CopyBuffer() as buf:
|
||||
buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None}))
|
||||
|
||||
buf.copy_out(temp_db_cursor, self.TABLE_NAME)
|
||||
|
||||
assert self.table_rows(temp_db_cursor) == \
|
||||
{1: {'te\tst': 'va\nlue', 'nu"mber': None}}
|
||||
Reference in New Issue
Block a user