Commit 35419c75 authored by Pierre Dittgen

Add cohesive-columns custom check

parent 358ab69c
Pipeline #2357 failed with stage
in 1 minute and 57 seconds
......@@ -285,6 +285,38 @@ def schema_year_interval():
]
}
@pytest.fixture
def schema_cohesive_columns():
    """Provide a Table Schema exercising the ``cohesive-columns-value`` check.

    The schema declares a numeric ``id`` field and two string fields, and
    configures the custom check with ``col1`` as the main column and ``col2``
    as the only column compared against it.
    """
    id_field = {"name": "id", "title": "Identifiant", "type": "number"}
    first_column = {"name": "col1", "title": "Colonne 1", "type": "string"}
    second_column = {"name": "col2", "title": "Colonne 2", "type": "string"}

    cohesion_check = {
        "name": "cohesive-columns-value",
        "params": {"column": "col1", "othercolumns": ["col2"]},
    }

    return {
        "$schema": "https://frictionlessdata.io/schemas/table-schema.json",
        "fields": [id_field, first_column, second_column],
        "custom_checks": [cohesion_check],
    }
def validate_csv_text(**options):
    """Validate inline CSV text, forwarding *options* to ``validate``.

    The source is always read as text in CSV format and repair is disabled;
    every other keyword argument is passed through unchanged.
    """
    inline_csv_settings = {"scheme": "text", "format": "csv", "with_repair": False}
    return validate(**inline_csv_settings, **options)
......@@ -585,6 +617,41 @@ Baz/foobar
assert report["tables"][0]["errors"][0]["code"] == "nomenclature-actes-value"
def test_cohesive_columns_values_1(schema_cohesive_columns):
    """Both compared columns are empty: no error is expected."""
    csv_text = """id,col1,col2
1,,"""
    outcome = validate_csv_text(source=csv_text, schema=schema_cohesive_columns)

    assert_no_report_errors(outcome)
def test_cohesive_columns_values_2(schema_cohesive_columns):
    """Both compared columns are valued: no error is expected."""
    csv_text = """id,col1,col2
1,foo,bar"""
    outcome = validate_csv_text(source=csv_text, schema=schema_cohesive_columns)

    assert_no_report_errors(outcome)
def test_cohesive_columns_values_3(schema_cohesive_columns):
    """``col1`` is valued but ``col2`` is empty: one cohesion error is expected."""
    csv_text = """id,col1,col2
1,foo,"""
    outcome = validate_csv_text(source=csv_text, schema=schema_cohesive_columns)

    stats = outcome["stats"]
    assert stats["errors"] == 1
    assert stats["tables"] == 1

    first_error = outcome["tables"][0]["errors"][0]
    assert first_error["code"] == "cohesive-columns-value"
def test_cohesive_columns_values_4(schema_cohesive_columns):
    """``col1`` is empty but ``col2`` is valued: one cohesion error is expected."""
    csv_text = """id,col1,col2
1,,bar"""
    outcome = validate_csv_text(source=csv_text, schema=schema_cohesive_columns)

    stats = outcome["stats"]
    assert stats["errors"] == 1
    assert stats["tables"] == 1

    first_error = outcome["tables"][0]["errors"][0]
    assert first_error["code"] == "cohesive-columns-value"
# def test_error_stats(schema_types_and_required):
# source = """A,B
# 2,2020-04-01
......
from .cohesive_columns_value import CohesiveColumnsValue
from .french_siren_value import FrenchSirenValue
from .french_siret_value import FrenchSiretValue
from .nomenclature_actes_value import NomenclatureActesValue
......@@ -6,6 +7,7 @@ from .year_interval_value import YearIntervalValue
# Please keep the below dict up-to-date
available_checks = {
"cohesive-columns-value": CohesiveColumnsValue,
"french-siren-value": FrenchSirenValue,
"french-siret-value": FrenchSiretValue,
"nomenclature-actes-value": NomenclatureActesValue,
......
# -*- coding: utf-8 -*-
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
"""
Cohesive columns value check
......@@ -19,72 +15,66 @@ from __future__ import unicode_literals
Pierre Dittgen, Jailbreak
"""
import re
from simpleeval import simple_eval
from goodtables.registry import check
from goodtables.error import Error
from frictionless import errors, Check
# Module API
class CohesiveColumnsValueError(errors.CellError):
    """Cell-level error describing a cohesion failure between columns.

    Only descriptive class attributes are overridden here. The message
    template exposes a ``{note}`` placeholder for the explanation.
    """

    # Identification of the error.
    code = "cohesive-columns-value"
    name = "cohérence entre colonnes"
    description = ""

    # Classification and message rendering.
    tags = ["#body"]
    template = "incohérence relevée ({note})."
class CohesiveColumnsValue(Check):
    """Check that a group of columns is entirely valued or entirely empty.

    Descriptor parameters (validated against ``metadata_profile``):

    - ``column``: name of the main column; row errors are attached to it.
    - ``othercolumns``: names of the columns compared against the main one.

    NOTE(review): ``prepare``, ``validate_task`` and ``validate_row`` are
    presumably hooks invoked by the frictionless ``Check`` machinery --
    confirm against the frictionless version pinned by the project.
    """

    possible_Errors = [CohesiveColumnsValueError]

    def prepare(self):
        """Extract custom params from descriptor."""
        self.__column = self.get("column")
        self.__other_columns = self.get("othercolumns")
        self.__all_columns = [self.__column] + self.__other_columns
        # Not read anywhere else in this class; kept for parity.
        self.__columns_nb = len(self.__all_columns)

    def validate_task(self):
        """Yield a ``TaskError`` for each problem in the check configuration.

        Reported problems: the main column is absent from the schema, the
        list of columns to compare is empty, or a column to compare is
        absent from the schema (one error per missing column).
        """
        field_names = self.table.schema.field_names
        if self.__column not in field_names:
            note = 'colonne manquante : "%s"' % self.__column
            yield errors.TaskError(note=note)
        elif not self.__other_columns:
            note = 'la liste de colonnes à comparer est vide'
            yield errors.TaskError(note=note)
        else:
            for col in self.__other_columns:
                if col not in field_names:
                    note = 'colonne à comparer "%s" non trouvée' % col
                    yield errors.TaskError(note=note)

    def validate_row(self, row):
        """Yield one error when the row mixes valued and empty cells.

        The main column sets the expected status (valued or not); any other
        configured column with a different status makes the row incoherent.
        The error is attached to the main column's cell.
        """
        cell_value = row[self.__column]
        # `valued` is the module-level helper defined after this class.
        status = valued(cell_value)
        other_cell_values = [row[col] for col in self.__other_columns]

        # Cohesion holds only if every other column shares the main status.
        if any(valued(v) != status for v in other_cell_values):
            columns_str = ", ".join(self.__all_columns)
            note = f"les colonnes {columns_str} doivent toutes comporter une valeur ou toutes être vides"
            yield CohesiveColumnsValueError.from_row(row, note=note, field_name=self.__column)

    # JSON-Schema-style profile describing the accepted descriptor params.
    metadata_profile = {  # type: ignore
        "type": "object",
        "required": ["column", "othercolumns"],
        "properties": {"column": {"type": "string"}, "othercolumns": {"type": "array"}},
    }
def valued(val):
    """Return True when *val* is neither ``None`` nor the empty string."""
    if val is None:
        return False
    return val != ''
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment