Commit ef1ddef9 authored by Pierre Dittgen

Add helper to deal with bytes content

parent bef7ac63
Pipeline #2373 passed with stage
in 1 minute and 34 seconds
......@@ -5,6 +5,7 @@ import pytest
from openpyxl import Workbook
from validata_core import validate
from validata_core.helpers import prepare_source
@pytest.fixture
......@@ -140,6 +141,7 @@ def schema_siret():
]
}
@pytest.fixture
def schema_year_interval():
return {
......@@ -195,7 +197,7 @@ def schema_year_interval_allow_year_only():
}
def _schema_sum_columns_value(column_list = ["chauffage", "salaires", "fraisdebouche"]):
def _schema_sum_columns_value(column_list=["chauffage", "salaires", "fraisdebouche"]):
return {
"$schema": "https://frictionlessdata.io/schemas/table-schema.json",
"fields": [
......@@ -285,6 +287,7 @@ def schema_year_interval():
]
}
@pytest.fixture
def schema_cohesive_columns():
return {
......@@ -346,13 +349,19 @@ def schema_compare_columns():
]
}
def validate_csv_text(source, schema, **options):
return validate(source, schema, scheme='text', format='csv', **options)
def validate_csv_bytes(csv_bytes_source, schema, **options):
    """Validate raw CSV bytes against ``schema`` and return the report.

    The bytes are prepared as a "file" source named ``foo.csv``. The options
    produced by that preparation are merged over the caller's ``options``, so
    they win whenever both define the same key.
    """
    prepared_source, detected_options = prepare_source(
        "file", "foo.csv", csv_bytes_source
    )
    # Start from the caller's options, then let the detected ones override.
    merged_options = dict(options)
    merged_options.update(detected_options)
    return validate(prepared_source, schema, **merged_options)
def test_empty_file(schema_abc):
source = ""
report = validate_csv_text(source, schema_abc)
source = b""
report = validate_csv_bytes(source, schema_abc)
assert report["tables"][0]["errors"][0]["code"] == "source-error"
assert report["tables"][0]["errors"][0]["note"] == "the source is empty"
......@@ -360,58 +369,59 @@ def test_empty_file(schema_abc):
def assert_no_report_errors(report):
    """Assert that the first table of ``report`` carries no errors.

    The whole report is attached to the assertion so a failure shows it.
    """
    first_table_errors = report['tables'][0]['errors']
    assert not len(first_table_errors), report
def test_valid_delimiter(schema_abc):
source = """A,B,C
source = b"""A,B,C
a,b,c"""
report = validate_csv_text(source, schema_abc)
report = validate_csv_bytes(source, schema_abc)
assert_no_report_errors(report)
def test_valid_delimiter_semicolon(schema_abc):
source = """A;B;C
source = b"""A;B;C
a;b;c"""
report = validate_csv_text(source, schema_abc)
report = validate_csv_bytes(source, schema_abc)
assert_no_report_errors(report)
def test_invalid_delimiter_percent(schema_abc):
source = """A%B%C
source = b"""A%B%C
a%b%c"""
report = validate_csv_text(source, schema_abc)
report = validate_csv_bytes(source, schema_abc)
assert len(report['tables'][0]['errors']) != 0
# def test_missing_header_start(schema_abc):
# source = """B,C
# source = b"""B,C
# b,c"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['A']
# def test_missing_header_middle(schema_abc):
# source = """A,C
# source = b"""A,C
# a,c"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['B']
# def test_missing_header_end(schema_abc):
# source = """A,B
# source = b"""A,B
# a,b"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['C']
# def test_missing_and_extra_header_end(schema_abc):
# source = """A,B,Z
# source = b"""A,B,Z
# a,b,z"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 2
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['C']
......@@ -420,9 +430,9 @@ a%b%c"""
# def test_missing_and_extra_header_middle(schema_abc):
# source = """A,Z,B
# source = b"""A,Z,B
# a,z,b"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 2
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['C']
......@@ -431,9 +441,9 @@ a%b%c"""
# waiting for https://github.com/frictionlessdata/frictionless-py/issues/551
# def test_missing_and_extra_header_multiple(schema_abc):
# source = """A,Z
# source = b"""A,Z
# a,z"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 2
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['B', 'C']
......@@ -442,27 +452,27 @@ a%b%c"""
# waiting for https://github.com/frictionlessdata/frictionless-py/issues/551
# def test_extra_header_start(schema_abc):
# source = """X,A,B,C
# source = b"""X,A,B,C
# x,a,b,c"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][1]['code'] == 'extra-headers'
# assert report['tables'][0]['errors'][1]['cells'] == ['X']
# waiting for https://github.com/frictionlessdata/frictionless-py/issues/551
# def test_extra_header(schema_abc):
# source = """A,B,C,D
# source = b"""A,B,C,D
# a,b,c,d"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][0]['code'] == 'extra-headers'
# assert report['tables'][0]['errors'][0]['cells'] == ['D']
# def test_extra_multiple(schema_abc):
# source = """A,B,C,X,Y
# source = b"""A,B,C,X,Y
# a,b,c,x,y"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['error-stats']['count'] == 2
# assert report['tables'][0]['errors'][0]['code'] == 'extra-headers'
......@@ -470,9 +480,9 @@ a%b%c"""
# def test_missing_and_extra_headers_multiple(schema_abc):
# source = """A,Z,D
# source = b"""A,Z,D
# a,z,d"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 2
# assert report['tables'][0]['errors'][0]['code'] == 'missing-headers'
# assert report['tables'][0]['errors'][0]['message-data']['headers'] == ['B', 'C']
......@@ -481,17 +491,17 @@ a%b%c"""
# def test_header_order(schema_abc):
# source = """A,C,B
# source = b"""A,C,B
# a,c,b"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][0]['code'] == 'wrong-headers-order'
# def test_invalid_delimiter_and_missing_header(schema_abc):
# source = """A;C
# source = b"""A;C
# a;c"""
# report = validate_csv_text(source, schema_abc)
# report = validate_csv_bytes(source, schema_abc)
# assert len(report['tables'][0]['errors']) == 2
# assert report['tables'][0]['errors'][0]['code'] == 'invalid-column-delimiter'
# assert report['tables'][0]['errors'][0]['message-data']['detected'] == ';'
......@@ -501,105 +511,106 @@ a%b%c"""
def test_valid_custom_check_siren(schema_siren):
source = """id,siren
source = b"""id,siren
1,529173189"""
report = validate_csv_text(source, schema_siren)
report = validate_csv_bytes(source, schema_siren)
assert_no_report_errors(report)
def test_invalid_custom_check_siren(schema_siren):
source = """id,siren
source = b"""id,siren
1,529173188"""
report = validate_csv_text(source, schema_siren)
report = validate_csv_bytes(source, schema_siren)
assert len(report['tables'][0]['errors']) == 1
assert report['tables'][0]['errors'][0]['code'] == 'french-siren-value'
def test_valid_custom_check_siret(schema_siret):
source = """id,numero_siret
source = b"""id,numero_siret
1,83014132100026"""
report = validate_csv_text(source, schema_siret)
report = validate_csv_bytes(source, schema_siret)
assert_no_report_errors(report)
def test_invalid_custom_check_siret(schema_siret):
source = """id,numero_siret
source = b"""id,numero_siret
1,529173188"""
report = validate_csv_text(source, schema_siret)
report = validate_csv_bytes(source, schema_siret)
assert len(report['tables'][0]['errors']) == 1
assert report['tables'][0]['errors'][0]['code'] == 'french-siret-value'
def test_valid_custom_check_year_interval_1(schema_year_interval):
source = """projet,annee
source = b"""projet,annee
Validata,2018/2020"""
report = validate_csv_text(source, schema_year_interval)
report = validate_csv_bytes(source, schema_year_interval)
assert_no_report_errors(report)
def test_valid_custom_check_year_interval_2(schema_year_interval_allow_year_only):
source = """projet,annee
source = b"""projet,annee
Validata,2018/2020"""
report = validate_csv_text(source, schema_year_interval_allow_year_only)
report = validate_csv_bytes(source, schema_year_interval_allow_year_only)
assert_no_report_errors(report)
def test_valid_custom_check_year_interval_3(schema_year_interval_allow_year_only):
source = """projet,annee
source = b"""projet,annee
Validata,2020"""
report = validate_csv_text(source, schema_year_interval_allow_year_only)
report = validate_csv_bytes(source, schema_year_interval_allow_year_only)
assert_no_report_errors(report)
def test_invalid_custom_check_year_interval_1(schema_year_interval):
source = """projet,annee
source = b"""projet,annee
Validata,foobar"""
report = validate_csv_text(source, schema_year_interval)
report = validate_csv_bytes(source, schema_year_interval)
assert len(report['tables'][0]['errors']) == 1
assert report['tables'][0]['errors'][0]['code'] == 'year-interval-value'
def test_invalid_custom_check_year_interval_2(schema_year_interval):
source = """projet,annee
source = b"""projet,annee
Validata,2017/2017"""
report = validate_csv_text(source, schema_year_interval)
report = validate_csv_bytes(source, schema_year_interval)
assert len(report['tables'][0]['errors']) == 1
assert report['tables'][0]['errors'][0]['code'] == 'year-interval-value'
def test_invalid_custom_check_year_interval_3(schema_year_interval):
source = """projet,annee
source = b"""projet,annee
Validata,2017/2016"""
report = validate_csv_text(source, schema_year_interval)
report = validate_csv_bytes(source, schema_year_interval)
assert len(report['tables'][0]['errors']) == 1
assert report['tables'][0]['errors'][0]['code'] == 'year-interval-value'
def test_valid_custom_sum_columns_value_1(schema_sum_columns_value_ok):
source = """charges,chauffage,salaires,fraisdebouche
source = b"""charges,chauffage,salaires,fraisdebouche
12000,600,4000,7400"""
report = validate_csv_text(source, schema_sum_columns_value_ok)
report = validate_csv_bytes(source, schema_sum_columns_value_ok)
assert_no_report_errors(report)
def test_valid_custom_sum_columns_value_2(schema_sum_columns_value_ok):
source = """charges,chauffage,salaires,fraisdebouche
source = b"""charges,chauffage,salaires,fraisdebouche
12000,600,,7400"""
report = validate_csv_text(source, schema_sum_columns_value_ok)
report = validate_csv_bytes(source, schema_sum_columns_value_ok)
assert_no_report_errors(report)
def test_valid_custom_sum_columns_value_3(schema_sum_columns_value_ok):
source = """charges,chauffage,salaires,fraisdebouche
source = b"""charges,chauffage,salaires,fraisdebouche
,600,4000,7400"""
report = validate_csv_text(source, schema_sum_columns_value_ok)
report = validate_csv_bytes(source, schema_sum_columns_value_ok)
assert_no_report_errors(report)
def test_invalid_custom_sum_columns_value_1(schema_sum_columns_value_ok):
    """A row whose ``charges`` is not the sum of the other columns is reported.

    This test used to be called ``test_valid_custom_sum_columns_value_3``,
    the same name as the test defined just before it. The later definition
    replaced the earlier one at import time, so the earlier test was never
    collected. The new name is unique and reflects that an error is expected.
    """
    # 1100 != 600 + 4000 + 7400
    source = b"""charges,chauffage,salaires,fraisdebouche
1100,600,4000,7400"""
    report = validate_csv_bytes(source, schema_sum_columns_value_ok)
    assert report["stats"]["errors"] == 1
    assert report["stats"]["tables"] == 1
    assert report["tables"][0]["errors"][0]["code"] == "sum-columns-value"
......@@ -608,97 +619,99 @@ def test_valid_custom_sum_columns_value_3(schema_sum_columns_value_ok):
# - invalid column name in custom check param -> general error
# - only one column name in custom check params -> general error
def test_valid_nomenclature_actes_value(schema_nomenclature_actes_value):
source = """acte
source = b"""acte
Fonction publique/foobar
"""
report = validate_csv_text(source, schema_nomenclature_actes_value)
report = validate_csv_bytes(source, schema_nomenclature_actes_value)
assert_no_report_errors(report)
def test_invalid_nomenclature_actes_value_1(schema_nomenclature_actes_value):
source = """acte
source = b"""acte
foobar
"""
report = validate_csv_text(source, schema_nomenclature_actes_value)
report = validate_csv_bytes(source, schema_nomenclature_actes_value)
assert report["stats"]["errors"] == 1
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "nomenclature-actes-value"
def test_invalid_nomenclature_actes_value_2(schema_nomenclature_actes_value):
source = """acte
source = b"""acte
Baz/foobar
"""
report = validate_csv_text(source, schema_nomenclature_actes_value)
report = validate_csv_bytes(source, schema_nomenclature_actes_value)
assert report["stats"]["errors"] == 1
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "nomenclature-actes-value"
def test_cohesive_columns_values_1(schema_cohesive_columns):
source = """id,col1,col2
source = b"""id,col1,col2
1,,"""
report = validate_csv_text(source, schema_cohesive_columns)
report = validate_csv_bytes(source, schema_cohesive_columns)
assert_no_report_errors(report)
def test_cohesive_columns_values_2(schema_cohesive_columns):
source = """id,col1,col2
source = b"""id,col1,col2
1,foo,bar"""
report = validate_csv_text(source, schema_cohesive_columns)
report = validate_csv_bytes(source, schema_cohesive_columns)
assert_no_report_errors(report)
def test_cohesive_columns_values_3(schema_cohesive_columns):
source = """id,col1,col2
source = b"""id,col1,col2
1,foo,"""
report = validate_csv_text(source, schema_cohesive_columns)
report = validate_csv_bytes(source, schema_cohesive_columns)
assert report["stats"]["errors"] == 1
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "cohesive-columns-value"
def test_cohesive_columns_values_4(schema_cohesive_columns):
source = """id,col1,col2
source = b"""id,col1,col2
1,,bar"""
report = validate_csv_text(source, schema_cohesive_columns)
report = validate_csv_bytes(source, schema_cohesive_columns)
assert report["stats"]["errors"] == 1
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "cohesive-columns-value"
def test_compare_columns_value_1(schema_compare_columns):
source = """depenses,recettes
source = b"""depenses,recettes
12000,15000"""
report = validate_csv_text(source, schema_compare_columns)
report = validate_csv_bytes(source, schema_compare_columns)
assert_no_report_errors(report)
def test_compare_columns_value_2(schema_compare_columns):
source = """depenses,recettes
source = b"""depenses,recettes
12000,12000"""
report = validate_csv_text(source, schema_compare_columns)
report = validate_csv_bytes(source, schema_compare_columns)
assert_no_report_errors(report)
def test_compare_columns_value_3(schema_compare_columns):
source = """depenses,recettes
source = b"""depenses,recettes
12000,6000"""
report = validate_csv_text(source, schema_compare_columns)
report = validate_csv_bytes(source, schema_compare_columns)
assert report["stats"]["errors"] == 1
assert report["stats"]["tables"] == 1
assert report["tables"][0]["errors"][0]["code"] == "compare-columns-value"
# def test_error_stats(schema_types_and_required):
# source = """A,B
# source = b"""A,B
# 2,2020-04-01
# ,foo
# """
# report = validate_csv_text(source, schema_types_and_required)
# report = validate_csv_bytes(source, schema_types_and_required)
# assert report['tables'][0]['error-stats'] == {
# 'structure-errors': {'count': 0, 'count-by-code': {}},
# 'value-errors': {
......@@ -714,31 +727,33 @@ def test_compare_columns_value_3(schema_compare_columns):
# # XLSX
# def validate_xlsx_bytes(source, **options):
# # return validate(BytesResource(source, format="xlsx"), with_repair=False, **options)
# res = Resource(scheme=)
# return validate(BytesResource(source, format="xlsx"), with_repair=False, **options)
def validate_xlsx_bytes(bytes_source_content, **options):
    """Validate raw XLSX bytes and return the report.

    The bytes are prepared as a "file" source named ``foo.xlsx``. The options
    produced by that preparation are merged over the caller's ``options``
    (which is where ``schema`` is expected to be passed), so they win
    whenever both define the same key.
    """
    prepared_source, detected_options = prepare_source(
        "file", "foo.xlsx", bytes_source_content
    )
    # Start from the caller's options, then let the detected ones override.
    merged_options = dict(options)
    merged_options.update(detected_options)
    return validate(prepared_source, **merged_options)
# def build_one_cell_xlsx(cell):
# wb = Workbook()
# ws1 = wb.active
# ws1["A1"] = "A"
# ws1["A2"] = cell
# return wb
def build_one_cell_xlsx(cell):
    """Return a workbook whose active sheet holds "A" in A1 and ``cell`` in A2."""
    workbook = Workbook()
    sheet = workbook.active
    # Fill the header cell first, then the single data cell below it.
    for reference, value in (("A1", "A"), ("A2", cell)):
        sheet[reference] = value
    return workbook
# def test_xlsx_invalid_date_type_str(schema_date):
# wb = build_one_cell_xlsx("c")
# with BytesIO() as fp:
# wb.save(fp)
# xlsx_bytes = fp.getvalue()
# report = validate_xlsx_bytes(xlsx_bytes, schema=schema_date)
# assert len(report['tables'][0]['errors']) == 1
# assert report['tables'][0]['errors'][0]['code'] == 'type-or-format-error'
# assert report['tables'][0]['errors'][0]['message-data']['value'] == 'c'
# assert report['tables'][0]['errors'][0]['message-data']['field_format'] == 'default'
# assert report['tables'][0]['errors'][0]['message-data']['field_type'] == 'date'
def test_xlsx_invalid_date_type_str(schema_date):
    """The string "c" in a field typed ``date`` yields exactly one ``type-error``."""
    workbook = build_one_cell_xlsx("c")
    # Serialise the workbook in memory; the bytes must be read before the
    # buffer is closed on leaving the ``with`` block.
    with BytesIO() as buffer:
        workbook.save(buffer)
        workbook_bytes = buffer.getvalue()

    report = validate_xlsx_bytes(workbook_bytes, schema=schema_date)

    table = report['tables'][0]
    errors = table['errors']
    assert len(errors) == 1
    first_error = errors[0]
    assert first_error['code'] == 'type-error'
    assert first_error['cell'] == 'c'
    assert table['schema']['fields'][0]['type'] == 'date'
# def test_xlsx_invalid_date_type_datetime(schema_date):
......
from io import BytesIO
from typing import Any, Dict, Tuple
from frictionless.helpers import detect_encoding, detect_source_scheme_and_format
def prepare_source(source_type, name, source) -> Tuple[Any, Dict]:
    """Prepare ``source`` for validation, based on its name and type.

    The scheme and format are detected from ``name``. When ``source_type`` is
    ``"file"``, ``source`` is handled as bytes:

    - CSV content is decoded to text with the detected encoding and the
      scheme becomes ``"text"``;
    - any other format is wrapped in a ``BytesIO`` and the scheme becomes
      ``"filelike"``.

    For any other ``source_type`` the source and the detected scheme are
    returned unchanged.

    Return a ``(source, options)`` tuple, where ``options`` is a dict with the
    ``"scheme"`` and ``"format"`` keys.
    """
    scheme, detected_format = detect_source_scheme_and_format(name)

    if source_type == "file" and detected_format == 'csv':
        # CSV is passed on as decoded text rather than as raw bytes.
        scheme = 'text'
        source = source.decode(detect_encoding(source))
    elif source_type == "file":
        # Other formats are passed on as an in-memory binary stream.
        scheme = 'filelike'
        source = BytesIO(source)

    source_options = {"scheme": scheme, "format": detected_format}
    return source, source_options
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment