""" Routes """
import copy
import io
import itertools
import json
import logging
import subprocess
import tempfile
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from urllib.parse import quote_plus

import requests
from backports.datetime_fromisoformat import MonkeyPatch
from commonmark import commonmark
from flask import make_response, redirect, render_template, request, url_for
from validata_core import compute_badge, csv_helpers, messages
from validata_core.loaders import custom_loaders

import tabulator

from . import app, config, ui_config, schema_info_map
from .ui_util import flash_error, flash_warning
from .validata_util import ValidataSource

# Makes datetime.fromisoformat() available (used by validate() below)
MonkeyPatch.patch_fromisoformat()

log = logging.getLogger(__name__)


def extract_source_data(source: ValidataSource, preview_rows_nb=5):
    """Read the source table and compute the data needed for the table preview.

    The first row yielded by the tabulator stream is used as the header
    (``None`` cells become empty strings); every following row is a data row
    whose cells are converted to strings.

    Returns a dict with the keys ``header``, ``rows_nb``, ``data_rows``,
    ``preview_rows_nb`` and ``preview_rows``. ``header`` is ``None`` when the
    stream yields no row at all.
    """

    def stringify(val):
        """ Transform value into string """
        return '' if val is None else str(val)

    header = None
    rows = []
    nb_rows = 0

    # For CSV sources, the delimiter is detected first and given to tabulator
    options = {}
    if source.format == "csv":
        options['delimiter'] = csv_helpers.detect_dialect(
            source.source,
            format=source.format,
            scheme=source.scheme,
            custom_loaders=custom_loaders).delimiter

    with tabulator.Stream(source.source,
                          format=source.format,
                          scheme=source.scheme,
                          custom_loaders=custom_loaders,
                          **options) as stream:
        for row in stream:
            if header is None:
                header = ['' if v is None else v for v in row]
            else:
                rows.append(list(map(stringify, row)))
                nb_rows += 1

    # The preview can't show more rows than the table contains
    preview_rows_nb = min(preview_rows_nb, nb_rows)

    return {
        'header': header,
        'rows_nb': nb_rows,
        'data_rows': rows,
        'preview_rows_nb': preview_rows_nb,
        'preview_rows': rows[:preview_rows_nb],
    }


def improve_errors(errors):
    """Add context to errors and convert their markdown content to HTML.

    Returns a new list; each item is a copy of the input error dict updated
    with the keys ``context`` and ``content``, and with ``title`` and
    ``message`` when they have to be defaulted or converted.
    """

    def improve_err(err):
        """Adds context info based on row-nb presence and converts content to HTML"""

        # Context: 'body' when the error is attached to a row, 'table' otherwise
        update_keys = {
            'context': 'body' if err.get('row-number') is not None else 'table',
        }

        # Set default title if no title
        if 'title' not in err:
            update_keys['title'] = '[{}]'.format(err['code'])

        message = err.get('message')
        if message is None:
            # Missing (or null) message: fall back to the error code.
            # A null message is never handed to commonmark().
            update_keys['message'] = '[{}]'.format(err['code'])
        elif 'content' not in err:
            # Convert message from markdown only if there is no content
            # => for pre-checks errors
            update_keys['message'] = commonmark(message)

        # Message content, markdown to HTML (with a default value)
        md_content = err['content'] if 'content' in err else '*content soon available*'
        update_keys['content'] = commonmark(md_content)

        return {**err, **update_keys}

    return list(map(improve_err, errors))


def create_validata_ui_report(validata_core_report, schema_dict):
    """Create an error report easier to handle and display in templates.

    Works on a deep copy of ``validata_core_report``, which is left untouched:

    - only one table (the first one) is kept, under the ``table`` key
    - errors are contextualized
    - ``error_count`` is the number of errors of the table
    - errors are separated into "structure" and "body"
    - body errors are grouped by row in ``body_by_rows``
    - error messages are improved
    - error statistics are sorted by error title

    ``schema_dict`` is read for its ``fields`` list (``name``, ``title`` and
    ``description`` of each field).
    """
    report = copy.deepcopy(validata_core_report)

    # One table is enough
    del report['table-count']
    report['table'] = report['tables'][0]
    del report['tables']
    del report['table']['error-count']
    del report['table']['time']
    del report['table']['valid']
    del report['valid']

    # use _ instead of - to ease information picking in jinja2 template
    report['table']['row_count'] = report['table']['row-count']

    # Handy col_count info
    headers = report['table'].get('headers', [])
    report['table']['col_count'] = len(headers)

    # Computes column info: field name -> (title, description)
    fields_dict = {f['name']: (f.get('title', 'titre non défini'), f.get('description', ''))
                   for f in schema_dict.get('fields', [])}
    report['table']['headers_title'] = [
        fields_dict[h][0] if h in fields_dict else 'colonne inconnue'
        for h in headers
    ]
    report['table']['headers_description'] = [
        fields_dict[h][1] if h in fields_dict else "Cette colonne n'est pas définie dans le schema"
        for h in headers
    ]

    # Provide better (french) messages
    errors = improve_errors(report['table']['errors'])
    del report['table']['errors']

    # Count errors
    report['error_count'] = len(errors)
    del report['error-count']

    # Then group them in 2 groups : structure and body
    report['table']['errors'] = {'structure': [], 'body': []}
    for err in errors:
        if err['tag'] == 'structure':
            report['table']['errors']['structure'].append(err)
        else:
            report['table']['errors']['body'].append(err)

    # Checks if there are structure errors different to invalid-column-delimiter
    # (all() is True for an empty list, so "no structure error" is covered too)
    structure_errors = report['table']['errors']['structure']
    report['table']['do_display_body_errors'] = all(
        err['code'] == 'invalid-column-delimiter' for err in structure_errors)

    # Checks if a column comparison is needed
    header_errors = ('missing-headers', 'extra-headers', 'wrong-headers-order')
    structure_errors = [{**err, 'in_column_comp': err['code'] in header_errors}
                        for err in structure_errors]
    report['table']['errors']['structure'] = structure_errors
    column_comparison_needed = any(err['in_column_comp'] for err in structure_errors)

    if column_comparison_needed:
        # Pairs each source header with the schema field at the same position
        column_comparison_table = []
        field_names = [f['name'] for f in schema_dict.get('fields', [])]
        has_case_errors = False
        for t in itertools.zip_longest(headers, field_names, fillvalue=''):
            status = 'ok' if t[0] == t[1] else 'ko'
            if not has_case_errors and status == 'ko' and t[0].lower() == t[1].lower():
                has_case_errors = True
            column_comparison_table.append((*t, status))

        report['table']['column_comparison_info'] = {
            'table': column_comparison_table,
            'has_missing': len(headers) < len(field_names),
            'has_case_errors': has_case_errors,
        }
    report['table']['column_comparison_needed'] = column_comparison_needed

    # Group body errors by row id.
    # Consecutive errors sharing the same row number end up in the same group.
    rows = []
    current_row_id = None
    for err in report['table']['errors']['body']:
        if 'row-number' not in err:
            # Can't be attached to a row: keep it in the 'body' list only
            log.warning("Body error without row number: %r", err)
            continue
        row_id = err['row-number']
        del err['row-number']
        del err['context']
        if row_id != current_row_id:
            current_row_id = row_id
            rows.append({'row_id': current_row_id, 'errors': {}})

        # Errors are keyed by column number, or by 'row' for row-level errors
        column_id = err.get('column-number')
        if column_id is not None:
            del err['column-number']
            rows[-1]['errors'][column_id] = err
        else:
            rows[-1]['errors']['row'] = err
    report['table']['errors']['body_by_rows'] = rows

    # Sort by error names in statistics
    stats = report['table']['error-stats']
    code_title_map = messages.ERROR_MESSAGE_DEFAULT_TITLE
    for key in ('structure-errors', 'value-errors'):
        # convert dict into tuples with french title instead of error code
        # and sorts by title
        stats[key]['count-by-code'] = sorted(
            ((code_title_map.get(k, k), v) for k, v in stats[key]['count-by-code'].items()),
            key=itemgetter(0))

    return report


def compute_badge_message_and_color(badge):
    """Compute the badge (message, color) tuple from badge information.

    Reads ``badge['structure']``, the optional ``badge['body']`` and, when
    the body status is not ``'OK'``, ``badge['error-ratio']``.
    """
    structure = badge['structure']
    body = badge.get('body')

    # Bad structure, stop here
    if structure == 'KO':
        return ('structure invalide', 'red')

    # No body error
    if body == 'OK':
        return ('structure invalide', 'orange') if structure == 'WARN' else ('valide', 'green')

    # else compute quality ratio percent
    p = (1 - badge['error-ratio']) * 100.0
    msg = 'cellules valides : {:.1f}%'.format(p)
    return (msg, 'red') if body == 'KO' else (msg, 'orange')


def get_badge_url_and_message(badge):
    """Return the (badge url, badge message) tuple for the given badge information.

    The url is built from ``config.SHIELDS_IO_BASE_URL``; the message is
    URL-encoded with ``quote_plus``.
    """
    msg, color = compute_badge_message_and_color(badge)
    badge_url = '{}static/v1.svg?label=Validata&message={}&color={}'.format(
        config.SHIELDS_IO_BASE_URL, quote_plus(msg), color)
    return (badge_url, msg)


def validate(schema_url, source: ValidataSource):
    """Validate source through the validation API and display the report.

    URL sources are sent with a GET request, other sources are uploaded with
    a POST request. Returns either the rendered report page or a redirect
    (with a flashed error message) when something went wrong.
    """

    if config.API_VALIDATE_ENDPOINT is None:
        flash_error("No Validate endpoint defined :-(")
        return redirect(url_for("custom_validator"))
    api_url = config.API_VALIDATE_ENDPOINT

    headers = {"Accept": "application/json"}

    try:
        if source.is_url():
            params = {
                "schema": schema_url,
                "url": source.get_url(),
            }
            req = requests.get(api_url, params=params, headers=headers)
        else:
            files = {'file': (source.name, io.BytesIO(source.source))}
            data = {'schema': schema_url}
            req = requests.post(api_url, data=data, files=files, headers=headers)

        # 400: the response body is read for its 'message' entry
        if req.status_code == 400:
            json_response = req.json()
            flash_error("Une erreur est survenue durant la validation: {}"
                        .format(json_response.get('message')))
            return redirect(url_for("home"))

        if not req.ok:
            flash_error("Un erreur s'est produite côté serveur :-(")
            return redirect(url_for("home"))

        json_response = req.json()
        validata_core_report = json_response['report']
        schema_dict = json_response['schema']

    except requests.ConnectionError as err:
        log.exception(err)
        flash_error(str(err))
        return redirect(url_for('home'))

    # Computes badge from report and badge configuration
    badge = compute_badge(validata_core_report, config.BADGE_CONFIG)
    badge_url, badge_msg = get_badge_url_and_message(badge)

    # A source error means the file couldn't be read: no report to display
    source_errors = [err for err in validata_core_report['tables'][0]['errors']
                     if err['code'] == 'source-error']
    if source_errors:
        err = source_errors[0]
        if 'charmap' in err['message']:
            msg = "l'encodage du fichier est invalide. Veuillez le corriger"
        else:
            msg = err['message']
        flash_error('Erreur de source : {}'.format(msg))
        return redirect(url_for('custom_validator'))

    source_data = extract_source_data(source)

    # handle report date
    report_datetime = datetime.fromisoformat(validata_core_report['date']).astimezone()

    # Enhance validata_core_report
    validata_report = create_validata_ui_report(validata_core_report, schema_dict)

    # Display report to the user
    validator_form_url = url_for("custom_validator") + '?schema=' + quote_plus(schema_url)
    val_info, validator_title = compute_validator_info(schema_url)

    return render_template('validation_report.html',
                           title='Rapport de validation',
                           val_info=val_info,
                           report=validata_report,
                           schema_url=schema_url,
                           validation_date=report_datetime.strftime('le %d/%m/%Y à %Hh%M'),
                           source=source,
                           source_type=source.type,
                           source_data=source_data,
                           print_mode=request.args.get('print', 'false') == 'true',
                           badge_url=badge_url,
                           badge_msg=badge_msg,
                           report_str=json.dumps(validata_report, sort_keys=True, indent=2),
                           breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'},
                                        {'url': validator_form_url, 'title': validator_title},
                                        ])


def bytes_data(f):
    """ Gets bytes data from Werkzeug FileStorage instance """
    iob = io.BytesIO()
    f.save(iob)
    iob.seek(0)
    return iob.getvalue()


# Routes


@app.route('/')
def home():
    """ Home page """
    flash_warning('Ce service est fourni en mode beta - certains problèmes peuvent subsister - nous mettons tout en œuvre pour améliorer son fonctionnement en continu.')
    return render_template('home.html', title='Accueil', config=ui_config)


@app.route('/validators')
def validators():
    """ No validators page: redirects to the home page """
    return redirect(url_for('home'))


@app.route('/pdf')
def pdf_report():
    """PDF report generation.

    Expects the ``url`` and ``schema`` query parameters, prints the
    corresponding validation report page to a temporary PDF file with a
    headless chromium and sends this file back as an attachment.
    """
    err_prefix = 'Erreur de génération du rapport PDF'

    url_param = request.args.get('url')
    if not url_param:
        flash_error(err_prefix + ': URL non fournie')
        return redirect(url_for('home'))

    schema_param = request.args.get('schema')
    if not schema_param:
        flash_error(err_prefix + ': URL de schema non fournie')
        return redirect(url_for('home'))

    # URL of the validation report page in print mode
    validation_url = '{}?input=url&print=true&url={}&schema={}'.format(
        url_for('custom_validator', _external=True),
        quote_plus(url_param),
        quote_plus(schema_param))

    # The temporary file is only used to get a unique file name.
    # NOTE(review): the block is closed right after the name is captured so
    # that the explicit exists()/unlink() calls below stay valid - confirm
    # this matches the intended scope.
    with tempfile.NamedTemporaryFile(prefix='validata_{}_report_'.format(datetime.now().timestamp()),
                                     suffix='.pdf') as tmpfile:
        tmp_pdf_report = Path(tmpfile.name)

    cmd = ['chromium', '--headless', '--disable-gpu',
           '--print-to-pdf={}'.format(str(tmp_pdf_report)), validation_url]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if result.returncode != 0:
        flash_error(err_prefix)
        log.error("Command %r returned an error: %r", cmd, result.stdout.decode('utf-8'))
        if tmp_pdf_report.exists():
            tmp_pdf_report.unlink()
        return redirect(url_for('home'))

    pdf_filename = 'Rapport de validation {}.pdf'.format(datetime.now().strftime('%d-%m-%Y %Hh%M'))
    response = make_response(tmp_pdf_report.read_bytes())
    response.headers.set('Content-disposition', 'attachment', filename=pdf_filename)
    response.headers.set('Content-type', 'application/pdf')
    response.headers.set('Content-length', tmp_pdf_report.stat().st_size)

    # The PDF content is now held by the response
    tmp_pdf_report.unlink()

    return response


def compute_validator_info(schema_url):
    """Factor code for validator form page.

    Returns a ``(val_info, title)`` tuple; ``val_info`` is the
    ``schema_info_map`` entry of the schema, or ``None`` for a schema URL
    which is not in ``schema_info_map``.
    """
    val_info = None
    title = "Schéma personnalisé"
    if schema_url in schema_info_map:
        val_info = schema_info_map.get(schema_url)
        title = "Schéma « {} »".format(val_info['title'])
    return val_info, title


@app.route('/validators/form', methods=['GET', 'POST'])
def custom_validator():
    """Validator page.

    GET without ``input`` parameter displays the validation form, GET with
    ``input`` validates the resource designated by the ``url`` parameter and
    POST validates an uploaded file.
    """
    if config.API_VALIDATE_ENDPOINT is None:
        flash_error("URL de connexion à l'API non indiquée :-(")
        return redirect(url_for('home'))

    def validation_form_url(schema_url):
        """Computes validation form url with schema URL parameter"""
        return "{}?schema={}".format(url_for('custom_validator'), quote_plus(schema_url))

    if request.method == 'GET':
        input_param = request.args.get('input')
        url_param = request.args.get("url")
        schema_param = request.args.get("schema")

        if schema_param is None or schema_param == '':
            flash_error("Vous n'avez pas indiqué d'url de schéma")
            return redirect(url_for('home'))

        # First form display
        if input_param is None:
            val_info, title = compute_validator_info(schema_param)
            return render_template('validation_form.html',
                                   title=title,
                                   val_info=val_info,
                                   schema_url=schema_param,
                                   breadcrumbs=[{'url': url_for('home'), 'title': 'Accueil'}, ])

        # Process URL
        if url_param is None or url_param == '':
            flash_error("Vous n'avez pas indiqué d'url à valider")
            return redirect(validation_form_url(schema_param))
        try:
            return validate(schema_param, ValidataSource('url', url_param, url_param))
        except tabulator.exceptions.FormatError as e:
            flash_error('Erreur : Format de ressource non supporté')
            log.info(e)
            return redirect(validation_form_url(schema_param))
        except tabulator.exceptions.HTTPError as e:
            flash_error("Erreur : impossible d'accéder au fichier source en ligne")
            log.info(e)
            return redirect(validation_form_url(schema_param))

    else:  # POST
        schema_param = request.form.get('schema')
        if schema_param is None:
            flash_error('Aucun schéma défini')
            return redirect(url_for('home'))

        input_param = request.form.get('input')
        if input_param is None:
            flash_error("Vous n'avez pas indiqué de fichier à valider")
            return redirect(validation_form_url(schema_param))

        # File validation
        if input_param == 'file':
            f = request.files.get('file')
            if f is None:
                flash_warning("Vous n'avez pas indiqué de fichier à valider")
                return redirect(validation_form_url(schema_param))

            b_content = bytes_data(f)
            return validate(schema_param, ValidataSource('file', f.filename, b_content))

    # Reached only by a POST whose 'input' value is not 'file'
    return 'Bizarre, vous avez dit bizarre ?'