Automate Data Processing
The Workflow
The following script will take care of the following 15 steps:
- Select current processing unit
- Load and find corresponding buildings for the selected unit
- Derive processing extent
- Get building subset from point cloud data
- Query DTM from raster catalogue
- Group building points into clumps
- Find planar segments in the building point cloud
- Calculate slope of planar patches
- Filter out steep and small segments
- Assign roof segments to buildings
- Generate building models
- Compute building statistics
- Join attributes from input building footprint layer
- Export to CityJSON
- Export to CityGML
Final Code
import os
import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *
@lis_cmd.proc_function_wrapper
def process_unit(unit_index):
processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
cfg.TILESHAPE_250
)
if not processing_units.is_Valid():
return lis_cmd.Error_Return("Loading processing_units failed")
buildings_used = saga_api.SG_Get_Data_Manager().Add_Shapes(
cfg.BUILDING_FOOTPRINTS_USED
)
if not buildings_used.is_Valid():
return lis_cmd.Error_Return(
f"Loading building footprints {cfg.BUILDING_FOOTPRINTS_USED} failed"
)
# -------------------------------------------------------------------------
# get unit to process
proc_unit = processing_units.Get_Shape(unit_index)
xmin = proc_unit.Get_Extent().Get_XMin()
ymin = proc_unit.Get_Extent().Get_YMin()
# units are referenced by their lower-left corner coordinate
unit_id = f"{int(xmin)}_{int(ymin)}"
lis_cmd.Message_Print(f"\nProcessing unit {unit_id} \n")
# -------------------------------------------------------------------------
# select current unit
processing_units.Select(unit_index)
# -------------------------------------------------------------------------
# copy current unit to new layer
current_unit = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
INPUT=processing_units, OUTPUT=current_unit
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# select buildings with their centroid in current processing unit
if not shapes_tools.Select_by_Location(
SHAPES=buildings_used,
LOCATIONS=current_unit,
CONDITION="have their centroid in",
METHOD="new selection",
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# copy selected buildings to new shapes layer
current_footprints = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
INPUT=buildings_used, OUTPUT=current_footprints
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# compute extent of current buildings
current_extent = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Get_Shapes_Extents(
SHAPES=current_footprints, EXTENTS=current_extent, OUTPUT="all shapes"
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# query building points from point cloud catalogue
# will be filled with the loaded point cloud subset of type 'saga_api.CSG_PointCloud'
pc_out_list = []
if not lis_virtual.Get_Subset_from_Virtual_LASLAZ(
AOI_SHP=current_extent,
PC_OUT=pc_out_list,
FILENAME=cfg.LASVF,
FILEPATH="",
COPY_ATTR=True,
CONSTRAIN_QUERY=True,
ATTR_FIELD="classification",
VALUE_RANGE="6;6",
FILE_COMPRESSION=True,
ERROR_IF_EMPTY=False,
THIN_OUT=False,
FIELD_TILENAME="<not set>",
AOI_XRANGE="0; 0",
AOI_YRANGE="0; 0",
AOI_ADD_OVERLAP=True,
OVERLAP=cfg.OVERLAP,
SKIP_EMPTY_AOIS=False,
FILENAME_TILE_INFO="",
METHOD_CLIP="lower and left boundary is inside",
ONE_PC_PER_POLYGON=False,
):
return lis_cmd.Error_Return("Failed to execute tool")
if len(pc_out_list) < 1:
return lis_cmd.Error_Return(
f"Processing unit {unit_id} contains no point cloud data, skipping!"
)
# add the point cloud dataset to the data manager for easier memory management
building_pts = (
saga_api.SG_Get_Data_Manager().Add(pc_out_list[0]).asPointCloud()
)
if building_pts.Get_Count() < 5:
return lis_cmd.Error_Return(
f"Processing unit {unit_id} has less than 5 points, skipping!"
)
# -------------------------------------------------------------------------
# query dtm from raster catalogue
# this list will hold a single DTM grid of type 'saga_api.CSG_Grid'
grids_list = []
if not io_gdal.Import_Raster(
EXTENT_SHAPES=current_extent,
GRIDS=grids_list,
FILES=cfg.VRT,
MULTIPLE="single grids",
TRANSFORM=True,
RESAMPLING="Nearest Neighbour",
EXTENT="shapes extent",
EXTENT_BUFFER=cfg.OVERLAP,
):
return lis_cmd.Error_Return("Failed to execute tool")
if len(grids_list) < 1:
return lis_cmd.Error_Return(
f"Processing unit {unit_id} contains no grid data, skipping!"
)
# add the grid to the data manager for easier memory management
dtm = saga_api.SG_Get_Data_Manager().Add(grids_list[0]).asGrid()
# -------------------------------------------------------------------------
# group building clumps
if not lis_segmentation.Densitybased_Spatial_Clustering(
PC_IN=building_pts,
FIELD_ATTR_1="X",
FIELD_ATTR_2="Y",
FIELD_ATTR_3="Z",
FIELD_CONSTRAINT="<not set>",
COPY_ATTR=True,
ATTR_SUFFIX="bld",
CHOICE_DIM="3-dimensional",
SEARCH_RADIUS=cfg.RADIUS_CLUSTER,
MIN_POINTS=cfg.MIN_POINTS_CLUSTER,
CLUSTERSIZE=False,
FILTER_BY_SIZE=True,
MIN_CLUSTERSIZE=cfg.MIN_CLUSTERSIZE,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# find planar patches
if not lis_segmentation.Clump_Segmentation(
PC_IN=building_pts,
FIELD_CLUMPID="cluster_bld",
FIELD_SEED="<not set>",
FIELD_THRES="<not set>",
FIELD_DIRSCAN_X="<not set>",
COPY_ATTR=True,
ATTR_SUFFIX="roof",
TILESIZE=cfg.TILESIZE_SEGMENTATION,
OVERLAP=cfg.OVERLAP_SEGMENTATION,
SEGMENT_MINSIZE=cfg.SEGMENT_MINSIZE,
ROBUST_DIST_THRES=cfg.ROBUST_DIST_THRES,
RANSAC_MODEL_POINT_DISTANCE="0.02; 2.00",
RANSAC_ITER=cfg.RANSAC_ITER,
RANSAC_SEED=cfg.RANSAC_SEED,
REGION_RADIUS=cfg.REGION_RADIUS,
REGION_MINSIZE=cfg.REGION_MINSIZE,
REGION_MAXANGLE=cfg.REGION_MAXANGLE,
REGION_MAXOFFSET=cfg.REGION_MAXOFFSET,
SEGSIZE=True,
NORMAL=False,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# compute slope angle per roof segment
if not lis_analysis.Segment_Features(
PC_IN=building_pts,
FIELD_SEGMENTID="segmentid_roof",
FIELD_DIRSCAN_X="<not set>",
COPY_ATTR=True,
ATTR_SUFFIX="roof",
MAX_EDGELENGTH=1.500000,
SLOPE=True,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# filter roof segments by size and slope
buildings_filtered = saga_api.SG_Get_Data_Manager().Add_PointCloud()
if not lis_filtering.Attribute_Filter(
PC_IN=building_pts,
PC_OUT=buildings_filtered,
FIELD_ATTR_1="segmentsize_roof",
FIELD_ATTR_2="slope_roof",
FIELD_ATTR_3="<not set>",
COPY_ATTR=True,
METHOD="drop points",
TYPE_1="lower limit (min)",
MIN_1=cfg.FILTER_MIN_SEGSIZE,
TYPE_2="upper limit (max)",
MAX_2=cfg.FILTER_MAX_SLOPE,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# assign roof surfaces to buildings
if not lis_city_modeller.Assign_Roof_Segments_to_Buildings(
PC_IN=buildings_filtered,
SHP_IN=current_footprints,
FIELD_ROOFID="segmentid_roof",
FIELD_BLOCKED="<not set>",
COPY_ATTR=True,
FIELD_BUILDINGID="proc_id",
FIELD_LABEL_FLAG="<not set>",
ATTR_SUFFIX="",
MIN_PERCENTAGE_INSIDE=cfg.PERCENTAGE_INSIDE,
COVERAGE_FILTER=False,
FLAG_SEGMENTS=False,
REMOVE_UNCONNECTED=False,
ID_UNDEF=-1,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# generate building models
shp_lod2 = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not lis_city_modeller.LOD2_Building_Modeller(
PC_IN=buildings_filtered,
GRID_DTM=dtm,
SHP_GROUND_PLAN=current_footprints,
SHP_LOD2=shp_lod2,
FIELD_BUILDINGID="proc_id",
FIELD_ROOFID="segmentid_roof_relabel",
FIELD_BUILDINGID_G="proc_id",
FIELD_BEDPLATE_G="<not set>",
MAX_SLOPE=cfg.MODEL_MAX_SLOPE,
MIN_POINTS_ROOF=cfg.MIN_POINTS_ROOF,
MAX_EDGELENGTH=cfg.MAX_EDGELENGTH,
MAX_LINEDIST=cfg.MAX_LINEDIST,
SNAP_DIST=cfg.SNAP_DIST,
METHOD_GAP_FILLING="roof plane minimum height",
ITERATIONS_ROOF_GAPS=cfg.ITERATIONS_ROOF_GAPS,
MIN_BUILDING_HEIGHT=cfg.MIN_BUILDING_HEIGHT,
RECTIFY_SMALL_POLYGONS=True,
MAX_AREA_RECTIFY=cfg.MAX_AREA_RECTIFY,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# calculate building statistics
models_out = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not lis_city_modeller.LOD2_Building_Statistics(
SHP_BUILDINGS=shp_lod2,
GRID_DTM=dtm,
SHP_STATS=models_out,
FIELD_BUILDINGID="proc_id",
FIELD_SURFACECLASS="surfaceClass",
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# join attributes of input building footprint layer
if not table_tools.Join_Attributes_from_a_Table(
TABLE_A=models_out,
TABLE_B=current_footprints,
ID_A="proc_id",
ID_B="proc_id",
FIELDS_ALL=True,
KEEP_ALL=True,
CMP_CASE=True,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# save model as shapefile
models_out.Get_Projection().Create(saga_api.CSG_String(cfg.PROJ))
models_out_file = os.path.join(cfg.SHAPES_DIR, f"{unit_id}.shp")
models_out.Save(models_out_file)
# -------------------------------------------------------------------------
# export model as cityjson file
json_file = os.path.join(cfg.JSON_DIR, f"{unit_id}.json")
if not lis_city_modeller.Export_Shapes_to_CityJSON(
SHP_IN=models_out,
FIELD_OBJECTID="proc_id",
FIELD_SURFACECLASS="surfaceClass",
FIELD_HEIGHT="measuredHeight",
FIELDS_CITYOBJECT="roofHgtMax",
FIELDS_POLYFACE="roofSlope;roofAspect",
FIELD_EXTERNALID=cfg.EXTERNALID,
FILE=json_file,
VERSION="2.0.1",
CITY_OBJECT="Building",
LOD=2,
EPSG=cfg.EPSG,
DATE="2024-07-10",
INFO_SYSTEM_EXTERNAL=cfg.EXTERNALID,
GEOMETRY_TYPE="Solid",
CLEAN_SOLID=True,
SNAP_TOL=0.001,
COORD_PRECISION=3,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# export model as citygml file
gml_file = os.path.join(cfg.GML_DIR, f"{unit_id}.gml")
if not lis_city_modeller.Export_Shapes_to_CityGML(
SHP_IN=models_out,
FIELD_OBJECTID="proc_id",
FIELD_SURFACECLASS="surfaceClass",
FIELD_HEIGHT="measuredHeight",
FIELDS_CITYOBJECT="roofHgtMax",
FIELDS_POLYFACE="roofSlope;roofAspect",
FIELD_EXTERNALID=cfg.EXTERNALID,
FILE=gml_file,
CITY_OBJECT="Building",
LOD=2,
DATE="2024-07-10",
EPSG=str(cfg.EPSG),
INFO_SYSTEM_EXTERNAL=cfg.EXTERNALID,
GEOMETRY_TYPE="Solid",
COORD_PRECISION=3,
):
return lis_cmd.Error_Return("Failed to execute tool")
# -------------------------------------------------------------------------
# free memory ressources
saga_api.SG_Get_Data_Manager().Delete()
return True
if __name__ == "__main__":
# -----------------------------------------------------------------------------
# start the logging
starttime = lis_cmd.Start_Logging(False)
# -----------------------------------------------------------------------------
# load the shapefile with the processing units
processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
cfg.TILESHAPE_250
)
if not processing_units.is_Valid():
lis_cmd.Error_Exit(
"Loading processing_units failed",
starttime,
True,
)
lis_cmd.Message_Print(
f"Number of units to process: {processing_units.Get_Count()} \n\n"
)
# -------------------------------------------------------------------------
# execute the process_unit function in parallel (each unit on a
# CPU core) with the maximum number of available CPU cores
num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()
num_processing_units = processing_units.Get_Count()
lis_cmd.Run_Parallel_Tasks(process_unit, num_processing_units, num_threads)
# -------------------------------------------------------------------------
# free memory and stop logging
saga_api.SG_Get_Data_Manager().Delete()
lis_cmd.Stop_Logging(starttime)Create a Run Script
In your scripts folder, create a new Python script called “run_script.py” and add the following code to the script!
from subprocess import run
import os
import sys
import argparse
LOG_DIR = "../log"
def parse_args():
parser = argparse.ArgumentParser(
description="run a script with automated logging."
)
parser.add_argument(
"scripts",
type=str,
nargs="+",
help="the script(s) to execute, e.g. 'my_script.py' or 'my_script0.py my_script1.py my_script2.py'",
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True)
for script in args.scripts:
scriptname, fileext = os.path.splitext(script)
std_log = os.path.join(LOG_DIR, f"{scriptname}.std.log")
err_log = os.path.join(LOG_DIR, f"{scriptname}.err.log")
if fileext == ".py":
command = f"{sys.executable} -u {script} > {std_log} 2> {err_log}"
print("running: " + command)
run(command, universal_newlines=True, shell=True)
else:
print(f"Skipping {script} (unknown file extension)")Run the Script Using the Runscript
Run the tool in the Miniforge Prompt using the command
python run_script.py 01_data_prepararation 02_data_processing.py