Commit 76c32802 authored by Pierre Dittgen's avatar Pierre Dittgen

one more custom check

parent 35419c75
Pipeline #2358 failed with stage
in 1 minute and 59 seconds
......@@ -318,6 +318,35 @@ def schema_cohesive_columns():
}
@pytest.fixture
def schema_compare_columns():
return {
"$schema": "https://frictionlessdata.io/schemas/table-schema.json",
"fields": [
{
"name": "depenses",
"title": "Dépenses",
"type": "number"
},
{
"name": "recettes",
"title": "Recettes",
"type": "number"
},
],
"custom_checks": [
{
"name": "compare-columns-value",
"params": {
"column": "depenses",
"op": "<=",
"column2": "recettes"
}
}
]
}
def validate_csv_text(**options):
return validate(scheme='text', format='csv', with_repair=False, **options)
......@@ -652,6 +681,28 @@ def test_cohesive_columns_values_4(schema_cohesive_columns):
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "cohesive-columns-value"
def test_compare_columns_value_1(schema_compare_columns):
source = """depenses,recettes
12000,15000"""
report = validate_csv_text(source=source, schema=schema_compare_columns)
assert_no_report_errors(report)
def test_compare_columns_value_2(schema_compare_columns):
source = """depenses,recettes
12000,12000"""
report = validate_csv_text(source=source, schema=schema_compare_columns)
assert_no_report_errors(report)
def test_compare_columns_value_3(schema_compare_columns):
source = """depenses,recettes
12000,6000"""
report = validate_csv_text(source=source, schema=schema_compare_columns)
assert report["stats"]["errors"] == 1
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "compare-columns-value"
# def test_error_stats(schema_types_and_required):
# source = """A,B
# 2,2020-04-01
......
from .cohesive_columns_value import CohesiveColumnsValue
from .compare_columns_value import CompareColumnsValue
from .french_siren_value import FrenchSirenValue
from .french_siret_value import FrenchSiretValue
from .nomenclature_actes_value import NomenclatureActesValue
......@@ -8,6 +9,7 @@ from .year_interval_value import YearIntervalValue
# Please keep the below dict up-to-date
available_checks = {
"cohesive-columns-value": CohesiveColumnsValue,
"compare-columns-value": CompareColumnsValue,
"french-siren-value": FrenchSirenValue,
"french-siret-value": FrenchSiretValue,
"nomenclature-actes-value": NomenclatureActesValue,
......
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
from goodtables.error import Error
from goodtables.registry import check
"""
Compare columns value check
......@@ -31,9 +25,11 @@ from goodtables.registry import check
Pierre Dittgen, Jailbreak
"""
import decimal
from simpleeval import simple_eval
from frictionless import errors, Check
# Module API
OP_LABELS = {
'>': 'supérieure',
......@@ -43,107 +39,102 @@ OP_LABELS = {
'<': 'inférieure',
}
class CompareColumnsValueError(errors.CellError):
"""Custom error."""
code = "compare-columns-value"
name = "comparaison de colonnes"
tags = ["#body"]
template = "{note}."
description = ""
@check('compare-columns-value', type='custom', context='body')
class CompareColumnsValue(object):
"""Compare columns value check class"""
# Public
def __init__(self, column, **options):
"""Gets and store column names to check"""
class CompareColumnsValue(Check):
"""Compare columns value check class."""
self.column = column
self.column2 = options['column2']
self.op = options['op']
possible_Errors = [CompareColumnsValueError]
@staticmethod
def valued(val):
return val != ''
def check_row(self, cells):
cell = None
value1 = None
value2 = None
def prepare(self):
"""Extract custom params from descriptor."""
self.__column = self.get("column")
self.__column2 = self.get("column2")
self.__op = self.get("op")
# Gets column values
for item in cells:
if item['header'] == self.column:
cell = item
value1 = item['value']
elif item['header'] == self.column2:
value2 = item['value']
# 1 column doesn't exist
if value1 is None or value2 is None:
return
def validate_task(self):
if self.__column not in self.table.schema.field_names:
note = 'colonne manquante : "%s"' % self.__column
yield errors.TaskError(note=note)
elif self.__column2 not in self.table.schema.field_names:
note = 'colonne manquante : "%s"' % self.__column2
yield errors.TaskError(note=note)
elif self.__op not in OP_LABELS:
note = 'opérateur non géré : "%s"' % self.__op
yield errors.TaskError(note=note)
# one of the columns is not valued
if not CompareColumnsValue.valued(value1) or not CompareColumnsValue.valued(value2):
return
# Op validity
if not self.op in OP_LABELS:
return self.err(cell,
'Opérateur `{}` invalide'.format(self.op), {'operator': self.op})
def validate_row(self, row):
cell_value1 = row[self.__column]
cell_value2 = row[self.__column2]
op = self.__op
# Compare
comparison_str = CompareColumnsValue.compute_comparison_str(value1, self.op, value2)
comparison_str = compute_comparison_str(cell_value1, op, cell_value2)
if comparison_str is None:
return self.err(cell,
"La valeur de la colonne {} `{}` n'est pas comparable avec la valeur de la colonne {} `{}`"
.format(self.column, value1, self.column2, value2),
{'column1': self.column, 'value1': value1, 'column2': self.column2, 'value2': value2})
note = f"la valeur de la colonne {self.__column} `{cell_value1}` n'est pas comparable avec"
note += f" la valeur de la colonne {self.__column2} `{cell_value2}`"
yield CompareColumnsValueError.from_row(row, note=note, field_name=self.__column)
return
compare_result = simple_eval(comparison_str)
compare_result = eval(comparison_str)
if not compare_result:
return self.err(cell,
"La valeur de la colonne {} `{}` devrait être {} à la valeur de la colonne {} `{}`"
.format(self.column, value1, OP_LABELS[self.op], self.column2, value2),
{'column1': self.column, 'value1': value1, 'column2': self.column2, 'value2': value2, 'op': OP_LABELS[self.op]})
@staticmethod
def is_a_number(value):
"""Return True if value is an int, a float or a string representation of a number."""
if type(value) in (int, float):
return True
if not isinstance(value, str):
return False
if value.isnumeric():
return True
try:
float(value)
return True
except ValueError:
return False
@staticmethod
def compute_comparison_str(value1, op, value2):
""" Computes comparison_str """
# number vs number
if CompareColumnsValue.is_a_number(value1) and CompareColumnsValue.is_a_number(value2):
return '{} {} {}'.format(value1, op, value2)
# string vs string
if isinstance(value1, str) and isinstance(value2, str):
n_value1 = value1.replace('"', '\\"')
n_value2 = value2.replace('"', '\\"')
return '"{}" {} "{}"'.format(n_value1, op, n_value2)
# thing vs thing, compare string repr
if type(value1) == type(value2):
return f"'{value1}' {op} '{value2}'"
# potato vs cabbage?
return None
def err(self, cell, msg, msg_substitutions):
""" Create and return formatted error """
error = Error(
'compare-columns-value',
cell,
message=msg,
message_substitutions=msg_substitutions
)
return [error]
op_str = OP_LABELS[self.__op]
note = f"la valeur de la colonne {self.__column} `{cell_value1}` devrait être {op_str}"
note += f" à la valeur de la colonne {self.__column2} `{cell_value2}`"
yield CompareColumnsValueError.from_row(row, note=note, field_name=self.__column)
metadata_profile = { # type: ignore
"type": "object",
"required": ["column", "column2", "op"],
"properties": {"column": {}, "column2": {}, "op": {"type": "string"}},
}
def is_a_number(value):
"""Return True if value is an int, a float or a string representation of a number."""
if type(value) in (int, float) or isinstance(value, decimal.Decimal):
return True
if not isinstance(value, str):
return False
if value.isnumeric():
return True
try:
float(value)
return True
except ValueError:
return False
def valued(val):
return not (val is None or val == '')
def compute_comparison_str(value1, op, value2):
""" Computes comparison_str """
# number vs number
if is_a_number(value1) and is_a_number(value2):
return f"{str(value1)} {op} {str(value2)}"
# string vs string
if isinstance(value1, str) and isinstance(value2, str):
n_value1 = value1.replace('"', '\\"')
n_value2 = value2.replace('"', '\\"')
return f'"{n_value1}" {op} "{n_value2}"'
# thing vs thing, compare string repr
if type(value1) == type(value2):
return f"'{value1}' {op} '{value2}'"
# potato vs cabbage?
return None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment