Commit deecb000 authored by Pierre Dittgen

Now generates legacy data

parent dec0d2e9
@@ -28,7 +28,6 @@ download_opendatafrance_data() {
# data.gouv.fr
download_and_filter_datagouv_data() {
DG_TEMP_DIR=$CACHE_DIR/datagouv
DG_DUMP_DIR=$DUMPS_DIR/datagouv
ORGA_CSV_URL=https://www.data.gouv.fr/organizations.csv
DATASET_CSV_URL=https://www.data.gouv.fr/datasets.csv
RESOURCE_CSV_URL=https://www.data.gouv.fr/resources.csv
@@ -39,7 +38,7 @@ download_and_filter_datagouv_data() {
download_file_or_exit $DATASET_CSV_URL $DG_TEMP_DIR/datasets.csv
download_file_or_exit $RESOURCE_CSV_URL $DG_TEMP_DIR/resources.csv
# Extract id_datagouv column content from OpenDataFrance organisations file
DG_ORG_ID_FILE=$DG_TEMP_DIR/org_ids.txt
$CSV_CUT -c "id-datagouv" $ODF_ORGA_FILE | $SED -e '1d' | $GREP -v '""' > $DG_ORG_ID_FILE || exit 1
@@ -56,7 +55,6 @@ download_and_filter_datagouv_data() {
# OpenDataSoft
download_and_merge_opendatasoft_data() {
ODS_TEMP_DIR=$CACHE_DIR/opendatasoft
ODS_DUMP_DIR=$DUMPS_DIR/opendatasoft
echo "Downloading OpenDataSoft catalog and monitoring..."
ODS_INFO_FILE=$ODS_TEMP_DIR/ods_info.csv
@@ -96,6 +94,63 @@ prepare_cog_info() {
(echo "regcode,regnom" && $CSV_CUT -c "reg,libelle" $COG_REG_TXT_FILE | $SED "1d") > $CACHE_DIR/cog_region.csv || exit 1
}
# SQLiteDB
prepare_db_info() {
if [ -e $SQLITE_DB ]; then
rm -f $SQLITE_DB
fi
# Imports departements CSV into DB
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --table_name=depts --csv_delimiter=tab $CACHE_DIR/cog.tsv $SQLITE_DB || exit 1
# Imports GoogleSheet CSV into DB
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --table_name data_orga --integer_cols=5,7 $ODF_ORGA_FILE $SQLITE_DB || exit 1
$SQLITE3 $SQLITE_DB < $LIB_DIR/db/add_reg_nom.sql || exit 1
# Imports Data gouv CSVs into DB
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --table_name data_gouv_organizations --csv_delimiter=semicolon --integer_cols=9,10,11,12,13,14,15,16 $DG_DUMP_DIR/organizations.csv $SQLITE_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --table_name data_gouv_datasets --csv_delimiter=semicolon --integer_cols=18,19,20,22 $DG_DUMP_DIR/datasets.csv $SQLITE_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --table_name data_gouv_resources --csv_delimiter=semicolon --integer_cols=20 $DG_DUMP_DIR/resources.csv $SQLITE_DB || exit 1
# And add _siren and _nom columns
$SQLITE3 $SQLITE_DB < $LIB_DIR/db/add_siren_nom_to_datagouv_tables.sql || exit 1
# Imports OpenDataSoft CSV into DB
$PYTHON $LIB_DIR/db/csv_sqlite_import.py $ODS_DUMP_DIR/ods_catalog.csv $SQLITE_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py $ODS_DUMP_DIR/ods_monitoring.csv $SQLITE_DB || exit 1
# Do we need this?
# Creates and fills ref_org table from data_orga
$SQLITE3 $SQLITE_DB < $LIB_DIR/db/create_ref_org_table.sql || exit 1
if [ -e $SQLITE_GEOREF_DB ]; then
name=`basename $SQLITE_GEOREF_DB`
echo "$name already present, not regenerated"
exit 0
fi
GEO_TEMP_DIR=$CACHE_DIR/geo
mkdir -p $GEO_TEMP_DIR
# Converts SHP to tsv
$PYTHON $LIB_DIR/db/shp2csv.py --csv_delimiter tab $RSC_DIR/ADE_1-1_SHP_LAMB93_FR/REGION.shp $GEO_TEMP_DIR/ae_metropole_region.tsv || exit 1
$PYTHON $LIB_DIR/db/shp2csv.py --csv_delimiter tab $RSC_DIR/ADE_1-1_SHP_LAMB93_FR/DEPARTEMENT.shp $GEO_TEMP_DIR/ae_metropole_departement.tsv || exit 1
$PYTHON $LIB_DIR/db/shp2csv.py --csv_delimiter tab $RSC_DIR/ADE_1-1_SHP_LAMB93_FR/COMMUNE.shp $GEO_TEMP_DIR/ae_metropole_commune.tsv || exit 1
$PYTHON $LIB_DIR/db/shp2csv.py --csv_delimiter tab $RSC_DIR/ADE_1-1_SHP_LAMB93_FR/EPCI.shp $GEO_TEMP_DIR/ae_metropole_epci.tsv || exit 1
$PYTHON $LIB_DIR/db/shp2csv.py --csv_delimiter tab $RSC_DIR/IAU-IDF/Intercommunalites_de_la_Region_IledeFrance_au_1er_janvier_2018.shp $GEO_TEMP_DIR/iau_ept.tsv || exit 1
# Imports Geo CSVs into GeoRefDB
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --csv_delimiter tab $GEO_TEMP_DIR/ae_metropole_region.tsv $SQLITE_GEOREF_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --csv_delimiter tab $GEO_TEMP_DIR/ae_metropole_departement.tsv $SQLITE_GEOREF_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --csv_delimiter tab $GEO_TEMP_DIR/ae_metropole_commune.tsv $SQLITE_GEOREF_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --csv_delimiter tab $GEO_TEMP_DIR/ae_metropole_epci.tsv $SQLITE_GEOREF_DB || exit 1
$PYTHON $LIB_DIR/db/csv_sqlite_import.py --csv_delimiter tab $GEO_TEMP_DIR/iau_ept.tsv $SQLITE_GEOREF_DB || exit 1
# Clean
rm -fR $GEO_TEMP_DIR
}
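# Quick sanity checks one might run after prepare_db_info (sketch only, not part
# of this commit; the table names come from the imports above):
#   sqlite3 $SQLITE_DB ".tables"
#   sqlite3 $SQLITE_DB "SELECT COUNT(*) FROM ref_org;"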
# Main script
@@ -105,4 +160,5 @@ download_opendatafrance_data
download_and_filter_datagouv_data
download_and_merge_opendatasoft_data
download_siren_info
prepare_cog_info
prepare_db_info
\ No newline at end of file
@@ -21,10 +21,19 @@ DUMPS_DIR=$BASE_DIR/dumps
LIB_DIR=$BASE_DIR/lib
RSC_DIR=$BASE_DIR/rsc
# Pivot directories
DG_DUMP_DIR=$DUMPS_DIR/datagouv
ODS_DUMP_DIR=$DUMPS_DIR/opendatasoft
# Pivot files
ODF_ORGA_FILE=$DUMPS_DIR/opendatafrance/organisations.csv
ODF_PTF_FILE=$DUMPS_DIR/opendatafrance/plateformes.csv
# Legacy
SQLITE_DB=$CACHE_DIR/process.db
SQLITE_GEOREF_DB=$CACHE_DIR/georef.db
# Tools
CSV_OPTS="-z 1000000"
CSV_CUT="csvcut $CSV_OPTS"
@@ -36,6 +45,7 @@ GREP=grep
ICONV=iconv
PYTHON=python3
SED=sed
SQLITE3=sqlite3
# Misc
download_file_or_exit() {
......
-- Add 'regnom' column to data_orga table
CREATE TEMPORARY TABLE _regions (code TEXT, name TEXT);
INSERT INTO _regions
SELECT regcode, nom
FROM data_orga
WHERE type='REG';
INSERT INTO _regions VALUES ('NOR-28', 'NORMANDIE');
INSERT INTO _regions VALUES ('OCC-76', 'OCCITANIE');
INSERT INTO _regions VALUES ('OM-06', 'OUTREMER');
ALTER TABLE data_orga ADD COLUMN regnom TEXT;
UPDATE data_orga SET regnom=(SELECT name FROM _regions WHERE data_orga.regcode = _regions.code);
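-- Organisations whose regcode has no match in _regions are left with regnom = NULL
-- (the correlated subquery returns no row for them).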
\ No newline at end of file
-- Create temporary table for siren to datagouv id matching
CREATE TEMPORARY TABLE _odf_dg (siren TEXT, nom TEXT, id_data_gouv TEXT);
INSERT INTO _odf_dg
SELECT siren, nom, id_datagouv
FROM data_orga
WHERE id_datagouv != '' AND id_datagouv IS NOT NULL;
-- Use it to update datagouv organizations table
ALTER TABLE data_gouv_organizations ADD COLUMN '_siren';
ALTER TABLE data_gouv_organizations ADD COLUMN '_nom';
UPDATE data_gouv_organizations
SET _siren = (SELECT siren FROM _odf_dg WHERE _odf_dg.id_data_gouv = id),
_nom = (SELECT nom FROM _odf_dg WHERE _odf_dg.id_data_gouv = id);
-- Use it to update datagouv datasets table
ALTER TABLE data_gouv_datasets ADD COLUMN '_siren';
ALTER TABLE data_gouv_datasets ADD COLUMN '_nom';
UPDATE data_gouv_datasets
SET _siren = (SELECT siren FROM _odf_dg WHERE _odf_dg.id_data_gouv = organization_id),
_nom = (SELECT nom FROM _odf_dg WHERE _odf_dg.id_data_gouv = organization_id);
-- Use it to update datagouv resources table
ALTER TABLE data_gouv_resources ADD COLUMN '_siren';
ALTER TABLE data_gouv_resources ADD COLUMN '_nom';
UPDATE data_gouv_resources
SET _siren = (SELECT siren FROM _odf_dg WHERE _odf_dg.id_data_gouv = dataset_organization_id),
_nom = (SELECT nom FROM _odf_dg WHERE _odf_dg.id_data_gouv = dataset_organization_id);
-- Create ref_org handy table
DROP TABLE IF EXISTS ref_org;
CREATE TABLE ref_org AS
SELECT siren, nom, type
FROM data_orga
ORDER BY nom;
#!/usr/bin/env python3
#
#
"""
Imports a CSV file into an existing SQLite DB
"""
import argparse
import collections
import csv
import re
import sqlite3
import sys
import unicodedata
from collections import OrderedDict
from pathlib import Path
from timeit import default_timer as timer
NORMALIZE_RE = re.compile("['\"\\. -]+")
DELIMITER_NAME_TO_CHAR = collections.OrderedDict([
('comma', ','),
('semicolon', ';'),
('tab', '\t'),
('pipe', '|'),
])
class SqliteTableHelper:
""" Ease SQLite table handling """
def __init__(self, cur, table_name, headers, col_type):
""" Inits instance """
self.cur = cur
self.table_name = table_name
self.headers = headers
self.col_type = col_type
self._inserted_rows_nb = 0
# Computes SQL INSERT statement
col_names = ["'{}'".format(h['name']) for h in headers]
q_list = ['?'] * len(headers)
self.sql_insert = 'INSERT INTO {} ({}) VALUES ({});'.format(table_name, ', '.join(col_names), ', '.join(q_list))
def create_table(self):
""" Creates table """
self.cur.execute('DROP TABLE IF EXISTS {}'.format(self.table_name))
table_constraints = ['{} {}'.format(h['name'], self.col_type.get(i, 'TEXT'))
for i, h in enumerate(self.headers)]
sql_tpl = 'CREATE TABLE {} ({})'.format(self.table_name, ', '.join(table_constraints))
self.cur.execute(sql_tpl)
def flush_insert(self, data_buffer, doit):
""" Inserts data into table if any. Reset data_buffer """
if not data_buffer or not doit:
return
try:
self.cur.executemany(self.sql_insert, data_buffer)
except sqlite3.ProgrammingError:
print('ERROR: SQL insert failed, dumping buffered rows:')
for i, row in enumerate(data_buffer):
print('#{} ({}) {}'.format(i, len(row), row))
self._inserted_rows_nb += len(data_buffer)
data_buffer.clear()
def inserted_rows_nb(self):
""" Returns nb of inserted rows """
return self._inserted_rows_nb
def norm_string(s):
""" Change 'Hélène à côté' into 'helene_a_cote' """
s = ''.join(c for c in unicodedata.normalize('NFD', s.lower())
if unicodedata.category(c) != 'Mn')
s = s.replace('°', 'o')
return NORMALIZE_RE.sub('_', s)
def norm_table_name(table_name):
""" Change ODATER_DATA_ORGA_AVRIL18 into odater_data_orga_avril18 """
return norm_string(table_name)
def norm_col_name(col):
""" Changes 'metric.members' into 'metrics_members' """
return norm_string(col)
def import_csv(csv_filepath: Path, table_name, conn, csv_delimiter, col_type):
""" Converts CSV file """
if not csv_filepath.exists():
sys.stderr.write('CSV file [{}] not found!\n'.format(str(csv_filepath)))
return
def type_row_values(row, col_type, row_id, headers):
""" Converts some row values from string to INTEGER or REAL """
if not col_type:
return row
typed_row = []
for i, val in enumerate(row):
if i in col_type:
if col_type[i] == 'INTEGER':
try:
val = int(val) if val != '' else 0
except ValueError:
print("Line {}: {}: Incorrect integer value [{}]".format(row_id, headers[i], val))
val = 0
elif col_type[i] == 'REAL':
try:
val = float(val) if val != '' else 0.0
except ValueError:
print("Line {}: {}: Incorrect float value [{}]".format(row_id, headers[i], val))
val = 0.0
typed_row.append(val)
return typed_row
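# For example (illustrative values): with col_type={5: 'INTEGER'}, the string '42'
# in column 5 becomes the int 42 and an empty string becomes 0; other columns are
# left untouched as strings.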
print('Importing {} into table {}...'.format(str(csv_filepath), table_name))
cur = conn.cursor()
cur.execute('BEGIN TRANSACTION')
row_buffer = []
sql_batch_rows_nb = 100
sth = None
# Reads CSV line by line
with csv_filepath.open(mode='rt', encoding='utf-8') as csv_fd:
reader = csv.reader(csv_fd, delimiter=csv_delimiter)
header = None
for cp, row in enumerate(reader):
# Header line
if header is None:
header = row
# row headers
headers = [{'orig': col, 'name': norm_col_name(col)} for col in header]
# Display info on typed columns
if col_type:
for index, sqltype in col_type.items():
if index >= len(header):
print(" Unknown column #{} to treat as {}, stopping here.".format(index, sqltype))
sys.exit(1)
else:
print(' Column {} is treated as {}'.format(header[index], sqltype))
# Creates table
sth = SqliteTableHelper(cur, table_name, headers, col_type)
sth.create_table()
continue
# 'normal' row
if len(row) == len(header):
row_buffer.append(type_row_values(row, col_type, cp, header))
else:
print('WARN: bad number of columns on row #{}, ignoring data'.format(cp), file=sys.stderr)
# Flush?
sth.flush_insert(row_buffer, cp != 0 and cp % sql_batch_rows_nb == 0)
# Flush last buffered row values
sth.flush_insert(row_buffer, True)
conn.commit()
return sth.inserted_rows_nb()
def main():
""" Converts CSV files to SQLite """
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('csv_file', type=Path, help='CSV file to import')
parser.add_argument('db_file', type=Path, help='SQLite DB file to create or populate')
parser.add_argument('--csv_delimiter', help="CSV delimiter", choices=DELIMITER_NAME_TO_CHAR.keys(), default='comma')
parser.add_argument('--table_name', help='SQL table name (otherwise use CSV file name)')
parser.add_argument(
'--integer_cols', help='Indexes (starting at 0 and separated by ",") of columns containing integer data')
parser.add_argument(
'--float_cols', help='Indexes (starting at 0 and separated by ",") of columns containing float data')
args = parser.parse_args()
# To avoid "field larger than field limit (131072)" error
csv.field_size_limit(sys.maxsize)
if not args.csv_file.exists():
parser.error('CSV file [{}] not found'.format(str(args.csv_file)))
# DB connection
db_file_name = str(args.db_file)
print('Populating {} database'.format(db_file_name))
conn = sqlite3.connect(db_file_name)
# Table name overloading
table_name = norm_table_name(args.csv_file.stem)
if args.table_name:
table_name = args.table_name
# CSV delimiter
csv_delimiter = DELIMITER_NAME_TO_CHAR[args.csv_delimiter]
col_type = OrderedDict()
# Integer column indexes
if args.integer_cols:
for index in args.integer_cols.split(','):
col_type[int(index)] = 'INTEGER'
# Float column indexes
if args.float_cols:
for ind_str in args.float_cols.split(','):
index = int(ind_str)
if index in col_type:
parser.error("Column #{} can't be INTEGER and REAL at the same time".format(index))
col_type[index] = 'REAL'
start = timer()
rows_nb = import_csv(args.csv_file, table_name, conn, csv_delimiter, col_type)
end = timer()
# Close
conn.close()
print('Done ({} rows imported in {:.2f}s).'.format(rows_nb, end - start))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
Extracts content from an ESRI Shapefile to a CSV file, converting coordinates to WGS84
"""
import argparse
import collections
import csv
import sys
from pathlib import Path
from timeit import default_timer as timer
import pyproj
import ujson as json
from fiona import collection
DELIMITER_NAME_TO_CHAR = collections.OrderedDict([
('comma', ','),
('semicolon', ';'),
('tab', '\t'),
])
WGS84_EPSG = 'epsg:4326'
class ProjConverter:
""" Converts geometry from given epsg code to 4326 (WGS84) """
def __init__(self, from_proj_dict, coords_decimal_number):
""" Inits class instance with origin projection """
# self.proj1 = pyproj.Proj(**from_proj_dict)
self.proj1 = pyproj.Proj(**from_proj_dict, preserve_units=False)
# self.proj2 = pyproj.Proj(init=self.WGS84_EPSG)
self.proj2 = pyproj.Proj(WGS84_EPSG, preserve_units=False)
self.fmt = '{0:.%df}' % coords_decimal_number
def convert_coords(self, coords):
""" Convert coords """
coords = pyproj.transform(self.proj1, self.proj2, coords[0], coords[1])
coords = (float(self.fmt.format(coords[0])), float(self.fmt.format(coords[1])))
return coords
def convert_coords_list(self, coord_list):
return [pt for pt in pyproj.itransform(self.proj1, self.proj2, coord_list, always_xy=True)]
def convert_geometry(self, geom):
""" Convert geometry and return"""
coords = []
if geom['type'] == 'Polygon':
for ring in geom['coordinates']:
coords.append(self.convert_coords_list(ring))
elif geom['type'] == 'MultiPolygon':
for p in geom['coordinates']:
for ring in p:
coords.append(self.convert_coords_list(ring))
else:
sys.stderr.write('Unsupported geometry type [{}]\n'.format(geom['type']))
sys.exit(1)
return coords
def glue(values, sep):
""" converts values into string values and join with given separator
transforms ['foo', 4.5, 8, 'baz'], '-' into 'foo-4.5-8-baz' """
return sep.join(map(str, values))
def convert(shp_file: Path, csv_file: Path, csv_delimiter, with_geometry, nb_decimals):
""" Generates CSV file from Shapefile """
proj_converter = None
with collection(str(shp_file), 'r') as input:
if with_geometry:
proj_converter = ProjConverter(input.crs, nb_decimals)
with csv_file.open(mode='wt', encoding='utf-8') as csv_fd:
header = False
features_nb = 0
for feature in input:
props = feature['properties']
# Header row
if not header:
header_cols = [k.upper() for k in props]
if with_geometry:
header_cols.append('GEOMETRY')
csv_fd.write('{}\n'.format(csv_delimiter.join(header_cols)))
header = True
# feature row
row_values = list(map(str, props.values()))
if with_geometry:
geom = feature['geometry']
geom['coordinates'] = proj_converter.convert_geometry(feature['geometry'].copy())
row_values.append(json.dumps(geom))
csv_fd.write('{}\n'.format(csv_delimiter.join(row_values)))
features_nb += 1
return features_nb
def main():
""" Dump SHP file content """
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('shp_file', type=Path, help='ESRI Shapefile to read')
parser.add_argument('csv_file', type=Path, help='CSV file to write')
parser.add_argument('--csv_delimiter', help="CSV delimiter", choices=DELIMITER_NAME_TO_CHAR.keys(), default='comma')
parser.add_argument('--without_geometry', help="don't export geometry information", action='store_true')
parser.add_argument('--coords_decimals_nb', type=int, help='decimals nb for geo coordinates', default=4)
args = parser.parse_args()
assert args.shp_file.is_file()
print('Converting {} into {}'.format(str(args.shp_file), str(args.csv_file)))
start = timer()
features_nb = convert(args.shp_file, args.csv_file,
DELIMITER_NAME_TO_CHAR[args.csv_delimiter], not args.without_geometry,
args.coords_decimals_nb)
end = timer()
print('Done ({} features exported in {:.2f}s).'.format(features_nb, end - start))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
"""
Some common information shared by several scripts
"""
# Tables and field names reference
SQL_CONSTANTS = {
'orga_metrics_table': 'orga_metrics',
'orga_metrics_siren_col': 'siren',
'orga_metrics_source_col': 'source',
'orga_metrics_met_type_col': 'met_type',
'orga_metrics_met_value_col': 'met_value',
'orga_geometry_table': 'orga_geometry',
'orga_geometry_code_col': 'code',
'orga_geometry_coords_col': 'coords',
'orga_geometry_nom_col': 'nom',
'orga_geometry_siren_col': 'siren',
'orga_geometry_source_col': 'source',
'ref_org_table': 'ref_org',
'ref_org_siren_col': 'siren',
'ref_org_name_col': 'nom',
'ref_org_type_col': 'type',
'ref_org_dg_org_id_col': 'id_datagouv',
'ref_org_ods_org_id_col': 'id_ods',
'dg_org_table': 'data_gouv_organizations',
'dg_org_id_col': 'id',
'dg_org_siren_col': '_siren',
'dg_org_url_col': 'url',
'dg_ds_table': 'data_gouv_datasets',
'dg_ds_org_id_col': 'organization_id',
'dg_ds_siren_col': '_siren',
'dg_ds_metric_views_col': 'metric_views',
'dg_ds_license_col': 'license',
'dg_res_table': 'data_gouv_resources',
'dg_res_org_id_col': 'dataset_organization_id',
'dg_res_siren_col': '_siren',
'dg_res_downloads_col': 'downloads',
'dg_res_filesize_col': 'filesize',
'dg_res_format_col': 'format',
'dg_res_created_at_col': 'created_at',
'dg_res_modified_col': 'modified',
'odf_org_table': 'data_orga',
'odf_org_siren_col': 'siren',
'odf_org_nom_col': 'nom',
'odf_org_regcode_col': 'regcode',
'odf_org_depcode_col': 'depcode',
'odf_org_dg_org_id_col': 'id_datagouv',
'odf_org_dg_url_col': 'url_datagouv',
'odf_org_ptf_url_col': 'url_ptf',
'odf_org_ptf_nb_col': 'nb_ptf',
'odf_org_ods_org_id_col': 'id_ods',
'odf_org_type_col': 'type',
'odf_org_lat_col': 'lat',
'odf_org_long_col': 'long',
'ods_catalog_table': 'ods_catalog',
'ods_catalog_license_col': 'default_license',
'ods_catalog_def_modified_col': 'default_modified',
'ods_catalog_def_metadata_processed_col': 'default_metadata_processed',
'ods_monitoring_table': 'ods_monitoring',
'ods_monitoring_download_count_col': 'download_count',
'ods_monitoring_records_count_col': 'records_count',
}
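# Hypothetical usage sketch (not shown in this commit): consuming scripts can
# interpolate these constants into their SQL strings, e.g.:
#   sql = 'SELECT {ref_org_siren_col}, {ref_org_name_col} FROM {ref_org_table}'.format(**SQL_CONSTANTS)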
#!/usr/bin/env python3
"""
Creates a table orga_metrics in the db
and fills it with information harvested from the different providers
Warning: if you add a new provider, don't forget to update
the WHERE_CRITERIA string in the fill_opendatasoft metrics function
"""