Automate ALS Forestry Analysis - Data Processing
Recap
In the previous section Automate ALS Forestry Analysis - Data Preparation, we have
- Created a project configuration that defines directories, filenames and tool parameters
- Created a data preparation script that
- Creates a filelist
- Creates LAS/LAZ Index
- Creates a point cloud catalog (lasvf) for the available point clouds
- Creates a tiling scheme for processing units with 100 m x 100 m size
- Selects processing units that are covered by the AOI
- Saves these selected units to disk (as
cfg.TILESHAPE_100)
This tutorial section assumes that you have completed the section Automate ALS Forestry Analysis - Data Preparation
If you have not yet setup LIS Pro 3D for usage with Python (specifically the PySAGA api), please have a look at our Automation Guide and specifically the section Installation of LIS Pro 3D with Python. There, you will find a step-by-step description on how to setup your development environment!
Remember the directory structure:
project # top-level folder
├── data
├── scripts
| ├── project_config.py
| ├── 01_data_preparation.py
│ └── 02_data_processing.py
├── processing
└── ...
Open 02_data_processing.py in VS Code.
Differences Compared to GUI Workflow
In the following fully automated workflow there are some differences compared to the workflow performed in the GUI.
Unit-Wise Processing
In the GUI part of the tutorial (section Forestry Analysis), we have manually applied the tools to the entire point cloud at once. This worked because the point cloud was small enough to fit into memory. For automated processing, it is typically necessary to process the point cloud in smaller units, which is what we will do here. Therefore, we will add code that loads data separately for each of the processing units.
We use the coordinates of the lower left corner (xmin, ymin) to reference individual units.
Parallelization
Parallelization allows you to leverage all cores of your computer and thereby significantly speeds up processing!
The Entire Forestry Analysis Workflow
Additional steps not covered in the GUI workflow are highlighted in bold. The processing script will contain the following 19 steps
- Select current processing unit
- Get subset from available point cloud data with overlap
- Set spatial reference
- Generate DTM
- Close gaps in DTM
- Generate DSM
- Generate NDSM
- Attach DZ Value to point cloud
- Create CHM
- Smooth CHM
- Detect single trees
- Compute single tree metrics
- Attach segmentation to point cloud
- Compute statistics for the tree segment
- Set unique tree id
- Join statistics to single trees
- Select single trees of current processing unit
- Remove overlap from elevation models
- Remove overlap from pointcloud
The overlap is used in order to avoid edge effects and will be removed after processing!
Fill 02_data_processing.py
import os
import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *
@lis_cmd.proc_function_wrapper
def process_unit(unit_index=0):
processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
cfg.TILESHAPE_100
)
if not processing_units.is_Valid():
return lis_cmd.Error_Return(
f"Loading processing_units {cfg.TILESHAPE_100} failed"
)
proc_shape = processing_units.Get_Shape(unit_index)
xmin = proc_shape.Get_Extent().Get_XMin()
ymin = proc_shape.Get_Extent().Get_YMin()
# units are referenced by their lower-left corner coordinate
unit_name = f"{int(xmin)}_{int(ymin)}"
lis_cmd.Message_Print(f"\nProcessing unit {unit_name} \n")
# select current unit
processing_units.Select(unit_index)
# copy current unit to new shapes layer
proc_unit = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
INPUT=processing_units, OUTPUT=proc_unit
):
return lis_cmd.Error_Return("Failed to execute tool")
# will become filled after successful execution with output data objects of type 'saga_api.CSG_PointCloud'
pc_out_list = []
if not lis_virtual.Get_Subset_from_Virtual_LASLAZ(
AOI_SHP=proc_unit,
PC_OUT=pc_out_list,
FILENAME=cfg.LASVF,
FILEPATH="",
COPY_ATTR=True,
CONSTRAIN_QUERY=True,
ATTR_FIELD="classification",
VALUE_RANGE="2; 5",
FILE_COMPRESSION=True,
ERROR_IF_EMPTY=False,
THIN_OUT=False,
FIELD_TILENAME="<not set>",
AOI_XRANGE="0; 0",
AOI_YRANGE="0; 0",
AOI_ADD_OVERLAP=True,
OVERLAP=cfg.OVERLAP,
SKIP_EMPTY_AOIS=False,
FILENAME_TILE_INFO="",
METHOD_CLIP="lower and left boundary is inside",
ONE_PC_PER_POLYGON=False,
):
return lis_cmd.Error_Return("Failed to execute tool")
if len(pc_out_list) < 1:
return lis_cmd.Error_Return(
f"Processing unit {unit_name} contains no point cloud data, skipping!"
)
# add the point cloud datasets to the data manager
pointcloud = (
saga_api.SG_Get_Data_Manager().Add(pc_out_list[0]).asPointCloud()
)
if pointcloud.Get_Count() < 5:
return lis_cmd.Error_Return(
f"Processing unit {unit_name} has less than 5 points, skipping ..."
)
# set projection for point cloud
shapes = [
pointcloud
] # Python list with input data objects of type 'saga_api.CSG_Shapes'
if not pj_proj4.Set_Coordinate_Reference_System(
SHAPES=shapes, CRS_STRING="EPSG:2056", CRS_FILE=""
):
return lis_cmd.Error_Return("Failed to execute tool")
# generate dtm
DTM = saga_api.SG_Get_Data_Manager().Add_Grid()
if not lis_conversion.Point_Cloud_to_Grid(
PC_IN=pointcloud,
ZFIELD="Z",
FIELD_FILTERATTR="classification",
METHOD="mean",
USE_ATTR_RANGE=False,
USE_Z_RANGE=False,
USE_RADIUS=False,
FILTER_RANGE="2; 2",
TARGET_DEFINITION="user defined",
TARGET_USER_SIZE=0.500000,
TARGET_USER_XMIN=xmin - cfg.OVERLAP,
TARGET_USER_YMIN=ymin - cfg.OVERLAP,
TARGET_USER_FLAT=True,
TARGET_USER_FITS="cells",
TARGET_OUT_GRID=DTM,
):
return lis_cmd.Error_Return("Failed to execute tool")
DTM.Set_Name(saga_api.CSG_String("DTM"))
# close dtm
if not grid_tools.Close_Gaps(INPUT=DTM, THRESHOLD=0.100000):
return lis_cmd.Error_Return("Failed to execute tool")
# generate dsm
DSM = saga_api.SG_Get_Data_Manager().Add_Grid()
if not lis_conversion.Point_Cloud_to_Grid(
PC_IN=pointcloud,
TARGET_OUT_GRID=DSM,
ZFIELD="Z",
FIELD_FILTERATTR="<not set>",
METHOD="max",
USE_ATTR_RANGE=False,
USE_Z_RANGE=False,
USE_RADIUS=False,
TARGET_DEFINITION="grid or grid system",
TARGET_TEMPLATE=DTM,
):
return lis_cmd.Error_Return("Failed to execute tool")
DSM.Set_Name(saga_api.CSG_String("DSM"))
# generate ndsm
NDSM = saga_api.SG_Get_Data_Manager().Add_Grid()
if not lis_arithmetic.Create_nDSM(
DSM=DSM, DTM=DTM, NDSM=NDSM, CH_NEGATIVE="set to zero"
):
return lis_cmd.Error_Return("Failed to execute tool")
NDSM.Set_Name(saga_api.CSG_String("NDSM"))
# attach dz value to point cloud
if not lis_arithmetic.Height_Above_Ground_from_Grid(
DEM=DTM,
PC_IN=pointcloud,
COPY_ATTR=True,
ATTR_SUFFIX="",
METHOD="vertical grid offset",
):
return lis_cmd.Error_Return("Failed to execute tool")
# generate chm
CHM = saga_api.SG_Get_Data_Manager().Add_Grid()
if not lis_conversion.Point_Cloud_to_Grid(
PC_IN=pointcloud,
TARGET_OUT_GRID=CHM,
ZFIELD="dz",
FIELD_FILTERATTR="classification",
METHOD="max",
USE_ATTR_RANGE=False,
USE_Z_RANGE=False,
USE_RADIUS=False,
FILTER_RANGE="5; 5",
TARGET_DEFINITION="grid or grid system",
TARGET_TEMPLATE=DTM,
):
return lis_cmd.Error_Return("Failed to execute tool")
CHM.Set_Name(saga_api.CSG_String("CHM"))
# catch case when CHM grid is empty (no vegetation points in tile)
if CHM.Get_NoData_Count() == CHM.Get_NCells():
return lis_cmd.Error_Return("Failed to execute tool - CHM is empty")
# smooth chm
CHM_smooth = saga_api.SG_Get_Data_Manager().Add_Grid()
if not grid_filter.Gaussian_Filter(
INPUT=CHM, RESULT=CHM_smooth, KERNEL_RADIUS=5, SIGMA=50.000000
):
return lis_cmd.Error_Return("Failed to execute tool")
# detect single trees
segments = saga_api.SG_Get_Data_Manager().Add_Grid()
seeds = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not lis_forestry.Single_Tree_Derivation(
CHM=CHM_smooth,
SEGMENTS=segments,
SEEDS=seeds,
BORDER_SEEDS=True,
METHOD="seed to saddle difference",
THRESHOLD=1.000000,
):
return lis_cmd.Error_Return("Failed to execute tool")
segments.Set_Name(saga_api.CSG_String("CHM_segments"))
# compute single tree metrics
treetops = saga_api.SG_Get_Data_Manager().Add_Shapes()
centroids = saga_api.SG_Get_Data_Manager().Add_Shapes()
boundaries = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not lis_forestry.Tree_Shape_Metrics(
CHM=CHM,
SEGMENTS=segments,
SEEDS=seeds,
TREETOPS=treetops,
CENTROIDS=centroids,
BOUNDARIES=boundaries,
SMOOTH_BOUNDARIES=True,
APPLY_FILTER=True,
MIN_AREA=2.000000,
MIN_SHAPE_RATIO=0.000000,
):
return lis_cmd.Error_Return("Failed to execute tool")
# attach segment id to pointcloud
grids = [
segments
] # Python list with input data objects of type 'saga_api.CSG_Grid'
if not lis_tools.Attach_Grid_Values_to_Point_Cloud(
PC_IN=pointcloud, GRIDS=grids, COPY_ATTR=True
):
return lis_cmd.Error_Return("Failed to execute tool")
if not lis_arithmetic.Point_Cloud_Attribute_Calculator(
PC_IN=pointcloud,
FORMULA="ifelse([classification]<3,0,[CHM_segments])",
NAME="TreeID",
TYPE="4 byte floating point number",
INCLUDE_NODATA=True,
):
return lis_cmd.Error_Return("Failed to execute tool")
# compute point cloud statistics for single trees
statistics_table = saga_api.SG_Get_Data_Manager().Add_Table()
if not lis_analysis.Feature_Statistics(
PC_IN=pointcloud,
TAB_OUT=statistics_table,
FIELD_SEGMENTID="TreeID",
FIELDS="dz",
COPY_ATTR=True,
ATTR_SUFFIX="",
ONLY_TABLE=False,
CALC_N=False,
CALC_MIN=False,
CALC_MAX=True,
CALC_RANGE=False,
CALC_SUM=False,
CALC_MEAN=True,
CALC_STDDEV=False,
CALC_MEDIAN=False,
CALC_MAJORITY=False,
CALC_UNIQUE=False,
):
return lis_cmd.Error_Return("Failed to execute tool")
# generate unique tree ids
formula = "[ID]+" + str(unit_index * 100000)
if not table_calculus.Field_Calculator(
TABLE=statistics_table,
FIELD="<not set>",
NAME="UniqueID",
FORMULA=formula,
USE_NODATA=False,
):
return lis_cmd.Error_Return("Failed to execute tool")
# join pointcloud statistics to treetops
if not table_tools.Join_Attributes_from_a_Table(
TABLE_A=treetops,
TABLE_B=statistics_table,
ID_A="ID",
ID_B="ID",
FIELDS_ALL=True,
KEEP_ALL=True,
CMP_CASE=True,
):
return lis_cmd.Error_Return("Failed to execute tool")
# select single tree treetops of current tile
if not shapes_tools.Select_by_Location(
SHAPES=treetops,
LOCATIONS=proc_unit,
CONDITION="intersect",
METHOD="new selection",
):
return lis_cmd.Error_Return("Failed to execute tool")
treetops_tile = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
INPUT=treetops, OUTPUT=treetops_tile
):
return lis_cmd.Error_Return("Failed to execute tool")
# select single tree centroids of current tile
if not table_tools.Join_Attributes_from_a_Table(
TABLE_A=centroids,
TABLE_B=treetops_tile,
ID_A="ID",
ID_B="ID",
FIELDS_ALL=False,
FIELDS="dz_max,dz_mean,UniqueID",
KEEP_ALL=True,
CMP_CASE=True,
):
return lis_cmd.Error_Return("Failed to execute tool")
if not shapes_tools.Select_by_Attributes_Numerical_Expression(
SHAPES=centroids,
FIELD="UniqueID",
EXPRESSION="a > 0",
USE_NODATA=False,
METHOD="new selection",
):
return lis_cmd.Error_Return("Failed to execute tool")
centroids_tile = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
INPUT=centroids, OUTPUT=centroids_tile
):
return lis_cmd.Error_Return("Failed to execute tool")
# select single tree boundaries of current tile
if not table_tools.Join_Attributes_from_a_Table(
TABLE_A=boundaries,
TABLE_B=treetops_tile,
ID_A="ID",
ID_B="ID",
FIELDS_ALL=True,
FIELDS="dz_max,dz_mean,UniqueID",
KEEP_ALL=True,
CMP_CASE=True,
):
return lis_cmd.Error_Return("Failed to execute tool")
if not shapes_tools.Select_by_Attributes_Numerical_Expression(
SHAPES=boundaries,
FIELD="UniqueID",
EXPRESSION="a > 0",
USE_NODATA=False,
METHOD="new selection",
):
return lis_cmd.Error_Return("Failed to execute tool")
boundaries_tile = saga_api.SG_Get_Data_Manager().Add_Shapes()
if not shapes_tools.Copy_Selection_to_New_Shapes_Layer(
INPUT=boundaries, OUTPUT=boundaries_tile
):
return lis_cmd.Error_Return("Failed to execute tool")
# save data
treetops_tile.Save(
os.path.join(cfg.TREE_POS_DIR, f"{unit_name}_treetops.shp")
)
centroids_tile.Save(
os.path.join(cfg.TREE_POS_DIR, f"{unit_name}_centroids.shp")
)
boundaries_tile.Save(
os.path.join(cfg.OUTLINES_POS_DIR, f"{unit_name}_boundaries.shp")
)
# clip elevation models of current tile
grids_list = [DTM, DSM, NDSM, CHM]
clipped_list = []
if not grid_tools.Clip_Grids(
GRIDS=grids_list,
SHAPES=proc_unit,
CLIPPED=clipped_list,
EXTENT="shapes extent",
BUFFER=0.000000,
):
return lis_cmd.Error_Return("Failed to execute tool")
if len(grids_list) < 1:
return lis_cmd.Error_Return(
f"Processing unit {unit_name} contains no point cloud data, skipping!"
)
# save data
DTM_clipped = clipped_list[0]
DTM_clipped.Set_Name(saga_api.CSG_String(f"{unit_name}_DTM"))
DTM_clipped.Save(os.path.join(cfg.DTM_DIR, f"{unit_name}_DTM.sg-grd-z"))
DSM_clipped = clipped_list[1]
DSM_clipped.Set_Name(saga_api.CSG_String(f"{unit_name}_DSM"))
DSM_clipped.Save(os.path.join(cfg.DSM_DIR, f"{unit_name}_DSM.sg-grd-z"))
NDSM_clipped = clipped_list[2]
NDSM_clipped.Set_Name(saga_api.CSG_String(f"{unit_name}_NDSM"))
NDSM_clipped.Save(os.path.join(cfg.NDSM_DIR, f"{unit_name}_NDSM.sg-grd-z"))
CHM_clipped = clipped_list[3]
CHM_clipped.Set_Name(saga_api.CSG_String(f"{unit_name}_CHM"))
CHM_clipped.Save(os.path.join(cfg.CHM_DIR, f"{unit_name}_CHM.sg-grd-z"))
# clip point cloud of current tile
pointcloud_clipped = saga_api.SG_Get_Data_Manager().Add_PointCloud()
if not lis_forestry.Tree_Segment_Aware_Overlap_Removal(
PC_IN=pointcloud,
STEM_POSITIONS=treetops_tile,
SELECTION_POLYGON=proc_unit,
PC_OUT=pointcloud_clipped,
FIELD_TREEID="TreeID",
FIELD_TREEID_STEM="ID",
FIELD_X_STEM="<not set>",
FIELD_Y_STEM="<not set>",
FIELD_TREEID_SKEL="<no attributes>",
FIELD_TREEID_PIPE="<no attributes>",
AOI_XRANGE="0; 0",
AOI_YRANGE="0; 0",
):
return lis_cmd.Error_Return("Failed to execute tool")
pointcloud_clipped.Save(
os.path.join(cfg.PC_DIR, f"{unit_name}_pc.sg-pts-z")
)
# free memory and stop logging
saga_api.SG_Get_Data_Manager().Delete()
return True
if __name__ == "__main__":
# -----------------------------------------------------------------------------
# start the logging
starttime = lis_cmd.Start_Logging(False)
# -----------------------------------------------------------------------------
# load the shapefile with the tiling scheme used for processing
processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
cfg.TILESHAPE_100
)
if not processing_units.is_Valid():
lis_cmd.Error_Exit(
f"Loading processing_units {cfg.TILESHAPE_100} failed",
starttime,
True,
)
lis_cmd.Message_Print(
f"Number of units to process: {processing_units.Get_Count()} \n\n"
)
# -----------------------------------------------------------------------------
# process the process_unit function in parallel (each tile on a CPU core) with the maximum number of available CPU cores
num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()
num_processing_units = processing_units.Get_Count()
lis_cmd.Run_Parallel_Tasks(process_unit, num_processing_units, num_threads)
# -----------------------------------------------------------------------------
# free memory and stop logging
saga_api.SG_Get_Data_Manager().Delete()
lis_cmd.Stop_Logging(starttime)Run Script via run_script.py
After preparing the 02_data_processing.py script, create a new script called run_script.py. This allows clean execution and logging of multiple steps in one go.
Copy the following code into the script and save it:
from subprocess import run
import os
import sys
import argparse
LOG_DIR = "../log"
def parse_args():
parser = argparse.ArgumentParser(
description="run a script with automated logging."
)
parser.add_argument(
"scripts",
type=str,
nargs="+",
help="the script(s) to execute, e.g. 'my_script.py' or 'my_script0.py my_script1.py my_script2.py'",
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True)
for script in args.scripts:
scriptname, fileext = os.path.splitext(script)
std_log = os.path.join(LOG_DIR, f"{scriptname}.std.log")
err_log = os.path.join(LOG_DIR, f"{scriptname}.err.log")
if fileext == ".py":
command = f"{sys.executable} -u {script} > {std_log} 2> {err_log}"
print("running: " + command)
run(command, universal_newlines=True, shell=True)
else:
print(f"Skipping {script} (unknown file extension)")Execute the Processing Pipeline
Run the processing pipeline in the Miniforge Prompt using the following command:
python run_script.py 01_data_preparation.py 02_data_processing.py