validata_util.py 1.26 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
#!/usr/bin/env python3
""" Call validation code """

import logging
from io import BytesIO

import validata_core
from validata_core.source_helpers import build_tabulator_params

log = logging.getLogger(__name__)


class ValidataSource:
    """Describe a data source in the form expected by tabulator.

    The constructor hands its arguments to ``build_tabulator_params`` and
    keeps the ``source``, ``format`` and ``scheme`` entries of the mapping
    that helper returns.
    """

    # Keys read from the helper's result and exposed by get_tabulator_params().
    _TABULATOR_KEYS = ('source', 'format', 'scheme')

    def __init__(self, type, name, source):
        """Record the source description and resolve its tabulator settings.

        Args:
            type: Kind of source, forwarded unchanged to the helper.  The
                name shadows the ``type`` builtin; it is kept so existing
                callers are unaffected.
            name: Source name, forwarded unchanged to the helper.
            source: Raw source value, forwarded unchanged to the helper.
        """
        self.type, self.name, self.source = type, name, source

        # The helper's 'source' entry replaces the raw value stored above;
        # entries missing from its result end up as None.
        params = build_tabulator_params(type, name, source)
        self.source, self.format, self.scheme = [
            params.get(key) for key in self._TABULATOR_KEYS
        ]

    def get_tabulator_params(self):
        """Return the 'source', 'format' and 'scheme' values as a dict."""
        return {key: getattr(self, key) for key in self._TABULATOR_KEYS}


def bytes_data(f):
    """Return the full content of an uploaded file as ``bytes``.

    Args:
        f: Werkzeug ``FileStorage`` instance; only its ``save`` method is
            used, called with an in-memory binary buffer as destination.

    Returns:
        Everything ``f.save`` wrote into the buffer.
    """
    with BytesIO() as buffer:
        f.save(buffer)
        # getvalue() yields the whole buffer whatever the stream position,
        # so no rewind is needed before reading it back.
        return buffer.getvalue()


41
def core_validate(schema_code, **args):
    """Validate source against schema using custom-checks and pre-checks.

    Thin wrapper around ``validata_core.validate`` that always enables
    ``force_strings``.

    Args:
        schema_code: Schema handed to the validator as ``schema``.
        **args: Keyword options for the validator.  ``source`` is required
            (a ``KeyError`` is raised when it is missing); every other
            entry is forwarded unchanged.

    Returns:
        Whatever ``validata_core.validate`` returns.
    """
    # Work on a copy so 'source' can be split off from the remaining options.
    options = dict(args)
    source = options.pop('source')  # mandatory: KeyError if absent

    return validata_core.validate(
        source=source,
        schema=schema_code,
        force_strings=True,
        **options
    )