#!/usr/bin/env python3
"""
    Routes

    Flask views of the validation UI: home page, validation form and report,
    and PDF export of a report.
"""
import copy
import json
import subprocess
import time
from datetime import datetime
from io import BytesIO
from pathlib import Path
from urllib.parse import quote_plus

from commonmark import commonmark
from flask import make_response, redirect, render_template, request, url_for

import tabulator
from validata_core import csv_helpers
from validata_core.loaders import custom_loaders
from validata_ui import app
from validata_ui.ui_util import flash_error, flash_warning
from validata_ui.validata_util import ValidataSource
from validata_ui.validate_helper import ValidatorHelper


def extract_source_data(source: ValidataSource, preview_rows_nb=5):
    """Read the whole source table and compute a preview of it.

    The first row yielded by the tabulator stream is used as the header;
    every following row is converted to a list of strings.

    :param source: source to read (its `source`, `format` and `scheme`
        attributes are handed to tabulator)
    :param preview_rows_nb: maximum number of data rows in the preview
    :return: dict with keys 'header', 'rows_nb', 'data_rows',
        'preview_rows_nb' and 'preview_rows'; 'header' is None when the
        stream yields no row at all
    """

    def stringify(val):
        """ Transform value into string """
        return '' if val is None else str(val)

    header = None
    rows = []
    nb_rows = 0

    options = {}
    if source.format == "csv":
        # Let validata_core detect the delimiter so the CSV is split correctly
        options['delimiter'] = csv_helpers.detect_dialect(
            source.source, format=source.format, scheme=source.scheme,
            custom_loaders=custom_loaders).delimiter

    with tabulator.Stream(source.source, format=source.format, scheme=source.scheme,
                          custom_loaders=custom_loaders, **options) as stream:
        for row in stream:
            if header is None:
                header = ['' if v is None else v for v in row]
            else:
                rows.append(list(map(stringify, row)))
                nb_rows += 1

    # The preview can not be longer than the data itself
    preview_rows_nb = min(preview_rows_nb, nb_rows)
    return {'header': header,
            'rows_nb': nb_rows,
            'data_rows': rows,
            'preview_rows_nb': preview_rows_nb,
            'preview_rows': rows[:preview_rows_nb]}


def improve_errors(errors):
    """Add context to errors and convert their markdown content to HTML.

    :param errors: iterable of error dicts; each one must have a 'code' key
    :return: new list of new dicts (input dicts are not modified), each with
        'context', 'message' and 'content' set, and 'title' defaulted
    """

    def improve_err(err):
        """Adds context info based on row-nb presence and converts content to HTML"""

        # Context: an error with a row number belongs to the table body
        update_keys = {
            'context': 'body' if err.get('row-number') is not None else 'table',
        }

        # Set default title if no title
        if 'title' not in err:
            update_keys['title'] = '[{}]'.format(err['code'])

        # Default message when there is none. This is tested first so that
        # a None message is never handed to the markdown converter.
        if err.get('message') is None:
            update_keys['message'] = '[{}]'.format(err['code'])
        # Convert message to HTML only if no content
        # => for pre-checks errors
        elif 'content' not in err:
            update_keys['message'] = commonmark(err['message'])

        # Message content (markdown to HTML, with a default value)
        md_content = err['content'] if 'content' in err else '*content soon available*'
        update_keys['content'] = commonmark(md_content)

        return {**err, **update_keys}

    return [improve_err(err) for err in errors]


def create_validata_report(goodtables_report, schema):
    """ Creates an error report easier to handle and display in templates:
        - only one table
        - errors are contextualized
        - error-counts is ok
        - errors are grouped by lines
        - errors are separated into "structure" and "body"
        - error messages are improved

    :param goodtables_report: report dict to transform; it is deep-copied
        and left untouched
    :param schema: schema descriptor dict; its 'fields' entries provide the
        column titles and descriptions
    :return: the transformed report dict
    """
    report = copy.deepcopy(goodtables_report)

    # One table is enough
    del report['table-count']
    report['table'] = report['tables'][0]
    del report['tables']
    del report['table']['error-count']
    del report['table']['time']
    del report['table']['valid']
    del report['valid']

    table = report['table']

    # use _ instead of - to ease information picking in jinja2 template
    table['row_count'] = table['row-count']

    # Handy col_count info
    headers = table.get('headers', [])
    table['col_count'] = len(headers)

    # Computes column info
    schema_fields = schema.get('fields', [])
    fields_dict = {f['name']: (f.get('title', 'titre non défini'), f.get('description', ''))
                   for f in schema_fields}
    table['headers_title'] = [fields_dict[h][0] if h in fields_dict else 'colonne inconnue'
                              for h in headers]
    table['headers_description'] = [fields_dict[h][1] if h in fields_dict
                                    else 'Cette colonne n\'est pas définie dans le schema'
                                    for h in headers]

    # Provide better (french) messages
    errors = improve_errors(table['errors'])
    del table['errors']

    # Count errors
    report['error_count'] = len(errors)
    del report['error-count']

    # Then group them in 2 groups : structure and body
    table['errors'] = {'structure': [], 'body': []}
    for err in errors:
        group = 'body' if err['context'] == 'body' else 'structure'
        table['errors'][group].append(err)

    # Checks if there are structure errors different to invalid-column-delimiter
    table['display_body_errors'] = all(err['code'] == 'invalid-column-delimiter'
                                       for err in table['errors']['structure'])

    # Group body errors by row id. Consecutive errors sharing a row number
    # end up in the same entry. Every body error has a non-None 'row-number'
    # (that is what put it in the 'body' group), so None is a safe marker
    # for "no row seen yet".
    rows = []
    current_row_id = None
    for err in table['errors']['body']:
        row_id = err.pop('row-number')
        del err['context']
        if row_id != current_row_id:
            current_row_id = row_id
            rows.append({'row_id': current_row_id, 'errors': {}})
        column_id = err.get('column-number')
        if column_id is not None:
            # Cell error: indexed by its column number
            del err['column-number']
            rows[-1]['errors'][column_id] = err
        else:
            # Error on the row as a whole
            rows[-1]['errors']['row'] = err
    table['errors']['body_by_rows'] = rows

    return report


def _report_to_json(report):
    """Serialize the report as indented JSON, with sorted keys when possible."""
    try:
        return json.dumps(report, sort_keys=True, indent=2)
    except TypeError:
        # Errors grouped by row may mix integer column numbers with the 'row'
        # key in one dict; such keys can not be ordered, so keep insertion
        # order instead. Any other serialization problem is raised again by
        # this second call.
        return json.dumps(report, indent=2)


def validate(schema_code, source: ValidataSource):
    """Validate source and display report.

    :param schema_code: code of the schema to validate against
    :param source: source to validate
    :return: the rendered report page, or a redirection to the validation
        form when the file format is not supported
    """
    try:
        goodtables_report = ValidatorHelper.validate(
            schema_code=schema_code,
            force_strings=True,
            **source.get_tabulator_params(),
        )
    except tabulator.exceptions.FormatError:
        flash_error('Erreur : format de fichier non supporté')
        return redirect(url_for('scdl_validator', val_code=schema_code))

    source_data = extract_source_data(source)

    # handle report date: drop the fractional seconds, if any. split() keeps
    # the whole string when there is no '.', where slicing up to find()'s -1
    # would cut the last character.
    date_str = goodtables_report['date']
    report_date = datetime.strptime(date_str.split('.', 1)[0], '%Y-%m-%dT%H:%M:%S')

    # Enhance goodtables_report
    validata_report = create_validata_report(goodtables_report,
                                             ValidatorHelper.schema(schema_code).descriptor)

    # Display report to the user
    val_info = ValidatorHelper.schema_info(schema_code)
    return render_template('validation_report.html',
                           title='Rapport de validation',
                           val_info=val_info,
                           report=validata_report,
                           validation_date=report_date.strftime('le %d/%m/%Y à %Hh%M'),
                           source=source,
                           source_type=source.type,
                           source_data=source_data,
                           print_mode=request.args.get('print', 'false') == 'true',
                           report_str=_report_to_json(validata_report),
                           breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'},
                                        {'url': url_for('scdl_validator', val_code=schema_code),
                                         'title': val_info['title']}])


def bytes_data(f):
    """Gets bytes data from Werkzeug FileStorage instance.

    :param f: object whose `save` method writes its content to a binary stream
    :return: the saved content as bytes
    """
    buffer = BytesIO()
    f.save(buffer)
    # getvalue() returns the whole buffer whatever the stream position is
    return buffer.getvalue()


# Routes


@app.route('/')
def home():
    """ Home page """
    validators = ValidatorHelper.schema_info_list()
    flash_warning('Ce service est fourni en mode beta - certains problèmes peuvent subsister - nous mettons tout en œuvre pour améliorer son fonctionnement en continu')
    return render_template('home.html', title='Accueil', validators=validators)


@app.route('/validators')
def validators():
    """ No validators page: sends the user back to the home page """
    return redirect(url_for('home'))


@app.route('/pdf/<val_code>')
def pdf_report(val_code):
    """PDF report generation.

    Prints the validation report page of the `url` query parameter to a
    temporary PDF file with headless chromium and sends it as an attachment.

    :param val_code: schema code, taken from the URL path
    :return: the PDF response, or a redirection to the home page on error
    """
    err_prefix = 'Erreur de génération du rapport PDF: '

    if not ValidatorHelper.schema_exist(val_code):
        flash_error(err_prefix + 'schéma inconnu')
        return redirect(url_for('home'))

    url_param = request.args.get('url')
    if not url_param:
        flash_error(err_prefix + 'url non fournie')
        return redirect(url_for('home'))

    validation_url = '{}?input=url&print=true&url={}'.format(
        url_for('scdl_validator', val_code=val_code, _external=True),
        quote_plus(url_param))

    # temporary pdf_report.pdf filename
    tmp_pdf_report = Path('/tmp') / 'validata_{}_{}_pdf_report.pdf'.format(val_code, str(time.time()))

    cmd = ['chromium', '--headless', '--disable-gpu',
           '--print-to-pdf={}'.format(str(tmp_pdf_report)), validation_url]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        if result.returncode != 0:
            # Captured output is bytes: decode it before joining it to the prefix
            flash_error(err_prefix + result.stdout.decode('utf-8', errors='replace'))
            return redirect(url_for('home'))
        pdf_content = tmp_pdf_report.read_bytes()
    finally:
        # The temporary file is removed whatever happened above
        if tmp_pdf_report.exists():
            tmp_pdf_report.unlink()

    response = make_response(pdf_content)
    response.headers.set('Content-disposition', 'attachment', filename='report.pdf')
    response.headers.set('Content-type', 'application/pdf')
    response.headers.set('Content-length', len(pdf_content))
    return response


@app.route('/validators/<val_code>', methods=['GET', 'POST'])
def scdl_validator(val_code):
    """Validator page.

    GET without a known `input` parameter displays the validation form;
    GET with `input=url` or `input=example` validates the `url` parameter;
    POST with `input=file` validates the uploaded file.

    :param val_code: schema code, taken from the URL path
    """
    if not ValidatorHelper.schema_exist(val_code):
        flash_error('Validateur [{}] inconnu'.format(val_code))
        return redirect(url_for('home'))

    if request.method == 'GET':
        val_info = ValidatorHelper.schema_info(val_code)
        input_param = request.args.get('input')

        # First form display (a missing parameter is None, hence not in the tuple)
        if input_param not in ('url', 'example'):
            return render_template('validation_form.html',
                                   title=val_info['title'],
                                   val_info=val_info,
                                   breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'}, ])

        # Process URL
        url_param = request.args.get('url')
        if url_param is None or url_param == '':
            flash_error("Vous n'avez pas indiqué d'url à valider")
            return redirect(url_for('scdl_validator', val_code=val_code))
        return validate(val_code, ValidataSource('url', url_param, url_param))

    # POST
    input_param = request.form.get('input')
    if input_param is None:
        flash_error('Aucun fichier à valider')
        return redirect(url_for('scdl_validator', val_code=val_code))

    # File validation
    if input_param == 'file':
        f = request.files.get('file')
        if f is None:
            flash_warning("Vous n'avez pas indiqué de fichier à valider")
            return redirect(url_for('scdl_validator', val_code=val_code))
        return validate(val_code, ValidataSource('file', f.filename, bytes_data(f)))

    return 'Bizarre, vous avez dit bizarre ?'