Commit 06eb317e authored by Pierre Dittgen's avatar Pierre Dittgen
Browse files

Fix flake8 issues

parent a3be893a
"""Validata UI."""
import logging
from datetime import timedelta
......
......@@ -87,7 +87,7 @@ BROWSERLESS_API_TOKEN = os.getenv("BROWSERLESS_API_TOKEN") or None
# Cache backend (default is SQLite)
CACHE_BACKEND = os.getenv("CACHE_BACKEND") or "sqlite"
log.info(f"Cache backend: {CACHE_BACKEND}")
log.info("Cache backend: %r", CACHE_BACKEND)
# Caching time for schema requests in minutes
CACHE_EXPIRE_AFTER = os.getenv("CACHE_EXPIRE_AFTER") or None
......
"""Config model using pydantic"""
"""Config model using pydantic."""
from typing import List, Optional, Union
from pydantic import BaseModel, HttpUrl, root_validator
class Link(BaseModel):
"""Link class."""
title: str
url: HttpUrl
class ExternalLink(BaseModel):
"""ExternalLink class."""
name: str
type: str
title: str
......@@ -18,16 +22,22 @@ class ExternalLink(BaseModel):
class Schema(BaseModel):
"""Schema class."""
name: str
repo_url: HttpUrl
class Catalog(BaseModel):
"""Catalog class."""
version: int
schemas: List[Schema]
class Section(BaseModel):
"""Section class."""
name: str
title: str
description: Optional[str] = None
......@@ -35,7 +45,8 @@ class Section(BaseModel):
links: Optional[List[ExternalLink]] = None
@root_validator
def check_catalog_or_links(cls, values):
def check_catalog_or_links(cls, values): # noqa
"""Check that catalog or links attributes is defined but not both."""
catalog, links = values.get("catalog"), values.get("links")
if catalog is None and links is None:
raise ValueError("catalog or links field must be defined")
......@@ -45,18 +56,26 @@ class Section(BaseModel):
class Footer(BaseModel):
"""Footer section."""
links: List[Link]
class Header(BaseModel):
"""Header section."""
links: List[Link]
class Homepage(BaseModel):
"""Homepage section."""
sections: List[Section]
class Config(BaseModel):
"""Config class defines header, footer and homepage sections."""
footer: Footer
header: Header
homepage: Homepage
......@@ -38,11 +38,13 @@ class BrowserlessPDFRenderer(PDFRenderer):
"""Browserless IO implementation."""
def __init__(self, api_url: str, api_token: str):
"""Create browserless.io implementation."""
log.info("BrowserlessPDFRenderer: creating instance with api_url = %r", api_url)
self.api_url = api_url
self.api_token = api_token
def render(self, url: str) -> bytes:
"""Call browserless.io service to generate PDF."""
headers = {
"Cache-Control": "no-cache",
"Content-Type": "application/json",
......@@ -82,6 +84,7 @@ class ChromiumHeadlessPDFRenderer(PDFRenderer):
"""Chromium implementation."""
def render(self, url: str) -> bytes:
"""Render PDF document using Chromium headless."""
# Create temp file to save validation report
# This temp file will be automatically deleted on context exit
with tempfile.NamedTemporaryFile(
......
"""
Util UI functions
"""
"""Util UI functions."""
from flask import flash
def flash_error(msg):
""" Flash bootstrap error message """
"""Flash bootstrap error message."""
flash(msg, "danger")
def flash_warning(msg):
""" Flash bootstrap warning message """
"""Flash bootstrap warning message."""
flash(msg, "warning")
def flash_success(msg):
""" Flash bootstrap success message """
"""Flash bootstrap success message."""
flash(msg, "success")
def flash_info(msg):
""" Flash bootstrap info message """
"""Flash bootstrap info message."""
flash(msg, "info")
"""Utility functions"""
"""Utility functions."""
import unicodedata
def strip_accents(s):
"""Remove accents from string, used to sort normalized strings"""
"""Remove accents from string, used to sort normalized strings."""
return "".join(
c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn"
)
"""
Routes
"""
"""Routes."""
import copy
import io
import json
......@@ -31,16 +29,15 @@ log = logging.getLogger(__name__)
def get_schema_catalog(section_name):
"""Return a schema catalog associated to a section_name"""
"""Return a schema catalog associated to a section_name."""
return schema_catalog_registry.build_schema_catalog(section_name)
class SchemaInstance:
"""Handy class to handle schema information"""
"""Handy class to handle schema information."""
def __init__(self, parameter_dict):
"""Initializes schema instance from requests dict and tableschema Catalog
(for name ref)."""
"""Initialize schema instance and tableschema catalog."""
self.section_name = None
self.section_title = None
self.name = None
......@@ -72,8 +69,8 @@ class SchemaInstance:
# Look for schema catalog first
try:
table_schema_catalog = get_schema_catalog(self.section_name)
except Exception as ex:
log.exception(ex)
except Exception:
log.exception("")
abort(400, "Erreur de traitement du catalogue")
if table_schema_catalog is None:
abort(400, "Catalogue indisponible")
......@@ -124,24 +121,26 @@ class SchemaInstance:
try:
self.schema = fetch_schema(self.url)
except json.JSONDecodeError as e:
log.exception(e)
flash_error("Le format du schéma n'est pas reconnu")
except json.JSONDecodeError:
err_msg = "Le format du schéma n'est pas reconnu"
log.exception(err_msg)
flash_error(err_msg)
abort(redirect(url_for("home")))
except Exception as e:
log.exception(e)
flash_error("Impossible de récupérer le schéma")
except Exception:
err_msg = "Impossible de récupérer le schéma"
log.exception(err_msg)
flash_error(err_msg)
abort(redirect(url_for("home")))
def request_parameters(self):
if self.name:
return {
"schema_name": self.schema_and_section_name,
"schema_ref": "" if self.ref is None else self.ref,
}
return {"schema_url": self.url}
"""Build request parameter dict to identify schema."""
return {
"schema_name": self.schema_and_section_name,
"schema_ref": "" if self.ref is None else self.ref,
} if self.name else {"schema_url": self.url}
def find_section_title(self, section_name):
"""Return section title or None if not found."""
if config.CONFIG:
for section in config.CONFIG.homepage.sections:
if section.name == section_name:
......@@ -166,11 +165,10 @@ def build_template_source_data(header, rows, preview_rows_nb=5):
def build_ui_errors(errors):
"""Add context to errors, converts markdown content to HTML"""
"""Add context to errors, converts markdown content to HTML."""
def improve_err(err):
"""Adds context info based on row-nb presence and converts content to HTML"""
"""Add context info based on row-nb presence and converts content to HTML."""
# Context
update_keys = {
"context": "body"
......@@ -196,7 +194,9 @@ def build_ui_errors(errors):
def create_validata_ui_report(rows_count: int, validata_core_report, schema_dict):
"""Creates an error report easier to handle and display in templates:
"""Create an error report easier to handle and display using templates.
improvements done:
- only one table
- errors are contextualized
- error-counts is ok
......@@ -310,7 +310,7 @@ def create_validata_ui_report(rows_count: int, validata_core_report, schema_dict
def compute_badge_message_and_color(badge):
"""Computes message and color from badge information"""
"""Compute message and color from badge information."""
structure = badge["structure"]
body = badge.get("body")
......@@ -333,8 +333,7 @@ def compute_badge_message_and_color(badge):
def get_badge_url_and_message(badge):
"""Gets badge url from badge information"""
"""Get badge url from badge information."""
msg, color = compute_badge_message_and_color(badge)
badge_url = "{}?{}".format(
urljoin(config.SHIELDS_IO_BASE_URL, "/static/v1.svg"),
......@@ -344,8 +343,7 @@ def get_badge_url_and_message(badge):
def validate(schema_instance: SchemaInstance, validata_resource: ValidataResource):
""" Validate source and display report """
"""Validate source and display report."""
def compute_resource_info(resource: ValidataResource):
source = resource.get_source()
return {
......@@ -463,7 +461,7 @@ def validate(schema_instance: SchemaInstance, validata_resource: ValidataResourc
def bytes_data(f):
""" Gets bytes data from Werkzeug FileStorage instance """
"""Get bytes data from Werkzeug FileStorage instance."""
iob = io.BytesIO()
f.save(iob)
iob.seek(0)
......@@ -475,7 +473,6 @@ def retrieve_schema_catalog(section: Section):
def format_error_message(err_message, exc):
"""Prepare a bootstrap error message with details if wanted."""
exception_text = "\n".join([str(arg) for arg in exc.args])
return f"""{err_msg}
......@@ -493,7 +490,6 @@ def retrieve_schema_catalog(section: Section):
return (schema_catalog, None)
except Exception as exc:
log.exception(exc)
err_msg = "une erreur s'est produite"
if isinstance(exc, requests.ConnectionError):
err_msg = "problème de connexion"
......@@ -501,6 +497,7 @@ def retrieve_schema_catalog(section: Section):
err_msg = "format JSON incorrect"
elif isinstance(exc, jsonschema.exceptions.ValidationError):
err_msg = "le catalogue ne respecte pas le schéma de référence"
log.exception(err_msg)
error_catalog = {
**{k: v for k, v in section.dict().items() if k != "catalog"},
......@@ -514,11 +511,10 @@ def retrieve_schema_catalog(section: Section):
@app.route("/")
def home():
""" Home page """
"""Home page."""
def iter_sections():
"""Yield sections of the home page, filled with schema metadata."""
# Iterate on all sections
for section in config.CONFIG.homepage.sections:
......@@ -580,7 +576,7 @@ def home():
@app.route("/pdf")
def pdf_report():
"""PDF report generation"""
"""PDF report generation."""
err_prefix = "Erreur de génération du rapport PDF"
url_param = request.args.get("url")
......@@ -627,13 +623,12 @@ def pdf_report():
def extract_schema_metadata(table_schema: frictionless.Schema):
"""Gets author, contibutor, version...metadata from schema header"""
"""Get author, contibutor, version...metadata from schema header."""
return {k: v for k, v in table_schema.items() if k != "fields"}
def compute_schema_info(table_schema: frictionless.Schema, schema_url):
"""Factor code for validator form page"""
"""Factor code for validator form page."""
# Schema URL + schema metadata info
schema_info = {
"path": schema_url,
......@@ -646,15 +641,14 @@ def compute_schema_info(table_schema: frictionless.Schema, schema_url):
def compute_validation_form_url(request_parameters: dict):
"""Computes validation form url with schema URL parameter"""
"""Compute validation form url with schema URL parameter."""
url = url_for("custom_validator")
return "{}?{}".format(url, urlencode(request_parameters))
@app.route("/table-schema", methods=["GET", "POST"])
def custom_validator():
"""Validator form"""
"""Display validator form."""
if request.method == "GET":
# input is a hidden form parameter to know
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment