#!/usr/bin/env python3
"""
    Routes
"""
import copy
import itertools
import json
import logging
import subprocess
import time
from datetime import datetime
from io import BytesIO
from operator import itemgetter
from pathlib import Path
from urllib.parse import quote_plus

from backports.datetime_fromisoformat import MonkeyPatch
from commonmark import commonmark
from flask import make_response, redirect, render_template, request, url_for

import tabulator
from validata_core import csv_helpers, messages
from validata_core.loaders import custom_loaders
from validata_ui import app
from validata_ui.ui_util import flash_error, flash_warning
from validata_ui.validata_util import ValidataSource
from validata_ui.validate_helper import ValidatorHelper

MonkeyPatch.patch_fromisoformat()

# Module-level logger (used by pdf_report and the report builder).
log = logging.getLogger(__name__)


def extract_source_data(source: ValidataSource, preview_rows_nb=5):
    """Read the tabular source and compute a table preview.

    The first row of the stream is taken as the header (``None`` cells become
    empty strings); every following row is converted to a list of strings.

    Returns a dict with keys ``header``, ``rows_nb``, ``data_rows``,
    ``preview_rows_nb`` and ``preview_rows``. ``header`` is ``None`` when the
    stream yields no row at all.
    """

    def stringify(val):
        """Transform a cell value into a string (None -> '')."""
        return '' if val is None else str(val)

    header = None
    rows = []

    options = {}
    if source.format == "csv":
        # Let the CSV helper sniff the delimiter before opening the stream.
        options['delimiter'] = csv_helpers.detect_dialect(
            source.source,
            format=source.format,
            scheme=source.scheme,
            custom_loaders=custom_loaders,
        ).delimiter

    with tabulator.Stream(source.source,
                          format=source.format,
                          scheme=source.scheme,
                          custom_loaders=custom_loaders,
                          **options) as stream:
        for row in stream:
            if header is None:
                header = ['' if v is None else v for v in row]
            else:
                rows.append(list(map(stringify, row)))

    nb_rows = len(rows)
    preview_rows_nb = min(preview_rows_nb, nb_rows)
    return {
        'header': header,
        'rows_nb': nb_rows,
        'data_rows': rows,
        'preview_rows_nb': preview_rows_nb,
        'preview_rows': rows[:preview_rows_nb],
    }


def improve_errors(errors):
    """Add context to errors and convert markdown content to HTML.

    Returns a new list; the input error dicts are not modified.
    """

    def improve_err(err):
        """Return a copy of ``err`` with 'context', 'title', 'message' and
        'content' completed."""
        default_label = '[{}]'.format(err['code'])

        # Context: an error with a real row number belongs to the table body
        update_keys = {
            'context': 'body' if err.get('row-number') is not None else 'table',
        }

        # Set default title if no title
        if 'title' not in err:
            update_keys['title'] = default_label

        # Convert message from markdown only if there is no content
        # => for pre-checks errors
        message = err.get('message')
        if message is None:
            # Missing or None message: fall back to the error code
            update_keys['message'] = default_label
        elif 'content' not in err:
            update_keys['message'] = commonmark(message)

        # Message content
        md_content = err['content'] if 'content' in err else '*content soon available*'
        update_keys['content'] = commonmark(md_content)

        return {**err, **update_keys}

    return [improve_err(err) for err in errors]


def _build_column_comparison(headers, field_names):
    """Compare file headers with schema field names, position by position.

    Returns a dict with the comparison 'table' (tuples of header, field name,
    'ok'/'ko'), and the 'has_missing' and 'has_case_errors' flags.
    """
    table = []
    has_case_errors = False
    for header, field_name in itertools.zip_longest(headers, field_names, fillvalue=''):
        status = 'ok' if header == field_name else 'ko'
        if not has_case_errors and status == 'ko' and header.lower() == field_name.lower():
            has_case_errors = True
        table.append((header, field_name, status))
    return {
        'table': table,
        'has_missing': len(headers) < len(field_names),
        'has_case_errors': has_case_errors,
    }


def _group_body_errors_by_row(body_errors):
    """Group consecutive body errors sharing the same row number.

    Each error dict is modified in place: 'row-number', 'context' and
    'column-number' (when present and not None) are removed. Errors without a
    'row-number' key cannot be attached to a row: they are logged and left
    untouched.
    """
    rows = []
    current_row_id = None
    for err in body_errors:
        if 'row-number' not in err:
            log.warning("Body error without row number: %r", err)
            continue
        row_id = err.pop('row-number')
        err.pop('context', None)

        # `not rows` makes the first error always open a group, whatever its
        # row number is.
        if not rows or row_id != current_row_id:
            current_row_id = row_id
            rows.append({'row_id': current_row_id, 'errors': {}})

        column_id = err.get('column-number')
        if column_id is not None:
            del err['column-number']
            rows[-1]['errors'][column_id] = err
        else:
            rows[-1]['errors']['row'] = err
    return rows


def create_validata_ui_report(validata_core_report, schema):
    """Create an error report easier to handle and display in templates.

    - only one table
    - errors are contextualized
    - error-counts is ok
    - errors are grouped by lines
    - errors are separated into "structure" and "body"
    - error messages are improved

    The input report is deep-copied and left unchanged.
    """
    report = copy.deepcopy(validata_core_report)

    # One table is enough
    del report['table-count']
    report['table'] = report['tables'][0]
    del report['tables']
    table = report['table']
    del table['error-count']
    del table['time']
    del table['valid']
    del report['valid']

    # use _ instead of - to ease information picking in jinja2 template
    table['row_count'] = table['row-count']

    # Handy col_count info
    headers = table.get('headers', [])
    table['col_count'] = len(headers)

    # Computes column info
    schema_fields = schema.get('fields', [])
    fields_dict = {f['name']: (f.get('title', 'titre non défini'), f.get('description', ''))
                   for f in schema_fields}
    table['headers_title'] = [fields_dict[h][0] if h in fields_dict else 'colonne inconnue'
                              for h in headers]
    table['headers_description'] = [fields_dict[h][1] if h in fields_dict
                                    else 'Cette colonne n\'est pas définie dans le schema'
                                    for h in headers]

    # Provide better (french) messages
    errors = improve_errors(table['errors'])

    # Count errors
    report['error_count'] = len(errors)
    del report['error-count']

    # Then group them in 2 groups : structure and body
    body_errors = [err for err in errors if err['tag'] != 'structure']

    # Flag the structure errors that take part in the column comparison
    header_errors = ('missing-headers', 'extra-headers', 'wrong-headers-order')
    structure_errors = [{**err, 'in_column_comp': err['code'] in header_errors}
                        for err in errors if err['tag'] == 'structure']
    table['errors'] = {'structure': structure_errors, 'body': body_errors}

    # Body errors are displayed only if the sole structure errors (if any)
    # are invalid-column-delimiter ones
    table['do_display_body_errors'] = all(err['code'] == 'invalid-column-delimiter'
                                          for err in structure_errors)

    # Checks if a column comparison is needed
    column_comparison_needed = any(err['in_column_comp'] for err in structure_errors)
    if column_comparison_needed:
        field_names = [f['name'] for f in schema_fields]
        table['column_comparison_info'] = _build_column_comparison(headers, field_names)
    table['column_comparison_needed'] = column_comparison_needed

    # Group body errors by row id
    table['errors']['body_by_rows'] = _group_body_errors_by_row(body_errors)

    # Sort by error names in statistics
    stats = table['error-stats']
    code_title_map = messages.ERROR_MESSAGE_DEFAULT_TITLE
    for key in ('structure-errors', 'value-errors'):
        # convert dict into tuples with french title instead of error code
        # and sorts by title
        stats[key]['count-by-code'] = sorted(
            ((code_title_map.get(k, k), v) for k, v in stats[key]['count-by-code'].items()),
            key=itemgetter(0))

    return report


def validate(schema_code, source: ValidataSource):
    """Validate source and display report.

    Redirects to the validator form with an error message when tabulator
    reports an unsupported file format.
    """
    try:
        validata_core_report = ValidatorHelper.validate(
            schema_code=schema_code,
            force_strings=True,
            **source.get_tabulator_params(),
        )
    except tabulator.exceptions.FormatError:
        flash_error('Erreur : format de fichier non supporté')
        return redirect(url_for('scdl_validator', val_code=schema_code))

    source_data = extract_source_data(source)

    # handle report date
    report_datetime = datetime.fromisoformat(validata_core_report['date']).astimezone()

    # Enhance validata_core_report
    validata_report = create_validata_ui_report(validata_core_report,
                                                ValidatorHelper.schema(schema_code).descriptor)

    # Display report to the user
    val_info = ValidatorHelper.schema_info(schema_code)
    return render_template('validation_report.html',
                           title='Rapport de validation',
                           val_info=val_info,
                           report=validata_report,
                           validation_date=report_datetime.strftime('le %d/%m/%Y à %Hh%M'),
                           source=source,
                           source_type=source.type,
                           source_data=source_data,
                           print_mode=request.args.get('print', 'false') == 'true',
                           report_str=json.dumps(validata_report, sort_keys=True, indent=2),
                           breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'},
                                        {'url': url_for('scdl_validator', val_code=schema_code),
                                         'title': val_info['title']}])


def bytes_data(f):
    """Get bytes data from a Werkzeug FileStorage instance."""
    iob = BytesIO()
    f.save(iob)
    return iob.getvalue()


# Routes


@app.route('/')
def home():
    """Home page."""
    validators = ValidatorHelper.schema_info_list()
    flash_warning('Ce service est fourni en mode beta - certains problèmes peuvent subsister - nous mettons tout en œuvre pour améliorer son fonctionnement en continu')
    return render_template('home.html', title='Accueil', validators=validators)


@app.route('/validators')
def validators():
    """No validators page: redirect to home."""
    return redirect(url_for('home'))


@app.route('/pdf/<val_code>')
def pdf_report(val_code):
    """PDF report generation.

    Runs headless chromium on the print version of the validation report for
    the URL given in the ``url`` query parameter and returns the PDF as an
    attachment. On any error, flashes a message and redirects to the
    validator form.
    """
    err_prefix = 'Erreur de génération du rapport PDF'

    if not ValidatorHelper.schema_exist(val_code):
        flash_error(err_prefix + ': schéma inconnu')
        return redirect(url_for('scdl_validator', val_code=val_code))

    url_param = request.args.get('url')
    if not url_param:
        flash_error(err_prefix + ': URL non fournie')
        return redirect(url_for('scdl_validator', val_code=val_code))

    validation_url = '{}?input=url&print=true&url={}'.format(
        url_for('scdl_validator', val_code=val_code, _external=True),
        quote_plus(url_param))

    # temporary pdf_report.pdf filename
    tmp_pdf_report = Path('/tmp') / 'validata_{}_{}_pdf_report.pdf'.format(val_code, str(time.time()))

    cmd = ['chromium', '--headless', '--disable-gpu',
           '--print-to-pdf={}'.format(str(tmp_pdf_report)), validation_url]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if result.returncode != 0:
            flash_error(err_prefix)
            log.error("Command %r returned an error: %r", cmd,
                      result.stdout.decode('utf-8', errors='replace'))
            return redirect(url_for('scdl_validator', val_code=val_code))
        # read_bytes() opens and closes the file itself (no leaked handle)
        pdf_bytes = tmp_pdf_report.read_bytes()
    finally:
        # Always remove the temporary file, whatever happened above
        if tmp_pdf_report.exists():
            tmp_pdf_report.unlink()

    response = make_response(pdf_bytes)
    response.headers.set('Content-disposition', 'attachment', filename='report.pdf')
    response.headers.set('Content-type', 'application/pdf')
    response.headers.set('Content-length', len(pdf_bytes))
    return response


@app.route('/validators/<val_code>', methods=['GET', 'POST'])
def scdl_validator(val_code):
    """Validator page.

    GET without a recognised ``input`` parameter displays the form; GET with
    ``input=url`` (or ``example``) validates the resource given by ``url``;
    POST with ``input=file`` validates the uploaded file.
    """
    if not ValidatorHelper.schema_exist(val_code):
        flash_error('Validateur [{}] inconnu'.format(val_code))
        return redirect(url_for('home'))

    validator_form_url = url_for('scdl_validator', val_code=val_code)

    if request.method == 'GET':
        val_info = ValidatorHelper.schema_info(val_code)
        input_param = request.args.get('input')

        # First form display
        if input_param not in ('url', 'example'):
            return render_template('validation_form.html',
                                   title=val_info['title'],
                                   val_info=val_info,
                                   breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'}, ])

        # Process URL
        url_param = request.args.get('url')
        if not url_param:
            flash_error("Vous n'avez pas indiqué d'url à valider")
            return redirect(validator_form_url)
        return validate(val_code, ValidataSource('url', url_param, url_param))

    # POST
    input_param = request.form.get('input')
    if input_param is None:
        flash_error('Aucun fichier à valider')
        return redirect(validator_form_url)

    # File validation
    if input_param == 'file':
        f = request.files.get('file')
        # A form submitted without selecting a file still yields a file part,
        # with an empty filename.
        if f is None or not f.filename:
            flash_warning("Vous n'avez pas indiqué de fichier à valider")
            return redirect(validator_form_url)
        return validate(val_code, ValidataSource('file', f.filename, bytes_data(f)))

    return 'Bizarre, vous avez dit bizarre ?'