validate_helper.py 2.13 KB
Newer Older
1
2
""" Call validation code """

3
import logging
4

5
import validata_core
6

Pierre Dittgen's avatar
Pierre Dittgen committed
7
log = logging.getLogger(__name__)
8
9
10
11
12
13
14
15


class ValidatorHelper:
    """ Help validating tabular data """

    schema_dict = {}

    @classmethod
16
    def init(cls, schemas_config):
17
        """ Register and download schema and custom_checks info """
18
        cls.validator = validata_core.Validator(schemas_config)
19
        cls.schema_dict = {}
20
21
22
23
24
        for code, schema_config in schemas_config.items():
            schema = schema_config['schema']
            log.info('Loading schema %r from %r', code, schema)
            schema_instance = cls.validator.load_schema(schema)
            cls.schema_dict[code] = {**schema_config, "schema_instance": schema_instance}
25
26
27
28
29
30
31
32
33
34
35

    @classmethod
    def schema_exist(cls, schema_code):
        """ Checks if schema exists """
        return schema_code in cls.schema_dict

    @classmethod
    def schema_info(cls, schema_code):
        """ Return schema info from code """
        if not cls.schema_exist(schema_code):
            return None
Pierre Dittgen's avatar
Pierre Dittgen committed
36
37

        # First schema keys but 'fields'
38
39
40
41
42
        d1 = {
            k: v
            for k, v in cls.schema(schema_code).descriptor.items()
            if k != 'fields'
        }
Pierre Dittgen's avatar
Pierre Dittgen committed
43
44
45

        # All keys but schema* and custom_checks*
        d2 = {k: v for k, v in cls.schema_dict[schema_code].items()
46
              if not k.startswith('schema')}
Pierre Dittgen's avatar
Pierre Dittgen committed
47
48

        return {**d1, 'code': schema_code, **d2}
49

Pierre Dittgen's avatar
Pierre Dittgen committed
50
    @classmethod
51
52
    def schema(cls, schema_code):
        """ Return schema from schema code """
Pierre Dittgen's avatar
Pierre Dittgen committed
53
54
        if not cls.schema_exist(schema_code):
            return None
55
        return cls.schema_dict[schema_code]['schema_instance']
Pierre Dittgen's avatar
Pierre Dittgen committed
56

57
58
59
60
    @classmethod
    def schema_info_list(cls):
        """ Computes and return schema info list """
        return [cls.schema_info(code) for code in sorted(cls.schema_dict.keys())]
61
62
63
64
65
66
67
68

    @classmethod
    def validate(cls, schema_code, **options):
        """ Try to retrieve cached schema from `schema_code`, otherwise pass `schema_code` it as-is """
        schema = cls.schema(schema_code)
        if schema is None:
            schema = schema_code
        return cls.validator.validate(schema=schema, **options)