correctly quote strings when copying in data

Encapsulate the copy string in a class that ensures that
copy lines are written with correct quoting.
This commit is contained in:
Sarah Hoffmann
2021-06-10 09:36:43 +02:00
parent 2f6e4edcdb
commit a0a7b05c9f
5 changed files with 202 additions and 52 deletions

View File

@@ -4,6 +4,7 @@ Helper functions for handling DB accesses.
import subprocess
import logging
import gzip
import io
from nominatim.db.connection import get_pg_env
from nominatim.errors import UsageError
@@ -57,3 +58,49 @@ def execute_file(dsn, fname, ignore_errors=False, pre_code=None, post_code=None)
if ret != 0 or remain > 0:
raise UsageError("Failed to execute SQL file.")
# Characters that must be backslash-escaped in the text format of the
# COPY command: the backslash itself, the column delimiter (tab) and both
# line-break characters. A raw carriage return inside a value may otherwise
# be taken for (part of) a row terminator by PostgreSQL.
_SQL_TRANSLATION = {ord('\\'): '\\\\',
                    ord('\t'): '\\t',
                    ord('\n'): '\\n',
                    ord('\r'): '\\r'}


class CopyBuffer:
    """ Data collector for the copy_from command.

        Rows are collected in an in-memory text buffer, one line per row
        with tab-separated columns. Column values are quoted on the way in,
        so that callers can hand over arbitrary strings. The class may be
        used as a context manager; the buffer is closed on exit.
    """

    def __init__(self):
        self.buffer = io.StringIO()


    def __enter__(self):
        return self


    def __exit__(self, exc_type, exc_value, traceback):
        if self.buffer is not None:
            self.buffer.close()


    def add(self, *data):
        """ Add another row of data to the copy buffer.

            Each positional argument becomes one column. `None` is written
            as the NULL marker `\\N`, everything else is converted with
            `str()` and escaped.
        """
        # Build the complete row first, so that a failing str() conversion
        # cannot leave a partially written row in the buffer.
        row = '\t'.join('\\N' if column is None
                        else str(column).translate(_SQL_TRANSLATION)
                        for column in data)
        self.buffer.write(row)
        self.buffer.write('\n')


    def copy_out(self, cur, table, columns=None):
        """ Copy all collected data into the given table.

            `cur` must provide a `copy_from(file, table, columns=...)`
            method. Nothing is sent when no data has been written.
        """
        if self.buffer.tell() > 0:
            self.buffer.seek(0)
            cur.copy_from(self.buffer, table, columns=columns)

View File

@@ -93,7 +93,7 @@ class ICURuleLoader:
def _load_from_yaml(self):
rules = yaml.load(self.configfile.read_text())
rules = yaml.safe_load(self.configfile.read_text())
self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
@@ -122,6 +122,9 @@ class ICURuleLoader:
"""
content = self._get_section(rules, section)
if content is None:
return ''
if isinstance(content, str):
return (self.configfile.parent / content).read_text().replace('\n', ' ')
@@ -160,4 +163,5 @@ class ICURuleLoader:
abbrterms = (norm.transliterate(t.strip()) for t in parts[1].split(','))
for full, abbr in itertools.product(fullterms, abbrterms):
self.abbreviations[full].append(abbr)
if full and abbr:
self.abbreviations[full].append(abbr)

View File

@@ -14,6 +14,7 @@ import psycopg2.extras
from nominatim.db.connection import connect
from nominatim.db.properties import set_property, get_property
from nominatim.db.utils import CopyBuffer
from nominatim.db.sql_preprocessor import SQLPreprocessor
from nominatim.tokenizer.icu_rule_loader import ICURuleLoader
from nominatim.tokenizer.icu_name_processor import ICUNameProcessor, ICUNameProcessorRules
@@ -134,7 +135,7 @@ class LegacyICUTokenizer:
@define('CONST_Term_Normalization_Rules', "{0.term_normalization}");
@define('CONST_Transliteration', "{0.naming_rules.search_rules}");
require_once('{1}/tokenizer/legacy_icu_tokenizer.php');
""".format(self, phpdir)))
""".format(self, phpdir))) # pylint: disable=missing-format-attribute
def _save_config(self, config):
@@ -171,14 +172,15 @@ class LegacyICUTokenizer:
words[term] += cnt
# copy them back into the word table
copystr = io.StringIO(''.join(('{}\t{}\n'.format(*args) for args in words.items())))
with CopyBuffer() as copystr:
for args in words.items():
copystr.add(*args)
with conn.cursor() as cur:
copystr.seek(0)
cur.copy_from(copystr, 'word', columns=['word_token', 'search_name_count'])
cur.execute("""UPDATE word SET word_id = nextval('seq_word')
WHERE word_id is null""")
with conn.cursor() as cur:
copystr.copy_out(cur, 'word',
columns=['word_token', 'search_name_count'])
cur.execute("""UPDATE word SET word_id = nextval('seq_word')
WHERE word_id is null""")
conn.commit()
@@ -265,7 +267,6 @@ class LegacyICUNameAnalyzer:
table.
"""
to_delete = []
copystr = io.StringIO()
with self.conn.cursor() as cur:
# This finds us the rows in location_postcode and word that are
# missing in the other table.
@@ -278,26 +279,25 @@ class LegacyICUNameAnalyzer:
ON pc = word) x
WHERE pc is null or word is null""")
for postcode, word in cur:
if postcode is None:
to_delete.append(word)
else:
copystr.write(postcode)
copystr.write('\t ')
copystr.write(self.name_processor.get_search_normalized(postcode))
copystr.write('\tplace\tpostcode\t0\n')
with CopyBuffer() as copystr:
for postcode, word in cur:
if postcode is None:
to_delete.append(word)
else:
copystr.add(
postcode,
' ' + self.name_processor.get_search_normalized(postcode),
'place', 'postcode', 0)
if to_delete:
cur.execute("""DELETE FROM WORD
WHERE class ='place' and type = 'postcode'
and word = any(%s)
""", (to_delete, ))
if to_delete:
cur.execute("""DELETE FROM WORD
WHERE class ='place' and type = 'postcode'
and word = any(%s)
""", (to_delete, ))
if copystr.getvalue():
copystr.seek(0)
cur.copy_from(copystr, 'word',
columns=['word', 'word_token', 'class', 'type',
'search_name_count'])
copystr.copy_out(cur, 'word',
columns=['word', 'word_token', 'class', 'type',
'search_name_count'])
def update_special_phrases(self, phrases, should_replace):
@@ -331,34 +331,24 @@ class LegacyICUNameAnalyzer:
"""
to_add = new_phrases - existing_phrases
copystr = io.StringIO()
added = 0
for word, cls, typ, oper in to_add:
term = self.name_processor.get_search_normalized(word)
if term:
copystr.write(word)
copystr.write('\t ')
copystr.write(term)
copystr.write('\t')
copystr.write(cls)
copystr.write('\t')
copystr.write(typ)
copystr.write('\t')
copystr.write(oper if oper in ('in', 'near') else '\\N')
copystr.write('\t0\n')
added += 1
with CopyBuffer() as copystr:
for word, cls, typ, oper in to_add:
term = self.name_processor.get_search_normalized(word)
if term:
copystr.add(word, term, cls, typ,
oper if oper in ('in', 'near') else None, 0)
added += 1
if copystr.tell() > 0:
copystr.seek(0)
cursor.copy_from(copystr, 'word',
copystr.copy_out(cursor, 'word',
columns=['word', 'word_token', 'class', 'type',
'operator', 'search_name_count'])
return added
def _remove_special_phrases(self, cursor, new_phrases, existing_phrases):
@staticmethod
def _remove_special_phrases(cursor, new_phrases, existing_phrases):
""" Remove all phrases from the databse that are no longer in the
new phrase list.
"""