#!/usr/bin/env python3
""" Call validation code """

from pathlib import Path

import requests

import ujson as json
Pierre Dittgen's avatar
Pierre Dittgen committed
9 10 11 12 13 14
from validata_validate import validate


#
# MEMENTO: json.load(pkg_resources.resource_stream('validata_validate', 'spec.json'))
#


class ValidatorHelper:
    """ Help validating tabular data

    All state is stored on the class itself and every method is a
    classmethod: call ``init()`` once to register the schemas, then use the
    lookup helpers and ``validate()``.
    """

    # schema code -> schema info dict: a copy of the entry given to init(),
    # plus the downloaded 'schema' and, when configured, 'goodtables_checks'
    schema_dict = {}

    # Directory used to cache downloaded JSON documents; None disables the
    # cache. Declared here so json_download() is usable before init().
    cache_dir = None

    @classmethod
    def init(cls, schema_info, cache_dir: Path):
        """ Register and download schema and custom_checks info

        :param schema_info: mapping of schema code to a dict holding at least
            'schema_json_url' and optionally 'goodtables_checks_json_url'.
            Entries are shallow-copied, the given dicts are not modified.
        :param cache_dir: directory used to cache downloaded JSON documents,
            or None to always download
        """
        cls.schema_dict = {}
        cls.cache_dir = cache_dir
        for code, info in schema_info.items():
            print('Downloading schema {}'.format(code))
            schema = info.copy()

            # schema download
            schema['schema'] = cls.json_download(
                schema['schema_json_url'],
                '{}_schema.json'.format(code))

            # custom_checks (optional)
            if 'goodtables_checks_json_url' in schema:
                schema['goodtables_checks'] = cls.json_download(
                    schema['goodtables_checks_json_url'],
                    '{}_goodtables_checks.json'.format(code))

            cls.schema_dict[code] = schema

    @classmethod
    def json_download(cls, url, filename):
        """ Download url content as JSON

        When a cache directory is configured, the content of
        ``cache_dir / filename`` is returned if that file exists; otherwise
        the url is fetched and the decoded document is written to that file
        before being returned.

        :param url: location of the JSON document
        :param filename: name of the cache file inside the cache directory
        :return: the decoded JSON document
        """
        # NOTE(review): the HTTP status is not checked, so an error response
        # carrying a JSON body would be decoded (and cached) like a valid one.
        if cls.cache_dir is None:
            return requests.get(url).json()

        cache_file = cls.cache_dir / filename
        if cache_file.exists():
            with cache_file.open('rt', encoding='utf-8') as fd:
                return json.load(fd)

        data = requests.get(url).json()
        # Make sure the cache directory exists so that a successful download
        # is not lost to a failure when opening the cache file
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        with cache_file.open('wt', encoding='utf-8') as fd:
            json.dump(data, fd, ensure_ascii=False, sort_keys=True, indent=2)
        return data

    @classmethod
    def schema_exist(cls, schema_code):
        """ Checks if schema exists """
        return schema_code in cls.schema_dict

    @classmethod
    def schema_info(cls, schema_code):
        """ Return schema info from code

        The result merges, in this order: the top-level keys of the
        downloaded schema except 'fields', a 'code' entry holding
        schema_code, and the registered entries whose key starts with neither
        'schema' nor 'goodtables_checks'. Later entries win on key clash.

        :return: the merged dict, or None if the schema code is unknown
        """
        if not cls.schema_exist(schema_code):
            return None

        entry = cls.schema_dict[schema_code]

        # First schema keys but 'fields'
        d1 = {k: v for k, v in entry['schema'].items() if k != 'fields'}

        # All keys but schema* and goodtables_checks*
        d2 = {k: v for k, v in entry.items()
              if not k.startswith(('schema', 'goodtables_checks'))}

        return {**d1, 'code': schema_code, **d2}

    @classmethod
    def schema(cls, schema_code):
        """ Return schema from schema code, or None if the code is unknown """
        if not cls.schema_exist(schema_code):
            return None
        return cls.schema_dict[schema_code]['schema']

    @classmethod
    def schema_info_list(cls):
        """ Computes and return schema info list, sorted by schema code """
        return [cls.schema_info(code) for code in sorted(cls.schema_dict)]

    @classmethod
    def validate(cls, schema_code, **args):
        """ Validate source against schema using custom-checks

        :param schema_code: code of a registered schema
        :param args: keyword arguments; 'source' is required, the other ones
            are forwarded unchanged to the underlying validate() function
        :return: whatever the underlying validate() function returns
        :raises KeyError: if schema_code is not registered or 'source' is
            missing from args
        """
        # Gets schema info
        sc_info = cls.schema_dict[schema_code]

        # Build checks configuration
        checks = ['structure', 'schema']
        pre_checks_conf = None
        if 'goodtables_checks' in sc_info:
            c_checks = sc_info['goodtables_checks']
            if 'custom_checks' in c_checks:
                # Each custom check is passed as a {name: params} dict
                checks.extend({check_conf['name']: check_conf['params']}
                              for check_conf in c_checks['custom_checks'])
            pre_checks_conf = c_checks.get('pre_checks')

        # 'source' is passed explicitly, everything else is forwarded as is
        options = dict(args)
        source = options.pop('source')

        return validate(
            source=source,
            schema=sc_info['schema'],
            pre_checks_conf=pre_checks_conf,
            checks=checks,
            **options
        )