# SPDX-License-Identifier: GPL-3.0-or-later # # This file is part of Nominatim. (https://nominatim.org) # # Copyright (C) 2023 by the Nominatim developer community. # For a full list of authors see the git log. """ Functions for specialised logging with HTML output. """ from typing import Any, cast from contextvars import ContextVar import textwrap import io import sqlalchemy as sa from sqlalchemy.ext.asyncio import AsyncConnection try: from pygments import highlight from pygments.lexers import PythonLexer, PostgresLexer from pygments.formatters import HtmlFormatter CODE_HIGHLIGHT = True except ModuleNotFoundError: CODE_HIGHLIGHT = False class BaseLogger: """ Interface for logging function. The base implementation does nothing. Overwrite the functions in derived classes which implement logging functionality. """ def get_buffer(self) -> str: """ Return the current content of the log buffer. """ return '' def function(self, func: str, **kwargs: Any) -> None: """ Start a new debug chapter for the given function and its parameters. """ def section(self, heading: str) -> None: """ Start a new section with the given title. """ def comment(self, text: str) -> None: """ Add a simple comment to the debug output. """ def var_dump(self, heading: str, var: Any) -> None: """ Print the content of the variable to the debug output prefixed by the given heading. """ def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None: """ Print the SQL for the given statement. """ def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> str: """ Return the compiled version of the statement. """ try: return str(cast('sa.ClauseElement', statement) .compile(conn.sync_engine, compile_kwargs={"literal_binds": True})) except sa.exc.CompileError: pass except NotImplementedError: pass return str(cast('sa.ClauseElement', statement).compile(conn.sync_engine)) class HTMLLogger(BaseLogger): """ Logger that formats messages in HTML. 
""" def __init__(self) -> None: self.buffer = io.StringIO() def get_buffer(self) -> str: return HTML_HEADER + self.buffer.getvalue() + HTML_FOOTER def function(self, func: str, **kwargs: Any) -> None: self._write(f"
Parameters:
{text}
") def var_dump(self, heading: str, var: Any) -> None: self._write(f'{sqlstr}{sqlstr}')
def _python_var(self, var: Any) -> str:
# Render a Python value as a string for the HTML debug output.
# When CODE_HIGHLIGHT is set, repr(var) is run through pygments'
# highlight() using PythonLexer and HtmlFormatter(nowrap=True).
# NOTE(review): `fmt` is only assigned inside the CODE_HIGHLIGHT branch,
# yet the return line interpolates both `fmt` and `str(var)`. This looks
# like two separate return statements (highlighted result / plain
# fallback) whose surrounding markup has been lost - confirm against the
# upstream file before relying on this method.
if CODE_HIGHLIGHT:
fmt = highlight(repr(var), PythonLexer(), HtmlFormatter(nowrap=True))
return f'{fmt}{str(var)}'
def _write(self, text: str) -> None:
    """ Append the given text, unmodified, to the debug output buffer.
    """
    sink = self.buffer
    sink.write(text)
class TextLogger(BaseLogger):
    """ Logger that collects plain-text debug output, suitable for
        printing on the console.
    """

    def __init__(self) -> None:
        # All output is accumulated in memory until get_buffer() is called.
        self.buffer = io.StringIO()

    def get_buffer(self) -> str:
        """ Return everything that has been written to the log so far.
        """
        return self.buffer.getvalue()

    def function(self, func: str, **kwargs: Any) -> None:
        """ Write the chapter header for the function `func` followed by
            one line per keyword parameter and a closing blank line.
        """
        # Lazily rendered so that formatting and writing stay interleaved.
        param_lines = (f' {name}: {self._python_var(value)}\n'
                       for name, value in kwargs.items())

        self._write(f"#### Debug output for {func}()\n\nParameters:\n")
        for line in param_lines:
            self._write(line)
        self._write('\n')

    def section(self, heading: str) -> None:
        """ Write a section title surrounded by blank lines.
        """
        title = f"\n# {heading}\n\n"
        self._write(title)

    def comment(self, text: str) -> None:
        """ Write the comment text on a line of its own.
        """
        line = f"{text}\n"
        self._write(line)

    def var_dump(self, heading: str, var: Any) -> None:
        """ Write the heading and, on the following line, the string
            form of the variable.
        """
        rendered = self._python_var(var)
        self._write(f'{heading}:\n {rendered}\n\n')

    def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
        """ Write the SQL of the statement, wrapped at 78 characters with
            every output line prefixed by '| '.
        """
        compiled = self.format_sql(conn, statement)
        wrapped = textwrap.wrap(compiled, width=78)
        body = '\n| '.join(wrapped)
        self._write(f"| {body}\n\n")

    def _python_var(self, var: Any) -> str:
        """ Return the plain string form of the variable.
        """
        return str(var)

    def _write(self, text: str) -> None:
        """ Append the raw text to the output buffer.
        """
        self.buffer.write(text)
# Context variable holding the logger that is active in the current context.
# The default is a plain BaseLogger instance, whose base implementation
# does nothing, so no debug output is collected until set_log_output()
# installs another logger.
logger: ContextVar[BaseLogger] = ContextVar('logger', default=BaseLogger())
def set_log_output(fmt: str) -> None:
    """ Install a fresh logger for the current context according to `fmt`:
        'html' selects the HTMLLogger, 'text' the TextLogger and any
        other value the BaseLogger.
    """
    logger_classes = {'html': HTMLLogger, 'text': TextLogger}
    chosen = logger_classes.get(fmt, BaseLogger)
    logger.set(chosen())
def log() -> BaseLogger:
    """ Look up and return the logger that is set for the current context.
    """
    current = logger.get()
    return current
def get_and_disable() -> str:
    """ Fetch the content of the current debug buffer, then replace the
        context's logger with a new BaseLogger and return the content.
    """
    active = logger.get()
    content = active.get_buffer()
    # Swap in the base logger only after the buffer has been read out.
    logger.set(BaseLogger())
    return content
HTML_HEADER: str = """