Source code for nlmod.read.bgt

import json
import os
import time
import warnings
from io import BytesIO
from typing import Dict
from xml.etree import ElementTree
from zipfile import ZipFile

import geopandas as gpd
import numpy as np
import pandas as pd
import requests
from shapely.geometry import LineString, MultiPolygon, Point, Polygon

from nlmod import NLMOD_DATADIR

from .. import cache
from ..util import extent_to_polygon


[docs] def get_bgt(*args, **kwargs): """Get geometries within an extent or polygon from the Basis Registratie Grootschalige Topografie (BGT) .. deprecated:: 0.10.0 `get_bgt` will be removed in nlmod 1.0.0, it is replaced by `download_bgt` because of new naming convention https://github.com/gwmod/nlmod/issues/47 Parameters ---------- extent : list or tuple of length 4 or shapely Polygon The extent (xmin, xmax, ymin, ymax) or polygon for which shapes are requested. layer : string or list of strings, optional The layer(s) for which shapes are requested. When layer is "all", all layers are requested. The default is "waterdeel". cut_by_extent : bool, optional Only return the intersection with the extent if True. The default is True make_valid : bool, optional Make geometries valid by appying a buffer of 0 m when True. The default is False. fname : string, optional Save the zipfile that is received by the request to file. The default is None, which does not save anything to file. geometry: string, optional When geometry is specified, this attribute is used as the geometry of the resulting GeoDataFrame. Some layers have multiple geometry-attributes. An example is the layer 'pand', where each buidling (polygon) also contains a Point-geometry for the label. When geometry is None, the last attribute starting with the word "geometrie" is used as the geometry. The default is None. remove_expired: bool, optional Remove expired items (that contain a value for 'eindRegistratie') when True. The default is True. add_bronhouder_names: bool, optional Add bronhouder in a column called 'bronhouder_name. names when True. The default is True. timeout: int optional The amount of time in seconds to wait for the server to send data before giving up. The default is 1200 (20 minutes). Returns ------- gdf : GeoPandas GeoDataFrame or dict of GeoPandas GeoDataFrame A GeoDataFrame (when only one layer is requested) or a dict of GeoDataFrames containing all geometries and properties. """ warnings.warn( "this function is deprecated and will eventually be removed, " "please use nlmod.read.bgt.download_bgt() in the future.", DeprecationWarning, ) return download_bgt(*args, **kwargs)
[docs] @cache.cache_pickle def download_bgt( extent, layer="waterdeel", cut_by_extent=True, make_valid=False, fname=None, geometry=None, remove_expired=True, add_bronhouder_names=True, timeout=1200, ): """Get geometries within an extent or polygon from the Basis Registratie Grootschalige Topografie (BGT) Parameters ---------- extent : list or tuple of length 4 or shapely Polygon The extent (xmin, xmax, ymin, ymax) or polygon for which shapes are requested. layer : string or list of strings, optional The layer(s) for which shapes are requested. When layer is "all", all layers are requested. The default is "waterdeel". cut_by_extent : bool, optional Only return the intersection with the extent if True. The default is True make_valid : bool, optional Make geometries valid by appying a buffer of 0 m when True. The default is False. fname : string, optional Save the zipfile that is received by the request to file. The default is None, which does not save anything to file. geometry: string, optional When geometry is specified, this attribute is used as the geometry of the resulting GeoDataFrame. Some layers have multiple geometry-attributes. An example is the layer 'pand', where each buidling (polygon) also contains a Point-geometry for the label. When geometry is None, the last attribute starting with the word "geometrie" is used as the geometry. The default is None. remove_expired: bool, optional Remove expired items (that contain a value for 'eindRegistratie') when True. The default is True. add_bronhouder_names: bool, optional Add bronhouder in a column called 'bronhouder_name. names when True. The default is True. timeout: int optional The amount of time in seconds to wait for the server to send data before giving up. The default is 1200 (20 minutes). Returns ------- gdf : GeoPandas GeoDataFrame or dict of GeoPandas GeoDataFrame A GeoDataFrame (when only one layer is requested) or a dict of GeoDataFrames containing all geometries and properties. """ if layer == "all": layer = get_bgt_layers() if isinstance(layer, str): layer = [layer] api_url = "https://api.pdok.nl" url = f"{api_url}/lv/bgt/download/v1_0/full/custom" body = {"format": "citygml", "featuretypes": layer} if isinstance(extent, Polygon): polygon = extent else: polygon = extent_to_polygon(extent) body["geofilter"] = polygon.wkt headers = {"content-type": "application/json"} response = requests.post( url, headers=headers, data=json.dumps(body), timeout=timeout ) # 20 minutes # check api-status, if completed, download if response.status_code in range(200, 300): running = True href = response.json()["_links"]["status"]["href"] url = f"{api_url}{href}" while running: response = requests.get(url, timeout=timeout) if response.status_code in range(200, 300): status = response.json()["status"] if status == "COMPLETED": running = False else: time.sleep(2) else: running = False else: msg = f"Download of bgt-data failed: {response.text}" raise (Exception(msg)) href = response.json()["_links"]["download"]["href"] response = requests.get(f"{api_url}{href}", timeout=timeout) if fname is not None: with open(fname, "wb") as file: file.write(response.content) zipfile = BytesIO(response.content) gdf = read_bgt_zipfile( zipfile, geometry=geometry, cut_by_extent=cut_by_extent, make_valid=make_valid, extent=polygon, remove_expired=remove_expired, add_bronhouder_names=add_bronhouder_names, ) if len(layer) == 1: gdf = gdf[layer[0]] return gdf
[docs] def read_bgt_zipfile( fname, geometry=None, files=None, cut_by_extent=True, make_valid=False, extent=None, remove_expired=True, add_bronhouder_names=True, ): """Read data from a zipfile that was downloaded using get_bgt(). Parameters ---------- fname : string The filename of the zip-file containing the BGT-data. geometry : str, optional DESCRIPTION. The default is None. files : string of list of strings, optional The files to read from the zipfile. Read all files when files is None. The default is None. cut_by_extent : bool, optional Cut the geoemetries by the supplied extent. When no extent is supplied, cut_by_extent is set to False. The default is True. make_valid : bool, optional Make geometries valid by appying a buffer of 0 m when True. THe defaults is False. extent : list or tuple of length 4 or shapely Polygon The extent (xmin, xmax, ymin, ymax) or polygon by which the geometries are clipped. Only used when cut_by_extent is True. The defult is None. remove_expired: bool, optional Remove expired items (that contain a value for 'eindRegistratie') when True. The default is True. add_bronhouder_names: bool, optional Add bronhouder in a column called 'bronhouder_name. names when True. The default is True. Returns ------- gdf : dict of GeoPandas GeoDataFrame A dict of GeoDataFrames containing all geometries and properties. """ zf = ZipFile(fname) gdf = {} if files is None: files = zf.namelist() elif isinstance(files, str): files = [files] if extent is None: cut_by_extent = False else: if isinstance(extent, Polygon): polygon = extent else: polygon = extent_to_polygon(extent) for file in files: key = file[4:-4] gdf[key] = read_bgt_gml(zf.open(file), geometry=geometry) if remove_expired and gdf[key] is not None and "eindRegistratie" in gdf[key]: # remove double features # by removing features with an eindRegistratie gdf[key] = gdf[key][gdf[key]["eindRegistratie"].isna()] if make_valid and isinstance(gdf[key], gpd.GeoDataFrame): gdf[key].geometry = gdf[key].make_valid() if cut_by_extent and isinstance(gdf[key], gpd.GeoDataFrame): gdf[key].geometry = gdf[key].intersection(polygon) gdf[key] = gdf[key][~gdf[key].is_empty] if add_bronhouder_names: bgt_bronhouder_names = get_bronhouder_names() for gdf_layer in gdf.values(): if gdf_layer is None or "bronhouder" not in gdf_layer.columns: continue gdf_layer["bronhouder_name"] = gdf_layer["bronhouder"].map( bgt_bronhouder_names ) return gdf
[docs] def read_bgt_gml(fname, geometry="geometrie2dGrondvlak", crs="epsg:28992"): def get_xy(text): xy = [float(val) for val in text.split()] xy = np.array(xy).reshape(int(len(xy) / 2), 2) return xy def get_ring_xy(exterior): ns = "{http://www.opengis.net/gml}" assert len(exterior) == 1 if exterior[0].tag == f"{ns}LinearRing": lr = exterior.find(f"{ns}LinearRing") xy = get_xy(lr.find(f"{ns}posList").text) elif exterior[0].tag == f"{ns}Ring": cm = exterior.find(f"{ns}Ring").find(f"{ns}curveMember") xy = read_curve(cm.find(f"{ns}Curve")) else: raise Exception(f"Unknown exterior type: {exterior[0].tag}") return xy def read_polygon(polygon): ns = "{http://www.opengis.net/gml}" exterior = polygon.find(f"{ns}exterior") shell = get_ring_xy(exterior) holes = [] for interior in polygon.findall(f"{ns}interior"): holes.append(get_ring_xy(interior)) return shell, holes def read_point(point): ns = "{http://www.opengis.net/gml}" xy = [float(x) for x in point.find(f"{ns}pos").text.split()] return xy def read_curve(curve): xy = np.empty((0, 2)) for segment in curve.find(f"{ns}segments"): xy = np.vstack((xy, get_xy(segment.find(f"{ns}posList").text))) return xy def read_linestring(linestring): return get_xy(linestring.find(f"{ns}posList").text) def read_label(child, d): ns = "{http://www.geostandaarden.nl/imgeo/2.1}" label = child.find(f"{ns}Label") d["label"] = label.find(f"{ns}tekst").text positie = label.find(f"{ns}positie").find(f"{ns}Labelpositie") xy = read_point( positie.find(f"{ns}plaatsingspunt").find( "{http://www.opengis.net/gml}Point" ) ) d["label_plaatsingspunt"] = Point(xy) d["label_hoek"] = float(positie.find(f"{ns}hoek").text) tree = ElementTree.parse(fname) ns = "{http://www.opengis.net/citygml/2.0}" data = [] for com in tree.findall(f".//{ns}cityObjectMember"): assert len(com) == 1 bp = com[0] d = {} for key in bp.attrib: d[key.split("}", 1)[1]] = bp.attrib[key] for child in bp: key = child.tag.split("}", 1)[1] if len(child) == 0: d[key] = child.text else: if key == "identificatie": ns = "{http://www.geostandaarden.nl/imgeo/2.1}" loc_id = child.find(f"{ns}NEN3610ID").find(f"{ns}lokaalID") d[key] = loc_id.text elif key.startswith("geometrie"): if geometry is None: geometry = key assert len(child) == 1 ns = "{http://www.opengis.net/gml}" if child[0].tag == f"{ns}MultiSurface": ms = child.find(f"{ns}MultiSurface") polygons = [] for sm in ms: assert len(sm) == 1 polygon = sm.find(f"{ns}Polygon") polygons.append(read_polygon(polygon)) d[key] = MultiPolygon(polygons) elif child[0].tag == f"{ns}Polygon": d[key] = Polygon(*read_polygon(child[0])) elif child[0].tag == f"{ns}Curve": d[key] = LineString(read_curve(child[0])) elif child[0].tag == f"{ns}LineString": d[key] = LineString(read_linestring(child[0])) elif child[0].tag == f"{ns}Point": d[key] = Point(read_point(child[0])) else: raise (ValueError((f"Unsupported tag: {child[0].tag}"))) elif key == "nummeraanduidingreeks": ns = "{http://www.geostandaarden.nl/imgeo/2.1}" nar = child.find(f"{ns}Nummeraanduidingreeks").find( f"{ns}nummeraanduidingreeks" ) read_label(nar, d) elif key.startswith("kruinlijn"): ns = "{http://www.opengis.net/gml}" if child[0].tag == f"{ns}LineString": ls = child.find(f"{ns}LineString") d[key] = LineString(read_linestring(ls)) elif child[0].tag == f"{ns}Curve": d[key] = LineString(read_curve(child[0])) else: raise (ValueError((f"Unsupported tag: {child[0].tag}"))) elif key == "openbareRuimteNaam": read_label(child, d) else: raise (KeyError((f"Unknown key: {key}"))) data.append(d) if len(data) > 0: if geometry is None: return pd.DataFrame(data) else: return gpd.GeoDataFrame(data, geometry=geometry, crs=crs) else: return None
[docs] def get_bgt_layers(timeout=1200): """ Get the layers in the Basis Registratie Grootschalige Topografie (BGT) Parameters ---------- timeout: int optional The amount of time in seconds to wait for the server to send data before giving up. The default is 1200 (20 minutes). Returns ------- list A list with the layer names. """ url = "https://api.pdok.nl/lv/bgt/download/v1_0/dataset" resp = requests.get(url, timeout=timeout) data = resp.json() return [x["featuretype"] for x in data["timeliness"]]
[docs] def get_bronhouder_names() -> Dict[str, str]: """Get the names of the bronhouders of the BGT data. Returns ------- dict A dictionary with the bronhouder codes as keys and the names as values. Notes ----- The bronhouder names are retrieved from https://www.kadaster.nl/-/bgt-bronhoudercodes. The `Toelichting` sheet in the .ods file gives the changes compared to the old file. If changes are made, please add them manually to the bgt_bronhouder_names dictionary. A test is added for to check if the dictionary is up to date with the latest file from the Kadaster up to date with the .ods file from 2024-01-01. """ fname = os.path.join(NLMOD_DATADIR, "bgt", "bronhouder_names.json") with open(fname, "r", encoding="utf-8") as fo: bgt_bronhouder_names = json.load(fo) return bgt_bronhouder_names
[docs] def _update_bronhouder_json(fname): """ "Update the json-file within nlmod with bronhouder-names Parameters ---------- fname : str The path and name of a file with bronhouder-names, downloaded from https://www.kadaster.nl/-/bgt-bronhoudercodes. """ sheet_names = [ "Gemeenten", "Provincies", "Waterschappen", "Landelijke_organisaties", ] data = {} for sheet_name in sheet_names: if sheet_name == "Gemeenten": skiprows = 0 else: skiprows = 1 df = pd.read_excel(fname, sheet_name=sheet_name, skiprows=skiprows) index_col = "BGT code" if index_col not in df.columns: index_col = "BGT" df["Naam"] = df["Naam"].str.strip() data.update(df.set_index(index_col)["Naam"].to_dict()) json_fname = os.path.join(NLMOD_DATADIR, "bgt", "bronhouder_names.json") with open(json_fname, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=0) f.write("\n")