#!/usr/bin/env python3
import re
from datetime import datetime
import ujson as json
FRENCH_DATE_RE = re.compile(r'^[0-3]\d/[0-1]\d/[12]\d{3}$')
DATETIME_RE = re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$')
def u_err(err, title, content):
""" Update error """
err['title'] = title
err['content'] = content
return err
def et_join(values):
"""french enum
>>> et_join([])
''
>>> et_join(['a'])
'a'
>>> et_join(['a','b'])
'a et b'
>>> et_join(['a','b','c'])
'a, b et c'
>>> et_join(['a','b','c','d','e'])
'a, b, c, d et e'
"""
if values is None or len(values) == 0:
return ''
if len(values) == 1:
return values[0]
return ' et '.join([', '.join(values[:-1]), values[-1]])
# Core goodtables checks
# -> error message title is stored in 'title' attribute
# -> error message content is stored in 'content' attribute
# This is adapted to pophover display
def blank_row(err, headers, schema):
""" blank-row error """
return u_err(err, 'Ligne vide', 'Les lignes vides doivent être retirées de la table')
def duplicate_row(err, headers, schema):
""" duplicate-row error """
msg_prefix = 'La ligne est identique '
row_numbers = err['message-data']['row_numbers']
if not ',' in row_numbers:
msg = msg_prefix + "à la ligne {}.".format(row_numbers)
else:
msg = msg_prefix + "aux lignes {}.".format(et_join(row_numbers))
msg += "
Vous pouvez la supprimer."
return u_err(err, 'Ligne dupliquée', msg)
def enumerable_constraint(err, headers, schema):
""" enumerable-constraint """
constraint_values = eval(err['message-data']['constraint'])
ok_values = ['"{}"'.format(val) for val in constraint_values]
if len(ok_values) == 1:
return u_err(err, 'Valeur incorrecte', 'L\'unique valeur autorisée pour cette colonne est: {}'.format(ok_values[0]))
else:
html_str = '
' + '\n'.join(['- {}
'.format(val.strip('"')) for val in ok_values]) + '
'
return u_err(err, 'Valeur incorrecte', 'Les seules valeurs autorisées pour cette colonne sont : {}'.format(html_str))
def maximum_constraint(err, headers, schema):
""" maximum-constraint """
max_value = err['message-data']['constraint']
return u_err(err, 'Valeur trop grande', 'La valeur attendue doit être inférieure à {}'.format(max_value))
def maximum_length_constraint(err, headers, schema):
""" maximum-length-constraint """
max_value = err['message-data']['constraint']
text_value_len = len(err['message-data']['value'])
return u_err(err, 'Valeur trop longue', 'La valeur texte attendue ne doit pas comporter plus de {} caractère(s) (au lieu de {} actuellement)'.format(max_value, text_value_len))
def minimum_constraint(err, headers, schema):
""" minimum-constraint """
min_value = err['message-data']['constraint']
return u_err(err, 'Valeur trop petite', 'La valeur attendue doit être au moins égale à {}'.format(min_value))
def minimum_length_constraint(err, headers, schema):
""" minimum-length-constraint """
min_value = err['message-data']['constraint']
text_value_len = len(err['message-data']['value'])
return u_err(err, 'Valeur trop courte', 'Le texte attendu doit comporter au moins {} caractère(s) (au lieu de {} actuellement)'.format(min_value, text_value_len))
def pattern_constraint(err, headers, schema):
""" pattern-constraint """
column_number = err['column-number']
field = schema['fields'][column_number - 1]
col_name = field['name']
addon_info_list = []
if 'description' in field:
addon_info_list.append('Pour rappel, la description de cette colonne est « {} »'.format(field['description']))
if 'example' in field:
addon_info_list.append('Exemple(s) de valeur correcte : {}'.format(field['example']))
addon_info = '
' + '
'.join(addon_info_list) if addon_info_list else ''
return u_err(err, 'Erreur de format', 'La valeur ne respecte pas le format attendu pour la colonne {}.{}'.format(col_name, addon_info))
def required_constraint(err, headers, schema):
""" required-constraint error """
col_name = ''
col_nb = err['column-number']
if col_nb <= len(headers):
col_name = ' "{}"'.format(headers[col_nb - 1])
return u_err(err, 'Cellule vide', 'Le contenu de la colonne{} est obligatoire\n'.format(col_name)
+ 'Merci d\'indiquer une valeur.')
def type_or_format_error(err, headers, schema):
""" type-or-format-value """
err_type = err['message-data']['field_type']
err_value = err['message-data']['value']
# Date
if err_type == 'date':
# Checks if date is dd/mm/yyyy
dm = FRENCH_DATE_RE.match(err_value)
if dm:
iso_date = datetime.strptime(err_value, '%d/%m/%Y').strftime('%Y-%m-%d')
return u_err(err, 'Format de date incorrect', "La forme attendue est \"{}\"".format(iso_date))
# Checks if date is yyyy-mm-ddThh:MM:ss
# print('DATE TIME ? [{}]'.format(err_value))
dm = DATETIME_RE.match(err_value)
if dm:
iso_date = err_value[:err_value.find('T')]
return u_err(err, 'Format de date incorrect', "La forme attendue est \"{}\"".format(iso_date))
# Default date err msg
return u_err(err, 'Format de date incorrect', 'La date doit être écrite sous la forme aaaa-mm-jj.')
# Number
elif err_type == 'number':
if ',' in err_value:
en_number = err_value.replace(',', '.')
return u_err(err, 'Format de nombre incorrect', "Merci d'utiliser le point comme séparateur décimal (« {} »).".format(en_number))
return u_err(err, 'Format de nombre incorrect', 'Vérifiez que la cellule ne comporte que des chiffres et le point comme séparateur décimal.')
# Boolean
elif err_type == 'boolean':
column_number = err['column-number']
field = schema['fields'][column_number]
true_values = field.get('trueValues', ['true'])
false_values = field.get('falseValues', ['false'])
return u_err(err, "Valeur booléenne incorrecte", "Les valeurs acceptées sont {} pour 'vrai' et {} pour 'faux'".format(et_join(false_values), et_join(true_values)))
# Default msg
return u_err(err, 'Type ou format incorrect', 'La valeur de la cellule n\'est pas de type {}'.format(err_type))
def unique_constraint(err, headers, schema):
""" unique-constraint """
msg_prefix = 'Cette valeur est déjà présente '
row_numbers = err['message-data']['row_numbers']
if not ',' in row_numbers:
msg = msg_prefix + "à la ligne {}.".format(row_numbers)
else:
msg = msg_prefix + "aux lignes {}.".format(et_join(row_numbers))
msg += " Or une contrainte d'unicité est définie pour cette colonne."
msg += "Veuillez corriger les doublons de retirez la contrainte d'unicité du schéma."
return u_err(err, 'Valeur déjà utilisée', msg)
# Validata custom checks
def french_siret_value(err, headers, schema):
""" french-siret-value error """
return u_err(err, 'Numéro SIRET non valide',
'Le numéro de SIRET indiqué n\'est pas valide selon la définition de l\'INSEE')
# Validata pre-checks
#
# -> Error message is stored in 'message' key
def invalid_column_delimiter(err, headers, schema):
""" invalid-column-delimiter """
md = err['message-data']
msg_tpl = 'Le fichier CSV utilise le délimiteur de colonne « {} » au lieu du délimiteur attendu « {} ».'
err['message'] = msg_tpl.format(md.get('detected'), md.get(
'expected')) + "
Pour vous permettre de continuer la validation, un remplacement automatique a été réalisé."
return err
def missing_headers(err, headers, schema):
""" missing-headers """
cols = err['message-data']['headers']
if len(cols) == 1:
err['message'] = "La colonne \"{}\" n'a pas été trouvée dans le fichier".format(cols[0])
else:
cols_ul = '' + '\n'.join(['- {}
'.format(col) for col in cols]) + '
'
fields_nb = len(schema.get('fields', []))
addon_info = 'Utilisez-vous le bon schéma ?' if len(cols) == fields_nb else ''
err['message'] = "Les colonnes suivantes n'ont pas été trouvées dans le fichier : {}{}".format(
cols_ul, addon_info)
return err
def extra_headers(err, headers, schema):
""" extra-headers """
cols = err['message-data']['headers']
if len(cols) == 1:
err['message'] = "La colonne \"{}\" est inconnue dans le schéma".format(cols[0])
else:
cols_ul = '' + '\n'.join(['- {}
'.format(col) for col in cols]) + '
'
addon_info = 'Utilisez-vous le bon schéma ?' if len(cols) == len(headers) else ''
err['message'] = "Les colonnes suivantes sont inconnues dans le schéma : {}{}".format(cols_ul, addon_info)
return err
def wrong_headers_order(err, headers, schema):
""" wrong-headers-order """
fields = [f['name'] for f in schema.get('fields', [])]
assert len(headers) == len(fields), 'Wrong column order between two lists of different lengths'
msgs = []
for i, (header, field) in enumerate(zip(headers, fields)):
if header == field:
continue
msgs.append('la colonne {} devrait être « {} » (au lieu de « {} »)'.format(i+1, field, header))
errors_str = '\n' + '\n'.join(['- {}
\n'.format(msg) for msg in msgs]) + '\n
'
err['message'] = "Les colonnes du tableau ne sont pas dans l'ordre attendu :\n{}".format(errors_str)
return err