Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Antoine Augusti
Validata Core
Commits
0895808f
Commit
0895808f
authored
Jun 25, 2019
by
Christophe Benz
Browse files
Remove useless class
parent
a38cf1f2
Changes
5
Hide whitespace changes
Inline
Side-by-side
CHANGELOG.md
View file @
0895808f
## 0.4.0
Breaking changes:
-
Replace
`validata_core.Validator`
class with
`validata_core.validate`
function.
## 0.3.5
Non-breaking changes:
...
...
setup.py
View file @
0895808f
...
...
@@ -12,7 +12,7 @@ with readme_filepath.open('rt', encoding='utf-8') as fd_in:
setup
(
name
=
'validata_core'
,
version
=
'0.
3.5
'
,
version
=
'0.
4.0
'
,
description
=
"Validata Core library"
,
long_description
=
LONG_DESCRIPTION
,
...
...
@@ -59,7 +59,6 @@ setup(
'requests'
,
'tabulator'
,
'tableschema'
,
'toml'
,
'toolz'
,
# for custom_checks
...
...
tests/test_core.py
View file @
0895808f
...
...
@@ -3,7 +3,7 @@ from io import BytesIO
import
pytest
from
openpyxl
import
Workbook
from
validata_core
import
V
alidat
or
from
validata_core
import
v
alidat
e
@
pytest
.
fixture
...
...
@@ -62,7 +62,7 @@ def schema_number():
def
validate_csv_text
(
**
options
):
return
Validator
().
validate
(
scheme
=
'text'
,
format
=
'csv'
,
**
options
)
return
validate
(
scheme
=
'text'
,
format
=
'csv'
,
**
options
)
def
test_empty_file
(
schema_abc
):
...
...
@@ -224,7 +224,7 @@ a;c"""
def
validate_xlsx_bytes
(
**
options
):
return
Validator
().
validate
(
scheme
=
'bytes'
,
format
=
'xlsx'
,
**
options
)
return
validate
(
scheme
=
'bytes'
,
format
=
'xlsx'
,
**
options
)
def
build_one_cell_xlsx
(
cell
):
...
...
validata_core/__init__.py
View file @
0895808f
...
...
@@ -147,79 +147,69 @@ def amend_report(report):
return
report
class
Validator
:
def
load_schema
(
self
,
source
):
"""Return a `tableschema.Schema` instance from `source`."""
if
isinstance
(
source
,
Path
):
source
=
str
(
source
)
try
:
return
tableschema
.
Schema
(
source
)
except
tableschema
.
exceptions
.
LoadError
as
e
:
log
.
exception
(
e
)
log
.
warning
(
"Schema load exception from {}"
.
format
(
source
))
return
None
def
validate
(
self
,
source
,
schema
,
**
options
):
"""Validate a `source` using a `schema`.
`schema` can be either:
- a `pathlib.Path`
- a `str` containing either:
- a file path
- an URL
- a `dict` representing the schema in JSON
- a `tableschema.Schema` instance
"""
if
not
isinstance
(
schema
,
tableschema
.
Schema
):
schema
=
self
.
load_schema
(
schema
)
schema_descriptor
=
schema
.
descriptor
checks
=
[
'structure'
,
'schema'
,
{
'extra-or-missing-header'
:
{}}]
custom_checks_config
=
schema_descriptor
.
get
(
'custom_checks'
)
if
custom_checks_config
:
for
custom_check_conf
in
custom_checks_config
:
checks
.
append
({
custom_check_conf
[
'name'
]:
custom_check_conf
[
'params'
]})
inspector
=
goodtables
.
Inspector
(
checks
=
checks
,
skip_checks
=
[
'non-matching-header'
,
'extra-header'
,
'missing-header'
],
row_limit
=
VALIDATA_MAX_ROWS
,
)
options
=
{
**
options
,
"custom_loaders"
:
loaders
.
custom_loaders
}
report
=
inspector
.
inspect
(
source
=
source
,
schema
=
schema_descriptor
,
**
options
)
if
report
[
'tables'
][
0
].
get
(
'format'
)
==
"csv"
and
not
any
(
get_in
([
'errors'
,
err
[
'code'
],
'type'
],
spec
,
default
=
None
)
==
'source'
for
err
in
report
[
'tables'
][
0
][
'errors'
]
):
standard_csv_delimiter
=
","
dialect
=
csv_helpers
.
detect_dialect
(
source
,
**
options
)
if
dialect
is
None
:
error
=
goodtables
.
Error
(
code
=
'unknown-csv-dialect'
)
def
validate
(
source
,
schema
,
**
options
):
"""Validate a `source` using a `schema`.
`schema` can be either:
- a `pathlib.Path`
- a `str` containing either:
- a file path
- an URL
- a `dict` representing the schema in JSON
- a `tableschema.Schema` instance
"""
if
isinstance
(
schema
,
Path
):
schema
=
str
(
schema
)
if
not
isinstance
(
schema
,
tableschema
.
Schema
):
schema
=
tableschema
.
Schema
(
schema
)
schema_descriptor
=
schema
.
descriptor
checks
=
[
'structure'
,
'schema'
,
{
'extra-or-missing-header'
:
{}}]
custom_checks_config
=
schema_descriptor
.
get
(
'custom_checks'
)
if
custom_checks_config
:
for
custom_check_conf
in
custom_checks_config
:
checks
.
append
({
custom_check_conf
[
'name'
]:
custom_check_conf
[
'params'
]})
inspector
=
goodtables
.
Inspector
(
checks
=
checks
,
skip_checks
=
[
'non-matching-header'
,
'extra-header'
,
'missing-header'
],
row_limit
=
VALIDATA_MAX_ROWS
,
)
options
=
{
**
options
,
"custom_loaders"
:
loaders
.
custom_loaders
}
report
=
inspector
.
inspect
(
source
=
source
,
schema
=
schema_descriptor
,
**
options
)
if
report
[
'tables'
][
0
].
get
(
'format'
)
==
"csv"
and
not
any
(
get_in
([
'errors'
,
err
[
'code'
],
'type'
],
spec
,
default
=
None
)
==
'source'
for
err
in
report
[
'tables'
][
0
][
'errors'
]
):
standard_csv_delimiter
=
","
dialect
=
csv_helpers
.
detect_dialect
(
source
,
**
options
)
if
dialect
is
None
:
error
=
goodtables
.
Error
(
code
=
'unknown-csv-dialect'
)
report
=
prepend_error
(
report
,
table_index
=
0
,
error
=
dict
(
error
))
else
:
detected_delimiter
=
dialect
.
delimiter
if
detected_delimiter
!=
standard_csv_delimiter
:
error
=
goodtables
.
Error
(
code
=
'invalid-column-delimiter'
,
message_substitutions
=
{
"detected"
:
detected_delimiter
,
"expected"
:
standard_csv_delimiter
,
},
)
report
=
prepend_error
(
report
,
table_index
=
0
,
error
=
dict
(
error
))
else
:
detected_delimiter
=
dialect
.
delimiter
if
detected_delimiter
!=
standard_csv_delimiter
:
error
=
goodtables
.
Error
(
code
=
'invalid-column-delimiter'
,
message_substitutions
=
{
"detected"
:
detected_delimiter
,
"expected"
:
standard_csv_delimiter
,
},
)
report
=
prepend_error
(
report
,
table_index
=
0
,
error
=
dict
(
error
))
# Translate error messages
report
=
improve_messages
(
report
,
schema_descriptor
)
# Tag errors ('structure' or 'value')
# Compute statistics
report
=
amend_report
(
report
)
# Add date
report
[
'date'
]
=
datetime
.
now
(
timezone
.
utc
).
isoformat
()
return
report
# Translate error messages
report
=
improve_messages
(
report
,
schema_descriptor
)
# Tag errors ('structure' or 'value')
# Compute statistics
report
=
amend_report
(
report
)
# Add date
report
[
'date'
]
=
datetime
.
now
(
timezone
.
utc
).
isoformat
()
return
report
def
compute_badge
(
report
,
config
)
->
dict
:
...
...
validata_core/cli.py
100755 → 100644
View file @
0895808f
...
...
@@ -9,7 +9,7 @@ import json
import
logging
import
sys
from
.
import
V
alidat
or
from
.
import
v
alidat
e
def
cli
():
...
...
@@ -28,12 +28,8 @@ def cli():
stream
=
sys
.
stderr
,
# script outputs data
)
report
=
Validator
().
validate
(
source
=
args
.
source
,
schema
=
args
.
schema
)
report
=
validate
(
source
=
args
.
source
,
schema
=
args
.
schema
)
json
.
dump
(
report
,
sys
.
stdout
,
ensure_ascii
=
False
,
indent
=
2
,
sort_keys
=
True
)
return
0
if
__name__
==
'__main__'
:
sys
.
exit
(
cli
())
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment