Commit cf3ecf3d authored by Pierre Dittgen

minimal based on frictionless framework + custom checks

parent f8f37bcb
Pipeline #2365 failed in 1 minute and 18 seconds
@@ -9,19 +9,15 @@ certifi==2020.11.8 # via requests
chardet==3.0.4 # via frictionless, requests
click==7.1.2 # via typer
colorama==0.4.4 # via typer
et-xmlfile==1.0.1 # via openpyxl
decorator==4.4.2 # via validators
ezodf==0.3.2 # via validata_core (setup.py)
frictionless==3.33.3 # via validata_core (setup.py)
frictionless==3.34.3 # via validata_core (setup.py)
idna==2.10 # via requests
ijson==3.1.2.post0 # via frictionless
importlib-resources==3.3.0 # via validata_core (setup.py)
isodate==0.6.0 # via frictionless
jdcal==1.4.1 # via openpyxl
jsonlines==1.2.0 # via frictionless
jsonschema==3.2.0 # via frictionless
lxml==4.6.1 # via validata_core (setup.py)
lxml==4.6.2 # via validata_core (setup.py)
numpy==1.19.4 # via pandas
openpyxl==3.0.5 # via frictionless
pandas==1.1.4 # via tablib
petl==1.6.8 # via frictionless
pyrsistent==0.17.3 # via jsonschema
@@ -31,20 +27,16 @@ python-stdnum==1.14 # via validata_core (setup.py)
pytz==2020.4 # via pandas
pyyaml==5.3.1 # via frictionless
requests==2.25.0 # via frictionless, validata_core (setup.py)
rfc3986==1.4.0 # via frictionless
shellingham==1.3.2 # via typer
simpleeval==0.9.10 # via frictionless
simplejson==3.17.2 # via frictionless
six==1.15.0 # via isodate, jsonlines, jsonschema, python-dateutil
six==1.15.0 # via isodate, jsonschema, python-dateutil, validators
stringcase==1.2.0 # via frictionless
tablib[pandas]==2.0.0 # via validata_core (setup.py)
text-unidecode==1.3 # via python-slugify
toolz==0.11.1 # via validata_core (setup.py)
typer[all]==0.3.2 # via frictionless
unicodecsv==0.14.1 # via frictionless
urllib3==1.26.2 # via requests
xlrd==1.2.0 # via frictionless
xlwt==1.3.0 # via frictionless
validators==0.18.1 # via frictionless
# The following packages are considered to be unsafe in a requirements file:
# setuptools
import pytest
import tablib
from validata_core import EMPTY_HEADER, repair_core
def gen_dataset(rows):
"""Turns rows into tablib.Dataset"""
return tablib.Dataset(*rows[1:], headers=rows[0])
def err_msg_data(err):
"""Shortcut"""
return dict(err)['message-data']
ASTRONAUTS_ROWS = [
['Name', 'Country', 'Year'],
['Neil Armstrong', 'USA', 1958],
['Scott Carpenter', 'USA', 1959],
['Ivan Anikeyev', 'USSR', 1960],
['Neil Armstrong', 'USA', 1960],
['Tatyana Kuznetsova', 'USSR', 1962],
['Neil Armstrong', 'USA', 1962],
]
@pytest.fixture()
def astronauts_dataset():
# https://en.wikipedia.org/wiki/List_of_astronauts_by_year_of_selection
return gen_dataset(ASTRONAUTS_ROWS)
def test_no_repair_to_be_done(astronauts_dataset):
source_dataset = astronauts_dataset
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset == source_dataset
assert len(report) == 0
def test_reorder_columns(astronauts_dataset):
source_dataset = astronauts_dataset
fixed_dataset, report = repair_core(source_dataset, ['Country', 'Year', 'Name'])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('USA', 1958, 'Neil Armstrong')
assert len(report) == 1
assert report[0].code == 'wrong-headers-order'
def test_missing_column_at_start():
rows = [row[1:] for row in ASTRONAUTS_ROWS]
source_dataset = tablib.Dataset(*rows[1:], headers=rows[0])
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('', 'USA', 1958)
assert len(report) == 1
assert report[0].code == 'missing-header'
assert err_msg_data(report[0]).get('column-name') == 'Name'
def test_missing_column_inside():
rows = [[row[0]] + row[2:] for row in ASTRONAUTS_ROWS]
source_dataset = tablib.Dataset(*rows[1:], headers=rows[0])
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('Neil Armstrong', '', 1958)
assert len(report) == 1
assert report[0].code == 'missing-header'
assert err_msg_data(report[0]).get('column-name') == 'Country'
def test_missing_column_at_end():
rows = [row[:-1] for row in ASTRONAUTS_ROWS]
source_dataset = tablib.Dataset(*rows[1:], headers=rows[0])
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('Neil Armstrong', 'USA', '')
assert len(report) == 1
assert report[0].code == 'missing-header'
assert err_msg_data(report[0]).get('column-name') == 'Year'
def test_empty_column_at_start():
rows = [[EMPTY_HEADER if i == 0 else ''] + row for i, row in enumerate(ASTRONAUTS_ROWS)]
source_dataset = tablib.Dataset(*rows[1:], headers=rows[0])
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('Neil Armstrong', 'USA', 1958)
assert len(report) == 1
assert report[0].code == 'blank-header'
def test_empty_column_at_end():
rows = [row + [EMPTY_HEADER if i == 0 else ''] for i, row in enumerate(ASTRONAUTS_ROWS)]
source_dataset = tablib.Dataset(*rows[1:], headers=rows[0])
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('Neil Armstrong', 'USA', 1958)
assert len(report) == 1
assert report[0].code == 'blank-header'
def test_empty_column_inside():
rows = [[row[0]] + [EMPTY_HEADER if i == 0 else ''] + row[1:] for i, row in enumerate(ASTRONAUTS_ROWS)]
source_dataset = tablib.Dataset(*rows[1:], headers=rows[0])
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset[0] == ('Neil Armstrong', 'USA', 1958)
assert len(report) == 1
assert report[0].code == 'blank-header'
def test_wrong_named_column(astronauts_dataset):
source_dataset = astronauts_dataset
source_dataset.headers = ('Name', 'Land', 'Year')
fixed_dataset, report = repair_core(source_dataset, ASTRONAUTS_ROWS[0])
assert fixed_dataset != source_dataset
assert fixed_dataset.headers == ['Name', 'Country', 'Year', 'Land']
assert fixed_dataset[0] == ('Neil Armstrong', '', 1958, 'USA')
assert len(report) == 2
assert any(r.code == 'extra-header' and err_msg_data(r).get('column-name') == 'Land' for r in report)
assert any(r.code == 'missing-header' and err_msg_data(r).get('column-name') == 'Country' for r in report)
import csv
import io
import itertools
import logging
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
import frictionless
import importlib_resources
import requests
import tablib
from toolz import get_in, thread_first, update_in
from . import csv_helpers, loaders, messages
from .custom_checks import available_checks, header_checks
# from .spec import spec
from .custom_checks import available_checks
log = logging.getLogger(__name__)
VALIDATA_MAX_ROWS = 100000
def replace_at(seq, index, item):
"""Replace seq[index] by item."""
return (
item if index == index1 else item1
for index1, item1 in enumerate(seq)
)
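# A minimal usage sketch (added for illustration, not part of the original
# module): replace_at is lazy and yields items one by one, which is why
# prepend_error below wraps the result in list().
def _example_replace_at():
    fixed = list(replace_at(['a', 'b', 'c'], 1, 'B'))
    assert fixed == ['a', 'B', 'c']
    return fixed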
def prepend_error(report, table_index, error):
return update_in(report, ["tables"], lambda tables: list(
replace_at(tables, table_index, thread_first(
tables[table_index],
(update_in, ["errors"], lambda errors: [error] + errors),
(update_in, ["error-count"], lambda error_count: error_count + 1),
))))
def improve_messages(report, schema):
"""Translate report error messages and add `title` and `content` fields"""
if report is None:
return None
for table_id in range(report['stats']['tables']):
table = report['tables'][table_id]
table['errors'] = messages.improve_messages(table['errors'], schema)
return report
def compute_error_statistics(errors, columns):
"""Computes error statistics as a dict:
{
'count': 12,
'structure-errors': {
'count': 1,
'count-by-code': {
'invalid-column-delimiter': 1
}
},
'value-errors': {
'count': 10,
'rows-count': 3,
'count-by-code': {
'type-or-format-error': 2,
'pattern-constraint': 7,
'french-siret-value': 1,
},
'count-by-col-and-code': {
'A': {'required-constraint': 7},
'B': {'type-or-format-error': 3}
}
},
}
"""
# Nb of errors by category
errors_nb_dict = {'structure': 0, 'value': 0}
# Errors distribution by category
errors_dist_dict = {'structure': defaultdict(int), 'value': defaultdict(int)}
# Errors by column and category
errors_col_code = {}
# Fill in error stats
for err in errors:
err_tag = err['tag']
errors_nb = len(err['cells']) \
if err['code'] in ('extra-headers', 'missing-headers') else 1
errors_nb_dict[err_tag] += errors_nb
errors_dist_dict[err_tag][err['code']] += errors_nb
if err_tag == "value":
col = columns[(err["column-number"] - 1)]
if col not in errors_col_code:
errors_col_code[col] = defaultdict(int)
errors_col_code[col][err["code"]] += 1
# Compute statistics
return {
'structure-errors': {
'count': errors_nb_dict['structure'],
'count-by-code': errors_dist_dict['structure'],
},
'value-errors': {
'count': errors_nb_dict['value'],
'rows-count': len(set([err['row-number'] for err in errors if err['tag'] == 'value'])),
'count-by-code': errors_dist_dict['value'],
'count-by-col-and-code': errors_col_code,
},
'count': errors_nb_dict['structure'] + errors_nb_dict['value']
}
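# A hedged usage sketch (illustrative only; the error dicts below are made up
# to mimic the shape this function expects, not real frictionless output):
def _example_compute_error_statistics():
    errors = [
        {'code': 'type-error', 'tag': 'value', 'row-number': 2, 'column-number': 1},
        {'code': 'type-error', 'tag': 'value', 'row-number': 3, 'column-number': 1},
        {'code': 'blank-header', 'tag': 'structure'},
    ]
    stats = compute_error_statistics(errors, columns=['A', 'B'])
    # stats['count'] == 3, stats['value-errors']['rows-count'] == 2 and
    # stats['value-errors']['count-by-col-and-code'] == {'A': {'type-error': 2}}
    return stats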
def amend_report(report):
"""tag 'structure' and 'value' error
Remove 'value' errors if 'structure' errors
Computes statistics
"""
def categorize_err(err):
"""Computes error category: 'structure' or 'value'"""
if err.get('column-number') is None and err.get('row-number') is None:
return 'structure'
return 'value'
if report["stats"]["tables"] == 0:
import ipdb; ipdb.set_trace()
# No table!
# Tag 'structure' or 'value'
errors = [{**err, 'tag': categorize_err(err)} for err in report['tables'][0]['errors']]
# Among value errors, only keep a single error by error cell
# => the 1st encountered one
filtered_errors = []
row_col_set = set()
for err in errors:
if err['tag'] == 'value':
row_col_id = '{}_{}'.format(err['row-number'], err.get('column-number', ''))
if row_col_id in row_col_set:
continue
row_col_set.add(row_col_id)
filtered_errors.append(err)
errors = filtered_errors
# Integrate enhanced errors into report
report['tables'][0]['errors'] = errors
report['tables'][0]['error-count'] = len(errors)
# Store statistics
columns = {}
if 'headers' in report['tables'][0]:
columns = report['tables'][0]['headers']
stats = compute_error_statistics(errors, columns)
report['tables'][0]['error-stats'] = stats
return report
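# A hedged usage sketch (a reduced report skeleton, limited to the keys
# amend_report actually reads; the error codes are made up):
def _example_amend_report():
    report = {
        'stats': {'tables': 1},
        'tables': [{
            'headers': ['A', 'B'],
            'errors': [
                {'code': 'type-error', 'row-number': 2, 'column-number': 1},
                {'code': 'constraint-error', 'row-number': 2, 'column-number': 1},
                {'code': 'blank-header'},
            ],
        }],
    }
    amended = amend_report(report)
    # Both cell errors hit row 2 / column 1, so only the first one is kept;
    # 'blank-header' has no row/column numbers and is tagged 'structure'.
    assert amended['tables'][0]['error-count'] == 2
    return amended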
def retrieve_schema_descriptor(schema):
"""Transforms a schema into a schema_descriptor
`schema` can be either:
- a `pathlib.Path`
- a `str` containing either:
- a file path
- a URL
- a `dict` representing the schema in JSON
- a `frictionless.Schema` instance
"""
if isinstance(schema, Path):
schema = str(schema)
if not isinstance(schema, frictionless.schema.Schema):
schema = frictionless.Schema(schema)
return schema
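# A hedged usage sketch (added for illustration): only the inline-dict form is
# shown; a file path, URL string or pathlib.Path goes through the same
# frictionless.Schema constructor.
def _example_retrieve_schema_descriptor():
    descriptor = {'fields': [
        {'name': 'Name', 'type': 'string'},
        {'name': 'Year', 'type': 'integer'},
    ]}
    schema = retrieve_schema_descriptor(descriptor)
    assert [f.get('name') for f in schema.get('fields')] == ['Name', 'Year']
    return schema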
# Needed here because tablib Dataset doesn't allow empty column headers
EMPTY_HEADER = '__empty_header__'
def repair_core(dataset: tablib.Dataset, schema_field_names):
"""Core engine of repair function:
Check tabular data and return transformed dataset and report log
"""
report = []
def append_col(dataset: tablib.Dataset, column_values, header):
"""work around a tablib bug on append_col,
see https://github.com/vinayak-mehta/tablib/issues/33"""
dataset.append_col(column_values, header=header)
if dataset.headers is None:
dataset.headers = [header]
# Same field names, same order, just return dataset as is
if dataset.headers == schema_field_names:
return dataset, report
# else, work!
rows_nb = len(dataset.dict)
content_dataset = tablib.Dataset()
rejected_cols_dataset = tablib.Dataset()
column_names_dict = dict()
last_nonempty_header_col = None
first_nonempty_header_col = None
empty_header_cols = []
duplicate_header_map = {}
for i, head in enumerate(dataset.headers):
# Don't keep empty header column
if head == EMPTY_HEADER:
empty_header_cols.append(i)
continue
# Remember first non-empty header
if first_nonempty_header_col is None:
first_nonempty_header_col = i
# Remember last non-empty header
last_nonempty_header_col = i
# Move unknown columns in a special dataset
if head not in schema_field_names:
report.append(goodtables.Error(code='extra-header',
message_substitutions={'column-name': head}))
append_col(rejected_cols_dataset, dataset.get_col(i), head)
continue
# Rename and move duplicate columns in a special dataset
if head in column_names_dict:
ver = duplicate_header_map.get(head) or 1
fixed_head = '{}~{}'.format(head, ver)
duplicate_header_map[head] = ver + 1
report.append(goodtables.Error(code='duplicate-header',
message_substitutions={
'column-name': head,
'fixed-column-name': fixed_head,
# not used by Validata but needed to avoid
# a KeyError in message substitution
'column_numbers': ''}))
append_col(rejected_cols_dataset, dataset.get_col(i), fixed_head)
# Normal case
else:
append_col(content_dataset, dataset.get_col(i), head)
column_names_dict[head] = i
# add blank-header errors
def create_blank_header_error(col_id, pos_type, addon={}):
return goodtables.Error(code='blank-header', message_substitutions={
'column-number': col_id + 1,
'position': pos_type,
**addon
})
# With context to ease repairing
for col_id in empty_header_cols:
if col_id < first_nonempty_header_col:
report.append(create_blank_header_error(col_id, 'leading'))
elif col_id > last_nonempty_header_col:
report.append(create_blank_header_error(col_id, 'trailing'))
else:
before_header = list(filter(lambda elt: elt != EMPTY_HEADER, dataset.headers[:col_id][::-1]))[0]
after_header = list(filter(lambda elt: elt != EMPTY_HEADER, dataset.headers[col_id+1:]))[0]
position_addon = {
'before-header-name': before_header,
'after-header-name': after_header,
}
report.append(create_blank_header_error(col_id, 'in', addon=position_addon))
# Compare ordering
if content_dataset.headers:
schema_order_extract = [h for h in schema_field_names if h in content_dataset.headers]
if content_dataset.headers != schema_order_extract:
report.append(goodtables.Error(code='wrong-headers-order',
message_substitutions={'actual-order': content_dataset.headers,
'wanted-order': schema_order_extract}))
# Then reorder and create empty columns if no content found
fixed_dataset = tablib.Dataset()
for i, h in enumerate(schema_field_names):
if content_dataset.headers and h in content_dataset.headers:
col_id = content_dataset.headers.index(h)
append_col(fixed_dataset, content_dataset.get_col(col_id), h)
else:
append_col(fixed_dataset, [''] * rows_nb, h)
report.append(goodtables.Error(code='missing-header',
message_substitutions={'column-number': i+1, 'column-name': h}))
# Adds rejected columns at the end if any
if len(rejected_cols_dataset) != 0:
for i, h in enumerate(rejected_cols_dataset.headers):
append_col(fixed_dataset, rejected_cols_dataset.get_col(i), h)
return fixed_dataset, report
def repair(source, schema_descriptor, **repair_options):
"""Try to repair a `source` using a `schema
Returns (fixed_source, report)
"""
def to_inline_data(dataset):
return [dataset.headers] + [dataset[i] for i in range(len(dataset))]
def consume_source(source, **options):
table = frictionless.Table(source, **options)
table.open()
# Get source headers
headers = table.header
# And source body rows
body_rows = list(table.data_stream)
return headers, body_rows
# Gets schema content
schema_field_names = [f.get('name') for f in schema_descriptor.get('fields')]
# consume source to get headers and content
try:
headers, body_rows = consume_source(source, **repair_options)
except StopIteration:
return (source, [])
# Create dataset for easier post processing
dataset = tablib.Dataset(*body_rows, headers=[h if h else EMPTY_HEADER for h in headers])
# Repair dataset!
fixed_dataset, column_errors = repair_core(dataset, schema_field_names)
# Return fixed source with potential errors
return (to_inline_data(fixed_dataset), column_errors)
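# A hedged end-to-end sketch (illustrative only, assuming frictionless infers
# the inline format from a list of lists): when the source headers already
# match the schema field order, repair() hands the data back with an empty
# report.
def _example_repair():
    schema = retrieve_schema_descriptor(
        {'fields': [{'name': 'Name'}, {'name': 'Year'}]})
    source = [['Name', 'Year'], ['Neil Armstrong', 1958]]
    fixed_source, errors = repair(source, schema)
    # fixed_source keeps the same header and rows; errors == []
    return fixed_source, errors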
def validate(source, schema, with_repair=True, **options):
def validate(source, schema, **options):
"""Validate a `source` using a `schema`."""
schema_descriptor = retrieve_schema_descriptor(schema)
fixed_source, structure_errors = source, None
# TODO: handle repair
#checks = ['structure', 'schema', {'extra-or-missing-header': {}}]
#if with_repair:
# fixed_source, structure_errors = repair(source, schema_descriptor, **options)
# checks = ['structure', 'schema']
# Extract custom checks reference from table schema
extra_checks = header_checks
# Handle different schema format
if isinstance(schema, dict):
schema = frictionless.schema.Schema(schema)
elif not isinstance(schema, frictionless.schema.Schema):
schema = frictionless.schema.Schema(str(schema))
# Dynamically add custom check based on schema needs
custom_checks_config = schema_descriptor.get('custom_checks')
extra_checks = []
custom_checks_config = schema.get('custom_checks')
if custom_checks_config:
extra_checks = []
for cc_conf in custom_checks_config:
@@ -362,124 +30,29 @@ def validate(source, schema, with_repair=True, **options):
cc_descriptor = cc_conf["params"]
extra_checks.append((cc_class, cc_descriptor))
inspector_options_keys = [
'pick_errors', 'skip_errors', 'infer_schema',
'infer_fields', 'sync_schema', 'limit_errors',
'table_limit', 'query',
]
# TODO: merge options
inspector_options = {
# Merge options to pass to frictionless
validate_options = {
'query': frictionless.Query(limit_rows=VALIDATA_MAX_ROWS),
'extra_checks': extra_checks,
**{
# TODO: We hide "extra-cell" and "missing-cell" so that Validata's header errors
# still work, at the risk that extra and missing cells are no longer detected :(
'skip_errors': [
'non-matching-header',
'extra-header',
'missing-header',
"missing-cell",
"extra-cell",
],
'query': frictionless.Query(limit_rows=VALIDATA_MAX_ROWS),
'extra_checks': extra_checks,
},
**{k: v for k, v in options.items() if k in inspector_options_keys}
}
validate_options = {**options, **inspector_options}
if with_repair:
validate_options['scheme'] = 'stream'
validate_options['format'] = 'inline'
report = frictionless.validate_table(fixed_source, schema=schema_descriptor, **validate_options)
# TODO: support error types
# if report['tables'][0].get('format') == "csv" and not any(
# get_in(['errors', err['code'], 'type'], spec, default=None) == 'source'
# for err in report['tables'][0]['errors']
# ):
# standard_csv_delimiter = ","
# dialect = csv_helpers.detect_dialect(fixed_source, **options)
# if dialect is None:
# error = goodtables.Error(code='unknown-csv-dialect')
# report = prepend_error(report, table_index=0, error=dict(error))
# else:
# detected_delimiter = dialect.delimiter
# if detected_delimiter != standard_csv_delimiter:
# error = goodtables.Error(
# code='invalid-column-delimiter',
# message_substitutions={