Parallelizing a Workflow with LIS Pro 3D
Sequential (unparallelized) processing is usually the default starting point. When a dataset is too big to fit into memory at once, you first need to split it into smaller processing units. These units can then be processed either sequentially or in parallel, depending on the memory footprint required to process a single unit.
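As a rough illustration of what splitting a dataset into processing units means, the sketch below divides a bounding box into square tiles in plain Python. The function make_tiles and the extent values are hypothetical and only serve this example; the workflow in this tutorial reads its tiling scheme from a shapefile instead.

```python
import math


def make_tiles(xmin, ymin, xmax, ymax, tile_size):
    """Split a bounding box into square tiles (edge tiles are clipped)."""
    if tile_size <= 0:
        raise ValueError("tile_size must be positive")
    # number of tile columns and rows needed to cover the extent
    n_cols = math.ceil((xmax - xmin) / tile_size)
    n_rows = math.ceil((ymax - ymin) / tile_size)
    tiles = []
    for row in range(n_rows):
        for col in range(n_cols):
            x0 = xmin + col * tile_size
            y0 = ymin + row * tile_size
            # clip the last column/row to the overall extent
            tiles.append(
                (x0, y0, min(x0 + tile_size, xmax), min(y0 + tile_size, ymax))
            )
    return tiles


# hypothetical extent of 2500 x 1000 map units, tiled at 1000 map units
tiles = make_tiles(0, 0, 2500, 1000, 1000)
for x0, y0, x1, y1 in tiles:
    # name each unit by its lower-left corner coordinate
    print(f"{int(x0)}_{int(y0)}: x {x0}..{x1}, y {y0}..{y1}")
```

Each tuple is one processing unit that can be handled independently, which is what makes the units candidates for parallel execution.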

In this section, we show how to parallelize the workflow introduced previously. Fortunately, only two parts of the code need to be modified.
Step 1: Add Custom LIS Decorator
Before

import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *

def process_unit(unit_index): ...

After

import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *

@lis_cmd.proc_function_wrapper
def process_unit(unit_index): ...

Add the @lis_cmd.proc_function_wrapper decorator to the process_unit function to make it work in parallel.
The @lis_cmd.proc_function_wrapper decorator expects a processing function that complies with one of the following two interfaces:
def process_unit(unit_index: int) -> bool:
    ...
    return True

or

def process_unit(unit_index: int, args_dict: dict[str, Any]) -> bool:
    ...
    return True

The latter version allows you to pass a Python dictionary (filled with further arguments) to the processing function. This is convenient for specifying parameters that have the same values across all processing units.
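To illustrate the second interface, here is a minimal sketch in plain Python. The decorator is deliberately omitted so that the snippet runs without PySAGA, and the dictionary keys output_resolution and output_dir are hypothetical names chosen for this example. How the dictionary is handed to the parallelizer is not covered in this section, so the sketch simply calls the function directly.

```python
from typing import Any


def process_unit(unit_index: int, args_dict: dict[str, Any]) -> bool:
    """Process one unit using parameters shared by all units."""
    # hypothetical keys, chosen for this sketch only
    resolution = args_dict["output_resolution"]
    output_dir = args_dict["output_dir"]

    if unit_index < 0 or resolution <= 0:
        return False  # signal failure to the caller

    # placeholder for the real per-unit processing pipeline
    print(f"unit {unit_index}: resolution={resolution}, output_dir={output_dir}")
    return True


# the same dictionary is reused for every processing unit
shared_args = {"output_resolution": 0.5, "output_dir": "dsm"}
results = [process_unit(i, shared_args) for i in range(3)]
```

Keeping shared settings in one dictionary means the per-unit function needs only the unit index to know what is specific to its unit.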
Step 2: Pass the Processing Function to the LIS Parallelizer
Replace the sequential processing loop from the previous section with the LIS parallel execution call.
Before

...
if __name__ == "__main__":
    ...
    # --------------------------------------------------------------------
    # iterate over the processing units and process each unit by calling
    # the process_unit function with the processing pipeline
    for unit_index in range(processing_units.Get_Count()):
        result = process_unit(unit_index)
        if not result:
            lis_cmd.Error_Exit("Processing task failed", starttime)
    ...

What to Change

Remove (or comment out) the entire sequential processing loop, starting with

for unit_index in range(processing_units.Get_Count()):

and ending with

lis_cmd.Error_Exit("Processing task failed", starttime)

Then replace it with the parallel execution code shown below.
After

...
if __name__ == "__main__":
    ...
    # --------------------------------------------------------------------
    # execute the process_unit function in parallel (each unit on a
    # CPU core) with the maximum number of available CPU cores
    num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()
    lis_cmd.Message_Print(
        f"Number of threads for parallelized processing: {num_threads}\n\n"
    )
    num_processing_units = processing_units.Get_Count()
    # this line starts parallel processing of your job
    lis_cmd.Run_Parallel_Tasks(
        process_unit, num_processing_units, num_threads
    )
    ...

Be aware that parallelized processing typically requires more memory than sequential processing! You therefore have to make sure that the amount of data processed in parallel still fits into your computer's memory (RAM). Otherwise, your program may crash or your PC may become unresponsive to user input for longer periods.
Set num_threads with respect to your computer's memory. saga_api.SG_OMP_Get_Max_Num_Threads() returns the number of available cores on your machine, regardless of how much RAM is installed. A rough estimate of the required RAM is

\(\text{required memory} = \text{number of parallel processes} \times \text{memory usage per process}\)

For example, if processing a single unit requires 1 GB of RAM, 32 parallel processes will require 32 GB of RAM. You can always set num_threads to a number lower than the total number of CPU cores of your machine.
Instead of writing

num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()

simply write something like

num_threads = 4
# ensure that you are not defining more threads than available
if num_threads > saga_api.SG_OMP_Get_Max_Num_Threads():
    num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()

That’s it! Now you can run the workflow again and check the processing time. The processing time should decrease significantly.
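The memory rule of thumb from this step can also be turned into a small helper that derives a thread count from a RAM budget. This is only a sketch under stated assumptions: threads_for_memory is a hypothetical function, the RAM figures are values you have to measure yourself, and os.cpu_count() stands in for saga_api.SG_OMP_Get_Max_Num_Threads() so that the snippet runs without PySAGA.

```python
import os


def threads_for_memory(
    available_ram_gb: float, ram_per_unit_gb: float, max_threads: int
) -> int:
    """Return how many units can run in parallel within the RAM budget."""
    if ram_per_unit_gb <= 0:
        raise ValueError("ram_per_unit_gb must be positive")
    # required memory = parallel processes x memory per process,
    # so the RAM budget allows at most this many processes
    fit_in_ram = int(available_ram_gb // ram_per_unit_gb)
    # never exceed the core count, and always keep at least one thread
    return max(1, min(fit_in_ram, max_threads))


# os.cpu_count() stands in for saga_api.SG_OMP_Get_Max_Num_Threads()
max_threads = os.cpu_count() or 1
# hypothetical budget: 16 GB of free RAM, 1 GB needed per processing unit
num_threads = threads_for_memory(16.0, 1.0, max_threads)
print(f"num_threads = {num_threads}")
```

Note that the helper still returns 1 when even a single unit exceeds the budget; in that case the processing units themselves are too large and should be made smaller.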
Because a C++ library runs in the background, SAGA objects cannot be passed directly to the parallelized processing function as arguments. This is why the tiling scheme is loaded both in the main block and in the processing function.
The Entire Parallelized Workflow
import os

import project_config as cfg
from PySAGA import saga_api, lis_cmd
from PySAGA.tools import *


# ------------------------------------------------------------------------
# note: the @lis_cmd.proc_function_wrapper decorator
# is required for parallel processing!
@lis_cmd.proc_function_wrapper
def process_unit(unit_index):
    # --------------------------------------------------------------------
    # load the shapefile with the tiling scheme used for processing
    processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
        cfg.PROCESSING_UNITS
    )
    if not processing_units:
        return lis_cmd.Error_Return(
            f"Loading processing_units {cfg.PROCESSING_UNITS} failed"
        )

    # --------------------------------------------------------------------
    # get unit to process
    proc_unit = processing_units.Get_Shape(unit_index)
    xmin = proc_unit.Get_Extent().Get_XMin()
    ymin = proc_unit.Get_Extent().Get_YMin()
    xmax = proc_unit.Get_Extent().Get_XMax()
    ymax = proc_unit.Get_Extent().Get_YMax()

    # units are referenced by their lower-left corner coordinate
    unit_name = f"{int(xmin)}_{int(ymin)}"
    lis_cmd.Message_Print(f"\nProcessing unit {unit_name} \n")

    # --------------------------------------------------------------------
    # query point cloud (with overlap) from virtual point cloud dataset
    pc_out_list = []
    if not lis_virtual.Get_Subset_from_Virtual_LASLAZ(
        PC_OUT=pc_out_list,
        FILENAME=cfg.LASVF,
        COPY_ATTR=True,
        CONSTRAIN_QUERY=False,
        FILE_COMPRESSION=True,
        ERROR_IF_EMPTY=True,
        THIN_OUT=False,
        AOI_XRANGE=f"{xmin};{xmax}",
        AOI_YRANGE=f"{ymin};{ymax}",
        SKIP_EMPTY_AOIS=True,
        METHOD_CLIP="complete boundary is inside",
    ):
        return lis_cmd.Error_Return("Failed to execute tool")

    # check if the processing unit contains any valid data
    if len(pc_out_list) < 1:
        return lis_cmd.Error_Return(
            "Unit contains no point cloud data, skipping!"
        )

    # add the point cloud datasets to the data manager
    # for easier memory management
    pc = saga_api.SG_Get_Data_Manager().Add(pc_out_list[0]).asPointCloud()
    if pc.Get_Count() < 1:
        return lis_cmd.Error_Return(
            "pointcloud contains no points, skipping!"
        )

    dsm = saga_api.SG_Get_Data_Manager().Add_Grid()
    if not lis_conversion.Point_Cloud_to_Grid(
        PC_IN=pc,
        TARGET_OUT_GRID=dsm,
        ZFIELD="Z",
        METHOD="max",
        USE_ATTR_RANGE=False,
        USE_Z_RANGE=False,
        USE_RADIUS=False,
        TARGET_DEFINITION="user defined",
        TARGET_USER_SIZE=cfg.OUTPUT_RESOLUTION,
        TARGET_USER_XMIN=xmin,
        TARGET_USER_XMAX=xmax,
        TARGET_USER_YMIN=ymin,
        TARGET_USER_YMAX=ymax,
        TARGET_USER_FITS="cells",
    ):
        return lis_cmd.Error_Return("Failed to generate DSM")

    # remove buffer of processing unit before saving
    # Python list with input data objects of type 'saga_api.CSG_Grid'
    grids_list = [dsm]
    # Python list, will become filled after successful execution
    # with output data objects of type 'saga_api.CSG_Grid'
    clipped_list = []
    if not grid_tools.Clip_Grids(
        GRIDS=grids_list,
        CLIPPED=clipped_list,
        EXTENT="user defined",
        XMIN=xmin,
        XMAX=xmax,
        YMIN=ymin,
        YMAX=ymax,
        BUFFER=0.0,
    ):
        return lis_cmd.Error_Return("Failed to clip DSM")

    if len(clipped_list) < 1:
        return lis_cmd.Error_Return(
            "Unit contains no grid data, skipping!"
        )

    dsm_out = saga_api.SG_Get_Data_Manager().Add(clipped_list[0]).asGrid()
    dsm_out.Get_Projection().Create(saga_api.CSG_String(cfg.PROJ))
    dsm_out.Save(os.path.join(cfg.DSM_DIR, f"{unit_name}_dsm.tif"))

    # --------------------------------------------------------------------
    # free memory resources and return success
    saga_api.SG_Get_Data_Manager().Delete()
    return True


if __name__ == "__main__":
    starttime = lis_cmd.Start_Logging(False)

    # --------------------------------------------------------------------
    # load the shapefile with the tiling scheme used for processing
    processing_units = saga_api.SG_Get_Data_Manager().Add_Shapes(
        cfg.PROCESSING_UNITS
    )
    if not processing_units:
        lis_cmd.Error_Exit(
            f"Loading processing_units {cfg.PROCESSING_UNITS} failed",
            starttime,
            True,
        )
    lis_cmd.Message_Print(
        f"Number of units to process: {processing_units.Get_Count()} \n\n"
    )

    # --------------------------------------------------------------------
    # use the maximum number of available CPU cores on the machine
    # to execute process_unit in parallel
    num_threads = saga_api.SG_OMP_Get_Max_Num_Threads()
    lis_cmd.Message_Print(
        f"Number of threads for parallelized processing: {num_threads}\n\n"
    )
    num_processing_units = processing_units.Get_Count()

    # this line starts parallel processing of your job
    lis_cmd.Run_Parallel_Tasks(
        process_unit, num_processing_units, num_threads
    )

    # --------------------------------------------------------------------
    # free memory and stop logging
    saga_api.SG_Get_Data_Manager().Delete()
    lis_cmd.Stop_Logging(starttime)