""" Routes """
import copy
import io
import itertools
import json
import logging
import subprocess
import tempfile
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from urllib.parse import quote_plus, urlencode

import requests
import tableschema
from backports.datetime_fromisoformat import MonkeyPatch
from commonmark import commonmark
from flask import make_response, redirect, render_template, request, url_for
from validata_core import compute_badge, csv_helpers, messages
from validata_core.loaders import custom_loaders

import tabulator

from . import app, config, ui_config, table_schema_catalog, schema_from_url
from .ui_util import flash_error, flash_warning
from .validata_util import ValidataSource

MonkeyPatch.patch_fromisoformat()

log = logging.getLogger(__name__)


class SchemaInstance():
    """Handy class to handle schema information"""

    def __init__(self, url=None, name=None, ref=None, spec=None):
        """This function is not intended to be called directly
        but via from_parameters() static method!"""
        self.url = url
        self.name = name
        self.ref = ref
        self.spec = spec

    @staticmethod
    def from_parameters(parameter_dict, table_schema_catalog):
        """Initializes schema instance from requests dict and tableschema catalog (for name ref)

        'schema_url' takes precedence over 'schema_name' (and its optional
        'schema_ref'). Returns None when neither key is present or when the
        schema name is not found in the catalog references.
        """
        schema_url, schema_name, schema_ref = None, None, None

        # From schema_url
        if 'schema_url' in parameter_dict:
            schema_url = parameter_dict["schema_url"]

        # From schema_name (and schema_ref)
        elif 'schema_name' in parameter_dict:
            schema_name = parameter_dict['schema_name']
            schema_ref = parameter_dict.get('schema_ref')

            # Unknown schema name?
            table_schema_reference = table_schema_catalog.references.get(schema_name)
            if table_schema_reference is None:
                return None
            schema_url = table_schema_reference.get_schema_url()

        # No schema information at all
        else:
            return None

        return SchemaInstance(schema_url, schema_name, schema_ref, schema_from_url(schema_url))

    def request_parameters(self):
        """Returns the request parameters (as a dict) that identify this schema

        A named schema is described by 'schema_name' and 'schema_ref'
        (empty string when no ref is set), otherwise by 'schema_url'.
        """
        if self.name:
            return {
                'schema_name': self.name,
                'schema_ref': '' if self.ref is None else self.ref
            }
        return {
            'schema_url': self.url
        }


def extract_source_data(source: ValidataSource, preview_rows_nb=5):
    """ Computes table preview

    Reads the whole source with tabulator: the first row is used as header,
    every other row is kept as a list of strings. Returns a dict with the
    header, the data rows, their count and the preview rows.
    """

    def stringify(val):
        """ Transform value into string """
        return '' if val is None else str(val)

    header = None
    rows = []
    nb_rows = 0

    options = {}
    if source.format == "csv":
        # The delimiter is detected beforehand and passed to the stream
        options['delimiter'] = csv_helpers.detect_dialect(source.source, format=source.format,
                                                          scheme=source.scheme,
                                                          custom_loaders=custom_loaders).delimiter
    with tabulator.Stream(source.source, format=source.format, scheme=source.scheme,
                          custom_loaders=custom_loaders, **options) as stream:
        for row in stream:
            if header is None:
                header = ['' if v is None else v for v in row]
            else:
                rows.append(list(map(stringify, row)))
                nb_rows += 1

    preview_rows_nb = min(preview_rows_nb, nb_rows)
    return {'header': header,
            'rows_nb': nb_rows,
            'data_rows': rows,
            'preview_rows_nb': preview_rows_nb,
            'preview_rows': rows[:preview_rows_nb]}


def improve_errors(errors):
    """Add context to errors, converts markdown content to HTML"""

    def improve_err(err):
        """Adds context info based on row-nb presence and converts content to HTML"""

        # Context: 'body' when the error is attached to a row, 'table' otherwise
        update_keys = {
            'context': 'body' if err.get('row-number') is not None else 'table',
        }

        # Set default title if no title
        if 'title' not in err:
            update_keys['title'] = '[{}]'.format(err['code'])

        message = err.get('message')
        if message is None:
            # Missing (or null) message: default message built from the error code
            update_keys['message'] = '[{}]'.format(err['code'])
        elif 'content' not in err:
            # Convert message from markdown to HTML only if no content
            # => for pre-checks errors
            update_keys['message'] = commonmark(message)

        # Message content (markdown to HTML, with a default value)
        md_content = err['content'] if 'content' in err else '*content soon available*'
        update_keys['content'] = commonmark(md_content)

        return {**err, **update_keys}

    return list(map(improve_err, errors))


def _compute_column_comparison_info(headers, field_names):
    """Compares, position by position, source headers with schema field names"""
    comparison_table = []
    has_case_errors = False
    for header, field_name in itertools.zip_longest(headers, field_names, fillvalue=''):
        status = 'ok' if header == field_name else 'ko'
        # Same name except for letter case
        if not has_case_errors and status == 'ko' and header.lower() == field_name.lower():
            has_case_errors = True
        comparison_table.append((header, field_name, status))

    return {
        'table': comparison_table,
        'has_missing': len(headers) < len(field_names),
        'has_case_errors': has_case_errors,
    }


def _group_body_errors_by_row(body_errors):
    """Groups consecutive body errors sharing the same row number

    'row-number' and 'context' keys (and 'column-number' when set) are removed
    from each error dict, in place.
    """
    rows = []
    for err in body_errors:
        if 'row-number' not in err:
            # Should not happen for body errors: keep going instead of crashing
            log.warning("Body error without row number: %r", err)
        row_id = err.pop('row-number', None)
        err.pop('context', None)

        # Start a new group each time the row id changes
        if not rows or rows[-1]['row_id'] != row_id:
            rows.append({'row_id': row_id, 'errors': {}})

        column_id = err.get('column-number')
        if column_id is not None:
            del err['column-number']
            rows[-1]['errors'][column_id] = err
        else:
            rows[-1]['errors']['row'] = err
    return rows


def create_validata_ui_report(validata_core_report, schema_dict):
    """ Creates an error report easier to handle and display in templates:
        - only one table
        - errors are contextualized
        - error-counts is ok
        - errors are grouped by lines
        - errors are separated into "structure" and "body"
        - error messages are improved

    The given report is not modified: work is done on a deep copy.
    """
    report = copy.deepcopy(validata_core_report)

    # One table is enough
    del report['table-count']
    report['table'] = report['tables'][0]
    del report['tables']
    del report['table']['error-count']
    del report['table']['time']
    del report['table']['valid']
    del report['valid']
    table = report['table']

    # use _ instead of - to ease information picking in jinja2 template
    table['row_count'] = table['row-count']

    # Handy col_count info
    headers = table.get('headers', [])
    table['col_count'] = len(headers)

    # Computes column info
    schema_fields = schema_dict.get('fields', [])
    fields_dict = {f['name']: (f.get('title', 'titre non défini'), f.get('description', ''))
                   for f in schema_fields}
    table['headers_title'] = [fields_dict[h][0] if h in fields_dict else 'colonne inconnue'
                              for h in headers]
    table['headers_description'] = [fields_dict[h][1] if h in fields_dict
                                    else 'Cette colonne n\'est pas définie dans le schema'
                                    for h in headers]

    # Provide better (french) messages
    errors = improve_errors(table['errors'])
    del table['errors']

    # Count errors
    report['error_count'] = len(errors)
    del report['error-count']

    # Then group them in 2 groups : structure and body
    structure_errors = [err for err in errors if err['tag'] == 'structure']
    body_errors = [err for err in errors if err['tag'] != 'structure']

    # Body errors are displayed only if all structure errors (if any)
    # are invalid-column-delimiter ones
    table['do_display_body_errors'] = all(err['code'] == 'invalid-column-delimiter'
                                          for err in structure_errors)

    # Checks if a column comparison is needed
    header_errors = ('missing-headers', 'extra-headers', 'wrong-headers-order')
    structure_errors = [{**err, 'in_column_comp': err['code'] in header_errors}
                        for err in structure_errors]
    table['errors'] = {'structure': structure_errors, 'body': body_errors}

    column_comparison_needed = any(err['in_column_comp'] for err in structure_errors)
    if column_comparison_needed:
        field_names = [f['name'] for f in schema_fields]
        table['column_comparison_info'] = _compute_column_comparison_info(headers, field_names)
    table['column_comparison_needed'] = column_comparison_needed

    # Group body errors by row id
    table['errors']['body_by_rows'] = _group_body_errors_by_row(body_errors)

    # Sort by error names in statistics
    stats = table['error-stats']
    code_title_map = messages.ERROR_MESSAGE_DEFAULT_TITLE
    for key in ('structure-errors', 'value-errors'):
        # convert dict into tuples with french title instead of error code
        # and sorts by title
        stats[key]['count-by-code'] = sorted(
            ((code_title_map.get(code, code), count)
             for code, count in stats[key]['count-by-code'].items()),
            key=itemgetter(0))

    return report


def compute_badge_message_and_color(badge):
    """Computes message and color from badge information"""
    structure = badge['structure']
    body = badge.get('body')

    # Bad structure, stop here
    if structure == 'KO':
        return ('structure invalide', 'red')

    # No body error
    if body == 'OK':
        return ('structure invalide', 'orange') if structure == 'WARN' else ('valide', 'green')

    # else compute quality ratio percent
    p = (1 - badge['error-ratio']) * 100.0
    msg = 'cellules valides : {:.1f}%'.format(p)
    return (msg, 'red') if body == 'KO' else (msg, 'orange')


def get_badge_url_and_message(badge):
    """Gets badge url and badge message from badge information"""
    msg, color = compute_badge_message_and_color(badge)
    badge_url = '{}static/v1.svg?label=Validata&message={}&color={}'.format(
        config.SHIELDS_IO_BASE_URL, quote_plus(msg), color)
    return (badge_url, msg)


def validate(schema_instance: SchemaInstance, source: ValidataSource):
    """ Validate source and display report

    The validation itself is delegated to validata-api through an HTTP call.
    Returns either the rendered report page or a redirect response
    (with a flashed error message) when something goes wrong.
    """

    # Validation is done through http call to validata-api
    if config.API_VALIDATE_ENDPOINT is None:
        flash_error("No Validate endpoint defined :-(")
        return redirect(url_for("custom_validator"))
    api_url = config.API_VALIDATE_ENDPOINT

    # Useful to receive response as JSON
    headers = {"Accept": "application/json"}

    try:
        if source.is_url():
            params = {
                "schema": schema_instance.url,
                "url": source.get_url(),
            }
            req = requests.get(api_url, params=params, headers=headers)

        else:
            files = {'file': (source.name, io.BytesIO(source.source))}
            data = {"schema": schema_instance.url}
            req = requests.post(api_url, data=data, files=files, headers=headers)

        # 400
        if req.status_code == 400:
            json_response = req.json()
            flash_error("Une erreur est survenue durant la validation: {}"
                        .format(json_response.get('message')))
            return redirect(url_for("home"))

        if not req.ok:
            flash_error("Un erreur s'est produite côté serveur :-(")
            return redirect(url_for("home"))

        json_response = req.json()
        validata_core_report = json_response['report']
        schema_dict = json_response['schema']

    except requests.RequestException as err:
        # Any transport level failure (connection, timeout...) is reported
        # to the user instead of ending up as an internal server error
        log.exception("Error while calling validation API")
        flash_error(str(err))
        return redirect(url_for('home'))

    # Computes badge from report and badge configuration
    badge = compute_badge(validata_core_report, config.BADGE_CONFIG)
    badge_url, badge_msg = get_badge_url_and_message(badge)

    # Source errors are shown as flash message, not as a report
    source_errors = [err for err in validata_core_report['tables'][0]['errors']
                     if err['code'] == 'source-error']
    if source_errors:
        err = source_errors[0]
        if 'charmap' in err['message']:
            msg = "l'encodage du fichier est invalide. Veuillez le corriger"
        else:
            msg = err['message']
        flash_error('Erreur de source : {}'.format(msg))
        return redirect(url_for('custom_validator'))

    source_data = extract_source_data(source)

    # handle report date
    report_datetime = datetime.fromisoformat(validata_core_report['date']).astimezone()

    # Enhance validata_core_report
    validata_report = create_validata_ui_report(validata_core_report, schema_dict)

    # Display report to the user
    validator_form_url = compute_validation_form_url(schema_instance)
    schema_info, validator_title = compute_schema_info(schema_instance.spec)
    pdf_report_url = url_for('pdf_report') + '?' + urlencode(schema_instance.request_parameters())

    return render_template('validation_report.html',
                           title='Rapport de validation',
                           schema_info=schema_info,
                           report=validata_report,
                           pdf_report_url=pdf_report_url,
                           validation_date=report_datetime.strftime('le %d/%m/%Y à %Hh%M'),
                           source=source,
                           source_type=source.type,
                           source_data=source_data,
                           print_mode=request.args.get('print', 'false') == 'true',
                           badge_url=badge_url,
                           badge_msg=badge_msg,
                           report_str=json.dumps(validata_report, sort_keys=True, indent=2),
                           breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'},
                                        {'url': validator_form_url, 'title': validator_title},
                                        ])


def bytes_data(f):
    """ Gets bytes data from Werkzeug FileStorage instance """
    iob = io.BytesIO()
    f.save(iob)
    # getvalue() returns the whole buffer whatever the current position is
    return iob.getvalue()


def hydrate_ui_config(ui_config, table_schema_catalog):
    """Returns a copy of ui_config whose first section holds the schema catalog

    Each catalog entry is the table schema descriptor (without its 'fields'),
    completed with the schema name; entries are sorted by name.
    """
    # Deep copy: a nested section is modified below and the given
    # configuration must stay untouched
    hydrated_ui_config = copy.deepcopy(ui_config)

    table_schema_ref_list = []
    for name, ref in sorted(table_schema_catalog.references.items(), key=itemgetter(0)):
        table_schema = ref.get_table_schema()
        info = {
            "name": name,
            **{k: v for k, v in table_schema.descriptor.items() if k != 'fields'}
        }
        table_schema_ref_list.append(info)

    # TODO: change this hard-coded affectation
    hydrated_ui_config['sections'][0]['catalog'] = table_schema_ref_list

    return hydrated_ui_config


# Routes


@app.route('/')
def home():
    """ Home page """
    flash_warning('Ce service est fourni en mode beta - certains problèmes peuvent subsister - nous mettons tout en œuvre pour améliorer son fonctionnement en continu.')
    return render_template('home.html', title='Accueil',
                           config=hydrate_ui_config(ui_config, table_schema_catalog))


@app.route('/pdf')
def pdf_report():
    """PDF report generation

    Prints the validation report page to a temporary PDF file with chromium
    headless and sends it back as an attachment.
    """
    err_prefix = 'Erreur de génération du rapport PDF'

    url_param = request.args.get('url')
    if not url_param:
        flash_error(err_prefix + ': URL non fournie')
        return redirect(url_for('home'))

    schema_instance = SchemaInstance.from_parameters(request.args, table_schema_catalog)
    if schema_instance is None:
        flash_error(err_prefix + ': Information de schema non fournie')
        return redirect(url_for('home'))

    # Compute pdf url report
    base_url = url_for('custom_validator', _external=True)
    parameter_dict = {
        'input': 'url',
        'print': 'true',
        'url': url_param,
        **schema_instance.request_parameters()
    }
    validation_url = base_url + '?' + urlencode(parameter_dict)

    # Create temp file to save validation report
    # (only its name is kept, the file itself is written by chromium)
    tmp_prefix = 'validata_{}_report_'.format(datetime.now().timestamp())
    with tempfile.NamedTemporaryFile(prefix=tmp_prefix, suffix='.pdf') as tmpfile:
        tmp_pdf_report = Path(tmpfile.name)

    try:
        # Use chromium headless to generate PDF from validation report page
        cmd = ['chromium', '--headless', '--disable-gpu',
               '--print-to-pdf={}'.format(str(tmp_pdf_report)), validation_url]
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        # A zero return code without any output file is also a failure
        if result.returncode != 0 or not tmp_pdf_report.exists():
            flash_error(err_prefix)
            log.error("Command %r returned an error: %r", cmd, result.stdout.decode('utf-8'))
            return redirect(url_for('home'))

        pdf_content = tmp_pdf_report.read_bytes()
    finally:
        # Never leave the temporary report behind, whatever happened
        if tmp_pdf_report.exists():
            tmp_pdf_report.unlink()

    # Send PDF report
    pdf_filename = 'Rapport de validation {}.pdf'.format(datetime.now().strftime('%d-%m-%Y %Hh%M'))
    response = make_response(pdf_content)
    response.headers.set('Content-disposition', 'attachment', filename=pdf_filename)
    response.headers.set('Content-type', 'application/pdf')
    response.headers.set('Content-length', len(pdf_content))

    return response


def compute_schema_info(table_schema: tableschema.Schema):
    """Factor code for validator form page

    Returns the schema descriptor without its 'fields' and a page title.
    """
    schema_info = {k: v for k, v in table_schema.descriptor.items() if k != 'fields'}
    title = "Schéma « {} »".format(schema_info.get('title'))
    return schema_info, title


def compute_validation_form_url(schema_instance: SchemaInstance):
    """Computes validation form url with schema URL parameter"""
    return "{}?{}".format(url_for('custom_validator'),
                          urlencode(schema_instance.request_parameters()))


@app.route('/table_schema', methods=['GET', 'POST'])
def custom_validator():
    """Validator form

    GET displays the form, or validates the resource given by the 'url'
    parameter when the 'input' parameter is set; POST validates an uploaded file.
    """

    # Check that validata-api URL is set
    if config.API_VALIDATE_ENDPOINT is None:
        flash_error("URL de connexion à l'API non indiquée :-(")
        return redirect(url_for('home'))

    if request.method == 'GET':

        # input is a hidden form parameter to know
        # if this is the initial page display or if the validation has been asked for
        input_param = request.args.get('input')

        # url of resource to be validated
        url_param = request.args.get("url")

        schema_instance = SchemaInstance.from_parameters(request.args, table_schema_catalog)
        if schema_instance is None:
            flash_error("Aucun schéma passé en paramètre")
            return redirect(url_for('home'))

        # First form display
        if input_param is None:
            schema_info, title = compute_schema_info(schema_instance.spec)
            return render_template('validation_form.html',
                                   title=title,
                                   schema_info=schema_info,
                                   schema_params=schema_instance.request_parameters(),
                                   breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'}, ])

        # Process URL
        if not url_param:
            flash_error("Vous n'avez pas indiqué d'url à valider")
            return redirect(compute_validation_form_url(schema_instance))
        try:
            return validate(schema_instance, ValidataSource('url', url_param, url_param))
        except tabulator.exceptions.FormatError as e:
            flash_error('Erreur : Format de ressource non supporté')
            log.info(e)
            return redirect(compute_validation_form_url(schema_instance))
        except tabulator.exceptions.HTTPError as e:
            flash_error('Erreur : impossible d\'accéder au fichier source en ligne')
            log.info(e)
            return redirect(compute_validation_form_url(schema_instance))

    # POST
    schema_instance = SchemaInstance.from_parameters(request.form, table_schema_catalog)
    if schema_instance is None:
        flash_error('Aucun schéma défini')
        return redirect(url_for('home'))

    input_param = request.form.get('input')
    if input_param is None:
        flash_error("Vous n'avez pas indiqué de fichier à valider")
        return redirect(compute_validation_form_url(schema_instance))

    # File validation
    if input_param == 'file':
        f = request.files.get('file')
        if f is None:
            flash_warning("Vous n'avez pas indiqué de fichier à valider")
            return redirect(compute_validation_form_url(schema_instance))

        b_content = bytes_data(f)
        return validate(schema_instance, ValidataSource('file', f.filename, b_content))

    # Unexpected 'input' value
    return 'Bizarre, vous avez dit bizarre ?'