#!/usr/bin/env python3
"""
Routes of the validation web UI.

Defines the Flask views (home page, validator form, validation report) and
the helpers that turn a raw goodtables report into a structure that is easy
to display in the templates.
"""
import copy
import json
import os
from collections import OrderedDict
from datetime import datetime
from io import BytesIO
from pathlib import Path

from flask import Flask, jsonify, redirect, render_template, request, url_for

import tabulator
from validata_ui import app, error_messages
from validata_ui.util import (ValidataSource, flash_error, flash_info,
                              flash_success, flash_warning)
from validata_ui.validate_helper import ValidatorHelper
from validata_validate import csv_helpers
from validata_validate.loaders import custom_loaders


def extract_source_data(source: ValidataSource, preview_rows_nb=5):
    """Read the source table and compute the data needed for its preview.

    The first row yielded by the stream is used as the header; every other
    row is kept as a list of strings (``None`` cells become ``''``).

    :param source: source to read; its ``data``, ``format`` and ``scheme``
        attributes are handed to ``tabulator.Stream``.
    :param preview_rows_nb: maximum number of data rows in the preview.
    :return: dict with keys ``header``, ``rows_nb``, ``data_rows``,
        ``preview_rows_nb`` and ``preview_rows``. ``header`` is ``None``
        when the stream yields no row at all.
    """

    def stringify(val):
        """Transform a cell value into a string (None -> empty string)."""
        return '' if val is None else str(val)

    header = None
    rows = []

    options = {}
    if source.format == "csv":
        # Let the CSV helper detect the delimiter instead of relying on
        # tabulator's default.
        dialect = csv_helpers.detect_dialect(source.data,
                                             format=source.format,
                                             scheme=source.scheme,
                                             custom_loaders=custom_loaders)
        options['delimiter'] = dialect.delimiter

    with tabulator.Stream(source.data,
                          format=source.format,
                          scheme=source.scheme,
                          custom_loaders=custom_loaders,
                          **options) as stream:
        for row in stream:
            if header is None:
                header = ['' if v is None else v for v in row]
            else:
                rows.append(list(map(stringify, row)))

    # Only data rows are counted; the header row is not.
    nb_rows = len(rows)
    preview_rows_nb = min(preview_rows_nb, nb_rows)

    return {
        'header': header,
        'rows_nb': nb_rows,
        'data_rows': rows,
        'preview_rows_nb': preview_rows_nb,
        'preview_rows': rows[:preview_rows_nb],
    }


def improve_messages(errors, headers, schema):
    """Translate and improve error messages.

    Each error is passed to the function registered for its ``code``; errors
    without a registered function get a generic title and content.

    :param errors: list of error dicts, each with at least a ``code`` key.
    :param headers: table headers, forwarded to the improving functions.
    :param schema: table schema, forwarded to the improving functions.
    :return: list of improved error dicts, in the same order.
    """

    def error_message_default_func(error, headers, schema):
        """Fallback: use the error code as title and the raw message as content."""
        error['title'] = error['code']
        error['content'] = error.get('message', 'pas d\'information complémentaire')
        return error

    # NOTE(review): the original code read a bare global ``ERROR_MESSAGE_FUNC``
    # that is neither defined nor imported in this module (NameError as soon
    # as one error is processed). It presumably lives in the imported
    # ``error_messages`` module -- confirm. If it is absent, every error
    # falls back to the default formatter instead of crashing.
    improve_funcs = getattr(error_messages, 'ERROR_MESSAGE_FUNC', {})

    return [
        improve_funcs.get(error['code'], error_message_default_func)(error, headers, schema)
        for error in errors
    ]


def contextualize(errors):
    """Add a ``context`` key to each error.

    An error carrying a non-null ``row-number`` gets the ``'body'`` context,
    any other error gets ``'table'``. Input dicts are not modified.

    :param errors: iterable of error dicts.
    :return: list of new error dicts including the ``context`` key.
    """

    def add_context(err):
        """Return a copy of err with context info based on row-number presence."""
        context = 'body' if err.get('row-number') is not None else 'table'
        return {**err, 'context': context}

    return [add_context(err) for err in errors]


def create_validata_report(goodtables_report, schema):
    """Create an error report easier to handle and display in templates.

    - only one table
    - errors are contextualized
    - error-counts is ok
    - errors are grouped by lines
    - errors are separated into "structure" and "body"
    - error messages are improved

    :param goodtables_report: report dict produced by goodtables; it is
        deep-copied and left untouched.
    :param schema: table schema dict; its ``fields`` entries provide column
        titles and descriptions.
    :return: the reworked report dict.
    """
    report = copy.deepcopy(goodtables_report)

    # One table is enough
    del report['table-count']
    report['table'] = report['tables'][0]
    del report['tables']
    del report['table']['error-count']
    del report['table']['time']
    del report['table']['valid']
    del report['valid']

    table = report['table']

    # use _ instead of - to ease information picking in jinja2 template
    table['row_count'] = table['row-count']

    # Handy col_count info
    headers = table.get('headers', [])
    table['col_count'] = len(headers)

    # Computes column info
    schema_fields = schema.get('fields', [])
    fields_dict = {f['name']: (f.get('title', 'titre non défini'), f.get('description', ''))
                   for f in schema_fields}
    table['headers_title'] = [
        fields_dict[h][0] if h in fields_dict else 'colonne inconnue'
        for h in headers
    ]
    table['headers_description'] = [
        fields_dict[h][1] if h in fields_dict else 'Cette colonne n\'est pas définie dans le schema'
        for h in headers
    ]

    # Add context to errors
    errors = contextualize(table['errors'])
    del table['errors']

    # Provide better (french) messages
    errors = improve_messages(errors, headers, schema)

    # Count errors
    report['error_count'] = len(errors)
    del report['error-count']

    # Then group them in 2 groups : structure and body
    table['errors'] = {'structure': [], 'body': []}
    for err in errors:
        group = 'body' if err['context'] == 'body' else 'structure'
        table['errors'][group].append(err)

    # Body errors are displayed only when every structure error (if any) is
    # an invalid-column-delimiter one.
    table['display_body_errors'] = all(
        err['code'] == 'invalid-column-delimiter'
        for err in table['errors']['structure']
    )

    # Group body errors by row id. Consecutive errors sharing the same row
    # number go into the same group. Body errors always carry a row number
    # (see contextualize).
    rows = []
    for err in table['errors']['body']:
        row_id = err['row-number']
        del err['row-number']
        del err['context']

        # Start a new group on the first error and on each row change; this
        # does not rely on a sentinel row id.
        if not rows or rows[-1]['row_id'] != row_id:
            rows.append({'row_id': row_id, 'errors': {}})

        column_id = err.get('column-number')
        if column_id is not None:
            del err['column-number']
            rows[-1]['errors'][column_id] = err
        else:
            # Error not tied to a column: it concerns the whole row
            rows[-1]['errors']['row'] = err
    table['errors']['by_rows'] = rows

    return report


def validate(schema_code, source: ValidataSource):
    """Validate source and display report.

    :param schema_code: code of the schema to validate against.
    :param source: source to validate.
    :return: the rendered report page, or a redirect to the validator form
        when the source format is not supported.
    """
    try:
        goodtables_report = ValidatorHelper.validate(schema_code, **source.get_goodtables_source())
    except tabulator.exceptions.FormatError:
        flash_error('Erreur : format de fichier non supporté')
        return redirect(url_for('scdl_validator', val_code=schema_code))

    source_data = extract_source_data(source)

    validata_report = create_validata_report(goodtables_report, ValidatorHelper.schema(schema_code))

    # Complete report
    val_info = ValidatorHelper.schema_info(schema_code)
    return render_template('validation_report.html',
                           title='Rapport de validation',
                           val_info=val_info,
                           report=validata_report,
                           validation_date=datetime.now().strftime('le %d/%m/%Y à %Hh%M'),
                           source=source,
                           source_type=source.type,
                           source_data=source_data,
                           report_str=json.dumps(validata_report, sort_keys=True, indent=2),
                           breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'},
                                        {'url': url_for('scdl_validator', val_code=schema_code),
                                         'title': val_info['title']}])


def bytes_data(f):
    """Get bytes data from a Werkzeug FileStorage instance.

    :param f: object exposing ``save(stream)``.
    :return: the saved content as ``bytes``.
    """
    iob = BytesIO()
    f.save(iob)
    # getvalue() returns the whole buffer whatever the stream position is
    return iob.getvalue()


# Routes


@app.route('/')
def home():
    """Home page: list the available validators."""
    validators = ValidatorHelper.schema_info_list()
    flash_warning('Ce service est fourni en mode beta - certains problèmes peuvent subsister - nous mettons tout en œuvre pour améliorer son fonctionnement en continu')
    return render_template('home.html', title='Accueil', validators=validators)


@app.route('/validators')
def validators():
    """No validators page: redirect to the home page."""
    return redirect(url_for('home'))


# The <val_code> URL variable is required: the view takes ``val_code`` and
# every url_for('scdl_validator', val_code=...) call builds this path.
@app.route('/validators/<val_code>', methods=['GET', 'POST'])
def scdl_validator(val_code):
    """Validator page.

    GET without a usable ``input`` parameter displays the form; GET with
    ``input`` set to ``url`` or ``example`` validates the given ``url``;
    POST with ``input=file`` validates the uploaded file.

    :param val_code: schema code taken from the URL.
    """
    if not ValidatorHelper.schema_exist(val_code):
        flash_error('Validateur [{}] inconnu'.format(val_code))
        return redirect(url_for('home'))

    if request.method == 'GET':
        val_info = ValidatorHelper.schema_info(val_code)
        input_param = request.args.get('input')

        # First form display
        if input_param is None or input_param not in ('url', 'example'):
            return render_template('validation_form.html',
                                   title=val_info['title'],
                                   val_info=val_info,
                                   breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'}, ])

        # Process URL
        url = request.args.get('url')
        if url is None or url == '':
            flash_error("Vous n'avez pas indiqué d'url à valider")
            return redirect(url_for('scdl_validator', val_code=val_code))
        return validate(val_code, ValidataSource(url, url, 'url'))

    # POST
    input_param = request.form.get('input')
    if input_param is None:
        flash_error('Aucun fichier à valider')
        return redirect(url_for('scdl_validator', val_code=val_code))

    # File validation
    if input_param == 'file':
        f = request.files.get('file')
        if f is None:
            flash_warning("Vous n'avez pas indiqué de fichier à valider")
            return redirect(url_for('scdl_validator', val_code=val_code))
        return validate(val_code, ValidataSource(bytes_data(f), f.filename, 'file'))

    # Unknown input kind
    return 'Bizarre, vous avez dit bizarre ?'