Source code for pyxccd.imagetool.prepare_ard

# Author: Su Ye
# This script is an example for generating block-based stack files from original ARD zip as
# intermediate inputs to the COLD algorithm in a HPC environment. As preparation, you need to
# download '_BT' and '_SR' for all Landsat ARD collection 1.
# This script has 4 steps: 1) warp single-path array to limit the observation inputs for each pixel
# if single_path is set True; 2) unzip all images and unpack bit-based QA bands; 3)
# partition each 5000*5000 temporal images to blocks and eliminate those image blocks if no clear,
# water or snow pixel were in it (so to save disk space and enable independent IO for individual
# block in later time-series analysis); 4) save each image block to python-native binary format
# (.npy) into its block folders

# For a 42-year Landsat ARD C1 tile (~3000 images), this script averagely produces ~350 G intermediate disk
# files, and takes ~12 mins to finish if 200 EPYC 7452 cores are used.
# running example:
# source /home/colory666/env_collect/py310_geo/bin/activate
# python prepare_ard.py --source_dir=/data/landsat_c2/204_22 --out_dir=/data/results/204_22_stack --yaml_path=/home/colory666/pyxccd/src/python/pyxccd/imagetool/config.yaml  --n_cores=10

import shutil
import tarfile
import xml.etree.ElementTree as ET
from os.path import isfile, join, isdir, exists
from importlib import resources as importlib_resources
import datetime as dt
import os
import logging
import time
import functools
from logging import Logger
from datetime import datetime
import pandas as pd
import multiprocessing
import numpy as np
from dateutil.parser import parse
import click
import yaml
from pathlib import Path
from pyxccd.common import DatasetInfo
from pyxccd.utils import class_from_dict, rio_loaddata, rio_warp


# define constant here
QA_CLEAR = 0
QA_WATER = 1
QA_SHADOW = 2
QA_SNOW = 3
QA_CLOUD = 4
QA_FILL = 255

QA_CIRRUS_HLS = 0
QA_WATER_HLS = 5
QA_SHADOW_HLS = 3
QA_SNOW_HLS = 4
QA_CLOUDADJACENT_HLS = 2
QA_CLOUD_HLS = 1
res = 30


s2_stack_bands = ["B02", "B03", "B04", "B8A", "B11", "B12", "Fmask"]
l8_stack_bands = ["B02", "B03", "B04", "B05", "B06", "B07", "Fmask"]


[docs] def mask_value(vector, val): """ Build a boolean mask around a certain value in the vector. Args: vector: 1-d ndarray of values val: values to mask on Returns: 1-d boolean ndarray """ return vector == val
[docs] def qabitval_array_HLS(packedint_array: np.ndarray): """ Institute a hierarchy of qa values that may be flagged in the bitpacked value. fill > cloud > shadow > snow > water > clear Args: packedint: int value to bit check Returns: offset value to use """ unpacked = np.full(packedint_array.shape, 0) QA_CLOUD_unpacked = np.bitwise_and(packedint_array, 1 << 1) QA_CLOUD_ADJ = np.bitwise_and(packedint_array, 1 << 2) QA_SHADOW_unpacked = np.bitwise_and(packedint_array, 1 << 3) QA_SNOW_unpacked = np.bitwise_and(packedint_array, 1 << 4) QA_WATER_unpacked = np.bitwise_and(packedint_array, 1 << 5) unpacked[QA_WATER_unpacked > 0] = QA_WATER unpacked[QA_SNOW_unpacked > 0] = QA_SNOW unpacked[QA_SHADOW_unpacked > 0] = QA_SHADOW unpacked[QA_CLOUD_ADJ > 0] = QA_CLOUD unpacked[QA_CLOUD_unpacked > 0] = QA_CLOUD unpacked[packedint_array == QA_FILL] = QA_FILL return unpacked
[docs] def qabitval_array(packedint_array: np.ndarray): """ Institute a hierarchy of qa values that may be flagged in the bitpacked value. fill > cloud > shadow > snow > water > clear Args: packedint: int value to bit check Returns: offset value to use """ unpacked = np.full(packedint_array.shape, QA_FILL) QA_CLOUD_unpacked = np.bitwise_and(packedint_array, 1 << (QA_CLOUD + 1)) QA_SHADOW_unpacked = np.bitwise_and(packedint_array, 1 << (QA_SHADOW + 1)) QA_SNOW_unpacked = np.bitwise_and(packedint_array, 1 << (QA_SNOW + 1)) QA_WATER_unpacked = np.bitwise_and(packedint_array, 1 << (QA_WATER + 1)) QA_CLEAR_unpacked = np.bitwise_and(packedint_array, 1 << (QA_CLEAR + 1)) unpacked[QA_CLEAR_unpacked > 0] = QA_CLEAR unpacked[QA_WATER_unpacked > 0] = QA_WATER unpacked[QA_SNOW_unpacked > 0] = QA_SNOW unpacked[QA_SHADOW_unpacked > 0] = QA_SHADOW unpacked[QA_CLOUD_unpacked > 0] = QA_CLOUD return unpacked
[docs] def qabitval_array_c2(packedint_array: np.ndarray): """ Institute a hierarchy of qa values that may be flagged in the bitpacked value for c2 fill > cloud > shadow > snow > water > clear Args: packedint: int value to bit check Returns: offset value to use """ unpacked = np.full(packedint_array.shape, QA_FILL) QA_CLEAR_unpacked = np.bitwise_and(packedint_array, 1 << 6) QA_SHADOW_unpacked = np.bitwise_and(packedint_array, 1 << 4) QA_CLOUD_unpacked = np.bitwise_and(packedint_array, 1 << 3) QA_DILATED_unpacked = np.bitwise_and(packedint_array, 1 << 1) QA_SNOW_unpacked = np.bitwise_and(packedint_array, 1 << 5) QA_WATER_unpacked = np.bitwise_and(packedint_array, 1 << 7) unpacked[QA_CLEAR_unpacked > 0] = QA_CLEAR unpacked[QA_WATER_unpacked > 0] = QA_WATER unpacked[QA_SNOW_unpacked > 0] = QA_SNOW unpacked[QA_SHADOW_unpacked > 0] = QA_SHADOW unpacked[QA_CLOUD_unpacked > 0] = QA_CLOUD unpacked[QA_DILATED_unpacked > 0] = QA_CLOUD return unpacked
[docs] def single_image_stacking_hls( source_dir: str, out_dir: str, logger: Logger, dataset_info: DatasetInfo, is_partition: bool, clear_threshold: float, low_date_bound: str, upp_date_bound: str, folder: str, ): """ unzip single image, convert bit-pack qa to byte value, and save as numpy :param source_dir: the parent folder to save image 'folder' :param out_dir: the folder to save result :param logger: the handler of logger file :param data_info: data info data class :param is_partition: True, partition each image into blocks; False, save original size of image :param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed :param low_date_bound: the lower date of user interested date range :param upp_date_bound: the upper date of user interested date range :param folder: the folder name of image :return: """ try: QA_band = rio_loaddata(join(source_dir, folder, f"{folder}.Fmask.tif")) except Exception as e: # logger.error('Cannot open QA band for {}: {}'.format(folder, e)) logger.error(f"Cannot open QA band for {folder}: {e}") return False # convertQA = np.vectorize(qabitval) QA_band_unpacked = qabitval_array_HLS(QA_band).astype(np.short) if clear_threshold > 0: clear_ratio = np.sum( np.logical_or(QA_band_unpacked == QA_CLEAR, QA_band_unpacked == QA_WATER) ) / np.sum(QA_band_unpacked != QA_FILL) else: clear_ratio = 1 if clear_ratio > clear_threshold: [collection, sensor, tile_id, imagetime, version1, version2] = folder.rsplit( "." ) year = imagetime[0:4] doy = imagetime[4:7] file_name = sensor + tile_id + year + doy + collection + version1 if low_date_bound is not None: if (dt.datetime(int(year), 1, 1) + dt.timedelta(int(doy) - 1)) < parse( low_date_bound ): return True if upp_date_bound is not None: if (dt.datetime(int(year), 1, 1) + dt.timedelta(int(doy) - 1)) > parse( upp_date_bound ): return True if sensor == "L30": try: b1 = rio_loaddata(join(source_dir, folder, "{}.B02.tif".format(folder))) b2 = rio_loaddata(join(source_dir, folder, "{}.B03.tif".format(folder))) b3 = rio_loaddata(join(source_dir, folder, "{}.B04.tif".format(folder))) b4 = rio_loaddata(join(source_dir, folder, "{}.B05.tif".format(folder))) b5 = rio_loaddata(join(source_dir, folder, "{}.B06.tif".format(folder))) b6 = rio_loaddata(join(source_dir, folder, "{}.B07.tif".format(folder))) b7 = np.full(b6.shape, 0) # assign zero except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error("Cannot open Landsat bands for {}: {}".format(folder, e)) return False else: try: b1 = rio_loaddata(join(source_dir, folder, "{}.B02.tif".format(folder))) b2 = rio_loaddata(join(source_dir, folder, "{}.B03.tif".format(folder))) b3 = rio_loaddata(join(source_dir, folder, "{}.B04.tif".format(folder))) b4 = rio_loaddata(join(source_dir, folder, "{}.B8A.tif".format(folder))) b5 = rio_loaddata(join(source_dir, folder, "{}.B11.tif".format(folder))) b6 = rio_loaddata(join(source_dir, folder, "{}.B12.tif".format(folder))) b7 = np.full(b6.shape, 0) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error("Cannot open Landsat bands for {}: {}".format(folder, e)) return False if ( (b1 is None) or (b2 is None) or (b3 is None) or (b4 is None) or (b5 is None) or (b6 is None) ): logger.error("Reading Landsat band fails for {}".format(folder)) return False if is_partition is True: # width of a block bytesize = 2 # short16 = 2 * byte # source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles-in-python-using-numpy-d1bf0dd7b6f7 B1_blocks = np.lib.stride_tricks.as_strided( b1, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B2_blocks = np.lib.stride_tricks.as_strided( b2, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B3_blocks = np.lib.stride_tricks.as_strided( b3, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B4_blocks = np.lib.stride_tricks.as_strided( b4, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B5_blocks = np.lib.stride_tricks.as_strided( b5, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B6_blocks = np.lib.stride_tricks.as_strided( b6, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B7_blocks = np.lib.stride_tricks.as_strided( b7, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) QA_blocks = np.lib.stride_tricks.as_strided( QA_band_unpacked, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) for i in range(dataset_info.n_block_y): for j in range(dataset_info.n_block_x): # check if no valid pixels in the chip, then eliminate qa_unique = np.unique(QA_blocks[i][j]) # skip blocks are all cloud, shadow or filled values # in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD', # 'QA_SHADOW', or FILLED value (255) if ( QA_CLEAR not in qa_unique and QA_WATER not in qa_unique and QA_SNOW not in qa_unique ): continue block_folder = "block_x{}_y{}".format(j + 1, i + 1) np.save( join(join(out_dir, block_folder), file_name), np.dstack( [ B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j], B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j], QA_blocks[i][j], ] ).astype(np.int16), ) else: np.save( join(out_dir, file_name), np.dstack([b1, b2, b3, b4, b5, b6, b7, QA_band_unpacked]).astype( np.int16 ), ) # scene_list.append(folder_name) else: # logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3])) logger.warn("Not enough clear observations for {}".format(folder)) return True
[docs] def single_image_stacking( tmp_path: str, source_dir: str, out_dir: str, clear_threshold: float, path_array: np.ndarray, logger: Logger, dataset_info: DatasetInfo, is_partition: bool, low_date_bound: str, upp_date_bound: str, b_c2: bool, folder: str, ): """ unzip single image, convert bit-pack qa to byte value, and save as numpy :param tmp_path: tmp folder to save unzip image :param source_dir: image folder save source zipped files :param out_dir: the folder to save result :param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed :param path_array: path array has the same dimension of inputted image, and the pixel value indicates the path which the pixel belongs to; if path_array == none, we will use all path :param logger: the handler of logger file :param dataset_info: dataset information :param is_partition: True, partition each image into blocks; False, save original size of image :param low_date_bound: the lower bound of user interested year range :param upp_date_bound: the upper bound of user interested year range :param b_c2: False :param folder: the folder name of image :return: """ # unzip SR if os.path.exists(join(tmp_path, folder)): shutil.rmtree(join(tmp_path, folder), ignore_errors=True) if os.path.exists(join(tmp_path, folder.replace("SR", "BT"))): shutil.rmtree(join(tmp_path, folder.replace("SR", "BT")), ignore_errors=True) try: with tarfile.open(join(source_dir, folder + ".tar")) as tar_ref: try: tar_ref.extractall(join(tmp_path, folder)) except Exception: # logger.warning('Unzip fails for {}'.format(folder)) logger.error("Unzip fails for {}".format(folder)) return except IOError as e: logger.error("Unzip fails for {}: {}".format(folder, e)) # return # unzip BT try: with tarfile.open( join(source_dir, folder.replace("SR", "BT") + ".tar") ) as tar_ref: try: tar_ref.extractall(join(tmp_path, folder.replace("SR", "BT"))) except Exception: # logger.warning('Unzip fails for {}'.format(folder.replace("SR", "BT"))) logger.error("Unzip fails for {}".format(folder.replace("SR", "BT"))) return except IOError as e: logger.error("Unzip fails for {}: {}".format(folder.replace("SR", "BT"), e)) return if not isdir(join(tmp_path, folder.replace("SR", "BT"))): logger.error("Fail to locate BT folder for {}".format(folder)) return try: if b_c2 is False: QA_band = rio_loaddata( join( join(tmp_path, folder), "{}_PIXELQA.tif".format(folder[0 : len(folder) - 3]), ) ) else: QA_band = rio_loaddata( join( join(tmp_path, folder), "{}_QA_PIXEL.TIF".format(folder[0 : len(folder) - 3]), ) ) except Exception as e: # logger.error('Cannot open QA band for {}: {}'.format(folder, e)) logger.error("Cannot open QA band for {}: {}".format(folder, e)) return # convertQA = np.vectorize(qabitval) if b_c2 is False: QA_band_unpacked = qabitval_array(QA_band).astype(np.short) else: QA_band_unpacked = qabitval_array_c2(QA_band).astype(np.short) if clear_threshold > 0: clear_ratio = np.sum( np.logical_or(QA_band_unpacked == QA_CLEAR, QA_band_unpacked == QA_WATER) ) / np.sum(QA_band_unpacked != QA_FILL) else: clear_ratio = 1 if clear_ratio > clear_threshold: if folder[3] == "5": sensor = "LT5" elif folder[3] == "7": sensor = "LE7" elif folder[3] == "8": sensor = "LC8" elif folder[3] == "4": sensor = "LT4" elif folder[3] == "9": sensor = "LC9" else: logger.error( "Sensor is not correctly formatted for the scene {}".format(folder) ) col = folder[8:11] row = folder[11:14] year = folder[15:19] doy = datetime(int(year), int(folder[19:21]), int(folder[21:23])).strftime("%j") collection = "C{}".format(folder[35:36]) if b_c2 is False: version = folder[37:40] else: version = folder[34:35] file_name = sensor + col + row + year + doy + collection + version if low_date_bound is not None: if (dt.datetime(int(year), 1, 1) + dt.timedelta(int(doy) - 1)) < parse( low_date_bound ): return if upp_date_bound is not None: if (dt.datetime(int(year), 1, 1) + dt.timedelta(int(doy) - 1)) > parse( upp_date_bound ): return if b_c2 is False: if sensor == "LT5" or sensor == "LE7" or sensor == "LT4": try: b1 = rio_loaddata( join(join(tmp_path, folder), "{}b1.tif".format(folder)) ) b2 = rio_loaddata( join(join(tmp_path, folder), "{}b2.tif".format(folder)) ) b3 = rio_loaddata( join(join(tmp_path, folder), "{}b3.tif".format(folder)) ) b4 = rio_loaddata( join(join(tmp_path, folder), "{}b4.tif".format(folder)) ) b5 = rio_loaddata( join(join(tmp_path, folder), "{}b5.tif".format(folder)) ) b6 = rio_loaddata( join(join(tmp_path, folder), "{}b7.tif".format(folder)) ) b7 = rio_loaddata( join( join(tmp_path, "{}_BT".format(folder[0 : len(folder) - 3])), "{}_BTB6.tif".format(folder[0 : len(folder) - 3]), ) ) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error( "Cannot open Landsat bands for {}: {}".format(folder, e) ) return elif sensor == "LC8" or "LC9": try: b1 = rio_loaddata( join(join(tmp_path, folder), "{}b2.tif".format(folder)) ) b2 = rio_loaddata( join(join(tmp_path, folder), "{}b3.tif".format(folder)) ) b3 = rio_loaddata( join(join(tmp_path, folder), "{}b4.tif".format(folder)) ) b4 = rio_loaddata( join(join(tmp_path, folder), "{}b5.tif".format(folder)) ) b5 = rio_loaddata( join(join(tmp_path, folder), "{}b6.tif".format(folder)) ) b6 = rio_loaddata( join(join(tmp_path, folder), "{}b7.tif".format(folder)) ) b7 = rio_loaddata( join( join(tmp_path, "{}_BT".format(folder[0 : len(folder) - 3])), "{}_BTB10.tif".format(folder[0 : len(folder) - 3]), ) ) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error( "Cannot open Landsat bands for {}: {}".format(folder, e) ) return else: if sensor == "LT5" or sensor == "LE7" or sensor == "LT4": try: b1 = rio_loaddata( join(join(tmp_path, folder), "{}_B1.TIF".format(folder)) ) b2 = rio_loaddata( join(join(tmp_path, folder), "{}_B2.TIF".format(folder)) ) b3 = rio_loaddata( join(join(tmp_path, folder), "{}_B3.TIF".format(folder)) ) b4 = rio_loaddata( join(join(tmp_path, folder), "{}_B4.TIF".format(folder)) ) b5 = rio_loaddata( join(join(tmp_path, folder), "{}_B5.TIF".format(folder)) ) b6 = rio_loaddata( join(join(tmp_path, folder), "{}_B7.TIF".format(folder)) ) b7 = rio_loaddata( join( join(tmp_path, "{}_BT".format(folder[0 : len(folder) - 3])), "{}_BT_B6.TIF".format(folder[0 : len(folder) - 3]), ) ) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error( "Cannot open Landsat bands for {}: {}".format(folder, e) ) return elif sensor == "LC8" or "LC9": try: b1 = rio_loaddata( join(join(tmp_path, folder), "{}_B2.TIF".format(folder)) ) b2 = rio_loaddata( join(join(tmp_path, folder), "{}_B3.TIF".format(folder)) ) b3 = rio_loaddata( join(join(tmp_path, folder), "{}_B4.TIF".format(folder)) ) b4 = rio_loaddata( join(join(tmp_path, folder), "{}_B5.TIF".format(folder)) ) b5 = rio_loaddata( join(join(tmp_path, folder), "{}_B6.TIF".format(folder)) ) b6 = rio_loaddata( join(join(tmp_path, folder), "{}_B7.TIF".format(folder)) ) b7 = rio_loaddata( join( tmp_path, "{}_BT".format(folder[0 : len(folder) - 3]), "{}_BT_B10.TIF".format(folder[0 : len(folder) - 3]), ) ) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error( "Cannot open Landsat bands for {}: {}".format(folder, e) ) return if ( (b1 is None) or (b2 is None) or (b3 is None) or (b4 is None) or (b5 is None) or (b6 is None) or (b7 is None) ): logger.error("Reading Landsat band fails for {}".format(folder)) return if b_c2 is True: b1 = (10000 * (b1 * 2.75e-05 - 0.2)).astype(np.int16) b2 = (10000 * (b2 * 2.75e-05 - 0.2)).astype(np.int16) b3 = (10000 * (b3 * 2.75e-05 - 0.2)).astype(np.int16) b4 = (10000 * (b4 * 2.75e-05 - 0.2)).astype(np.int16) b5 = (10000 * (b5 * 2.75e-05 - 0.2)).astype(np.int16) b7 = (10000 * (b7 * 2.75e-05 - 0.2)).astype(np.int16) b6 = (10 * (b6 * 0.00341802 + 149)).astype(np.int16) # if path_array is not None, we will eliminate those observation that has different path with its assigned path if path_array is not None: # meaning that single-path processing if not os.path.exists( join(join(tmp_path, folder), folder.replace("_SR", ".xml")) ): logger.error( "Cannot find xml file for {}".format( join(join(tmp_path, folder), folder.replace("_SR", ".xml")) ) ) return tree = ET.parse(join(join(tmp_path, folder), folder.replace("_SR", ".xml"))) # get root element root = tree.getroot() if b_c2 is False: elements = root.findall( "./{https://landsat.usgs.gov/ard/v1}scene_metadata/{https://landsat.usgs.gov/" "ard/v1}global_metadata/{https://landsat.usgs.gov/ard/v1}wrs" ) else: elements = root.findall("SCENE_METADATA/IMAGE_ATTRIBUTES") if len(elements) == 0: logger.error("Parsing xml fails for {}".format(folder)) return if b_c2 is False: pathid = int(elements[0].attrib["path"]) else: if elements[0][3].text is not None: pathid = int(elements[0][3].text) else: raise ValueError("path id could not be located.") # print(pathid) # assign filled value to the pixels has different path id so won't be processed QA_band_unpacked[path_array != pathid] = QA_FILL if is_partition is True: # width of a block bytesize = 2 # short16 = 2 * byte # source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles-in-python-using-numpy-d1bf0dd7b6f7 B1_blocks = np.lib.stride_tricks.as_strided( b1, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B2_blocks = np.lib.stride_tricks.as_strided( b2, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B3_blocks = np.lib.stride_tricks.as_strided( b3, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B4_blocks = np.lib.stride_tricks.as_strided( b4, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B5_blocks = np.lib.stride_tricks.as_strided( b5, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B6_blocks = np.lib.stride_tricks.as_strided( b6, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B7_blocks = np.lib.stride_tricks.as_strided( b7, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) QA_blocks = np.lib.stride_tricks.as_strided( QA_band_unpacked, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) for i in range(dataset_info.n_block_y): for j in range(dataset_info.n_block_x): # check if no valid pixels in the chip, then eliminate qa_unique = np.unique(QA_blocks[i][j]) # skip blocks are all cloud, shadow or filled values # in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD', # 'QA_SHADOW', or FILLED value (255) if ( QA_CLEAR not in qa_unique and QA_WATER not in qa_unique and QA_SNOW not in qa_unique ): continue block_folder = "block_x{}_y{}".format(j + 1, i + 1) np.save( join(join(out_dir, block_folder), file_name), np.dstack( [ B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j], B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j], QA_blocks[i][j], ] ).astype(np.int16), ) else: np.save( join(out_dir, file_name), np.dstack([b1, b2, b3, b4, b5, b6, b7, QA_band_unpacked]).astype( np.int16 ), ) # scene_list.append(folder_name) else: # logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3])) logger.warn( "Not enough clear observations for {}".format(folder[0 : len(folder) - 3]) ) # delete unzip folder shutil.rmtree(join(tmp_path, folder), ignore_errors=True) shutil.rmtree(join(tmp_path, folder.replace("SR", "BT")), ignore_errors=True)
[docs] def single_image_stacking_collection2( tmp_path: str, source_dir: str, out_dir: str, clear_threshold: float, logger: Logger, dataset_info: DatasetInfo, reference_path: str, is_partition: bool, low_date_bound: str, upp_date_bound: str, folder: str, ): """ for collection 2 :param tmp_path: tmp folder to save unzip image :param source_dir: image folder save source zipped files :param out_dir: the folder to save result :param clear_threshold: threshold of clear pixel percentage, if lower than threshold, won't be processed :param logger: the handler of logger file :param dataset_info:DatasetInfo :param is_partition: True, partition each image into blocks; False, save original size of image :param low_date_bound: the lower bound of user interested date range :param upp_date_bound: the upper bound of user interested date range :param bounds :param folder: the folder name of image :return: """ # unzip SR if os.path.exists(join(tmp_path, folder)): shutil.rmtree(join(tmp_path, folder), ignore_errors=True) try: with tarfile.open(join(source_dir, folder + ".tar")) as tar_ref: try: tar_ref.extractall(join(tmp_path, folder)) except Exception: # logger.warning('Unzip fails for {}'.format(folder)) logger.error("Unzip fails for {}".format(folder)) return except IOError as e: logger.error("Unzip fails for {}: {}".format(folder, e)) # return try: rio_warp( join(tmp_path, folder, f"{folder}_QA_PIXEL.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) QA_band = rio_loaddata(join(tmp_path, folder, f"{folder}_QA_PIXEL.TIF")) except Exception as e: # logger.error('Cannot open QA band for {}: {}'.format(folder, e)) logger.error("Cannot open QA band for {}: {}".format(folder, e)) return # convertQA = np.vectorize(qabitval) QA_band_unpacked = qabitval_array_c2(QA_band).astype(np.short) if clear_threshold > 0: clear_ratio = np.sum( np.logical_or(QA_band_unpacked == QA_CLEAR, QA_band_unpacked == QA_WATER) ) / np.sum(QA_band_unpacked != QA_FILL) else: clear_ratio = 1 if clear_ratio > clear_threshold: if folder[3] == "5": sensor = "LT5" elif folder[3] == "7": sensor = "LE7" elif folder[3] == "8": sensor = "LC8" elif folder[3] == "9": sensor = "LC9" elif folder[3] == "4": sensor = "LT4" else: logger.error( "Sensor is not correctly formatted for the scene {}".format(folder) ) path = folder[10:13] row = folder[13:16] year = folder[17:21] doy = datetime(int(year), int(folder[21:23]), int(folder[23:25])).strftime("%j") collection = "C2" version = folder[len(folder) - 2 : len(folder)] file_name = sensor + path + row + year + doy + collection + version if low_date_bound is not None: if (dt.datetime(int(year), 1, 1) + dt.timedelta(int(doy) - 1)) < parse( low_date_bound ): return True if upp_date_bound is not None: if (dt.datetime(int(year), 1, 1) + dt.timedelta(int(doy) - 1)) > parse( upp_date_bound ): return True if sensor == "LT5" or sensor == "LE7" or sensor == "LT4": try: # b1 = rio_loaddata(join(join(tmp_path, folder), # "{}_SR_B1.TIF".format(folder))) rio_warp( join(tmp_path, folder, f"{folder}_SR_B1.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b1 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B2.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b2 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B3.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b3 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B4.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b4 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B5.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b5 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B7.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b6 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_ST_B6.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b7 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error("Cannot open Landsat bands for {}: {}".format(folder, e)) return elif sensor == "LC8" or "LC9": try: rio_warp( join(tmp_path, folder, f"{folder}_SR_B2.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b1 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B3.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b2 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B4.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b3 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B5.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b4 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B6.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b5 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_SR_B7.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b6 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) rio_warp( join(tmp_path, folder, f"{folder}_ST_B10.TIF"), join(tmp_path, folder, "_tmp_img.tif"), reference_path, ) b7 = rio_loaddata(join(tmp_path, folder, "_tmp_img.tif")) except Exception as e: # logger.error('Cannot open spectral bands for {}: {}'.format(folder, e)) logger.error(f"Cannot open Landsat bands for {folder}: {e}") return if ( (b1 is None) or (b2 is None) or (b3 is None) or (b4 is None) or (b5 is None) or (b6 is None) or (b7 is None) ): logger.error("Reading Landsat band fails for {}".format(folder)) return # source: https://www.usgs.gov/faqs/how-do-i-use-scale-factor-landsat-level-2-science-products?qt- # news_science_products=0#qt-news_science_products recommended by yongquan B1_t = (10000 * (b1 * 2.75e-05 - 0.2)).astype(np.int16) B2_t = (10000 * (b2 * 2.75e-05 - 0.2)).astype(np.int16) B3_t = (10000 * (b3 * 2.75e-05 - 0.2)).astype(np.int16) B4_t = (10000 * (b4 * 2.75e-05 - 0.2)).astype(np.int16) B5_t = (10000 * (b5 * 2.75e-05 - 0.2)).astype(np.int16) B7_t = (10000 * (b7 * 2.75e-05 - 0.2)).astype(np.int16) B6_t = (10 * (b6 * 0.00341802 + 149)).astype(np.int16) # padding zeros b1 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b1[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B1_t b2 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b2[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B2_t b3 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b3[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B3_t b4 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b4[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B4_t b5 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b5[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B5_t b6 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b6[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B6_t b7 = np.zeros([dataset_info.n_rows, dataset_info.n_cols]).astype(np.int16) b7[0 : B1_t.shape[0], 0 : B1_t.shape[1]] = B7_t if is_partition is True: # width of a block bytesize = 2 # short16 = 2 * byte # source: https://towardsdatascience.com/efficiently-splitting-an-image-into-tiles- # in-python-using-numpy-d1bf0dd7b6f7 B1_blocks = np.lib.stride_tricks.as_strided( b1, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B2_blocks = np.lib.stride_tricks.as_strided( b2, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B3_blocks = np.lib.stride_tricks.as_strided( b3, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B4_blocks = np.lib.stride_tricks.as_strided( b4, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B5_blocks = np.lib.stride_tricks.as_strided( b5, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B6_blocks = np.lib.stride_tricks.as_strided( b6, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) B7_blocks = np.lib.stride_tricks.as_strided( b7, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) QA_blocks = np.lib.stride_tricks.as_strided( QA_band_unpacked, shape=( dataset_info.n_block_y, dataset_info.n_block_x, dataset_info.block_height, dataset_info.block_width, ), strides=( dataset_info.n_cols * dataset_info.block_height * bytesize, dataset_info.block_width * bytesize, dataset_info.n_cols * bytesize, bytesize, ), ) for i in range(dataset_info.n_block_y): for j in range(dataset_info.n_block_x): # check if no valid pixels in the chip, then eliminate qa_unique = np.unique(QA_blocks[i][j]) # skip blocks are all cloud, shadow or filled values # in DHTC, we also don't need to save pixel that has qa value of 'QA_CLOUD', # 'QA_SHADOW', or FILLED value (255) if ( QA_CLEAR not in qa_unique and QA_WATER not in qa_unique and QA_SNOW not in qa_unique ): continue block_folder = "block_x{}_y{}".format(j + 1, i + 1) np.save( join(join(out_dir, block_folder), file_name), np.dstack( [ B1_blocks[i][j], B2_blocks[i][j], B3_blocks[i][j], B4_blocks[i][j], B5_blocks[i][j], B6_blocks[i][j], B7_blocks[i][j], QA_blocks[i][j], ] ).astype(np.int16), ) else: np.save( join(out_dir, file_name), np.dstack([b1, b2, b3, b4, b5, b6, b7, QA_band_unpacked]).astype( np.int16 ), ) # scene_list.append(folder_name) else: # logger.info('Not enough clear observations for {}'.format(folder[0:len(folder) - 3])) logger.warn( "Not enough clear observations for {}".format(folder[0 : len(folder) - 3]) ) # delete unzip folder shutil.rmtree(join(tmp_path, folder), ignore_errors=True) shutil.rmtree(join(tmp_path, folder.replace("SR", "BT")), ignore_errors=True)
[docs] def checkfinished_step1(tmp_path): """ :param tmp_path: :return: """ if not os.path.exists(tmp_path): return False return True
[docs] def checkfinished_step2(out_dir, n_cores): """ :param out_dir: :param n_cores: :return: """ for i in range(n_cores): if not os.path.exists(join(out_dir, "rank{}_finished.txt".format(i + 1))): return False return True
[docs] def checkfinished_step3_partition(out_dir): """ :param out_dir: :return: """ if not os.path.exists(join(out_dir, "starting_last_dates.txt")): return False else: return True
[docs] def checkfinished_step3_nopartition(out_dir): """ :param out_dir: :return: """ if not os.path.exists(join(out_dir, "scene_list.txt")): return False return True
[docs] def get_extent(extent_geojson, res, buf=0): """ read shapefile of a tile from an S3 bucket, and convert projection to be aligned with sample image. arg: 'extent_geojson': sharply geojson object res: planet resolution return: (float, float, float, float), (int, int)) tuple """ # txmin = min([row[0] for row in extent_geojson['coordinates'][0]]) - res / 2.0 # txmax = max([row[0] for row in extent_geojson['coordinates'][0]]) + res / 2.0 # tymin = min([row[1] for row in extent_geojson['coordinates'][0]]) - res / 2.0 # tymax = max([row[1] for row in extent_geojson['coordinates'][0]]) + res / 2.0 txmin = extent_geojson["bbox"][0] - res * buf txmax = extent_geojson["bbox"][2] + res * buf tymin = extent_geojson["bbox"][1] - res * buf tymax = extent_geojson["bbox"][3] + res * buf n_row = np.ceil((tymax - tymin) / res) n_col = np.ceil((txmax - txmin) / res) txmin_new = (txmin + txmax) / 2 - n_col / 2 * res txmax_new = (txmin + txmax) / 2 + n_col / 2 * res tymin_new = (tymin + tymax) / 2 - n_row / 2 * res tymax_new = (tymin + tymax) / 2 + n_row / 2 * res return (txmin_new, txmax_new, tymin_new, tymax_new), (n_row, n_col)
[docs] def get_feature(shp, id): for feature in shp: if feature["properties"]["id"] == id: return feature
[docs] def explode(coords): """Explode a GeoJSON geometry's coordinates object and yield coordinate tuples. As long as the input is conforming, the type of the geometry doesn't matter.""" for e in coords: if isinstance(e, (float, int)): yield coords break else: for f in explode(e): yield f
[docs] def bbox(f): x, y = zip(*list(explode(f["geometry"]["coordinates"]))) return min(x), min(y), max(x), max(y)
[docs] def checkfinished(signal_path) -> bool: """Check if the signal file exists""" return exists(signal_path)
[docs] def safe_stack_single_image_hls( source_dir: str, out_dir: str, data_info: DatasetInfo, logger: Logger, folder: str, low_date_bound: str, upp_date_bound: str, ) -> bool: """Process single HLS image stacking""" [_, _, _, imagetime, _, _] = folder.rsplit(".") img_date = dt.datetime(int(imagetime[0:4]), 1, 1) + dt.timedelta( int(imagetime[4:7]) - 1 ) if low_date_bound is not None: if img_date.date() < parse(low_date_bound).date(): logger.info(f"Skipping {folder} (before {low_date_bound})") return True if upp_date_bound is not None: if img_date.date() > parse(upp_date_bound).date(): logger.info(f"Skipping {folder} (after {upp_date_bound})") return True if single_image_stacking_hls( source_dir=source_dir, out_dir=out_dir, logger=logger, dataset_info=data_info, is_partition=True, clear_threshold=0.0, low_date_bound=low_date_bound, upp_date_bound=upp_date_bound, folder=folder, ): return True logger.warning("Stacking %s failed!", folder) return False
[docs] def process_folder( folder_list, out_path, logger, data_info, low_date_bound, upp_date_bound, ): """Process single folder""" [source_dir, folder] = os.path.split(folder_list) [_, _, _, imagetime, _, _] = folder.rsplit(".") img_date = dt.datetime(int(imagetime[0:4]), 1, 1) + dt.timedelta( int(imagetime[4:7]) - 1 ) if low_date_bound is not None and img_date.date() < parse(low_date_bound).date(): logger.info(f"Skipping {folder} (before {low_date_bound})") return if upp_date_bound is not None and img_date.date() > parse(upp_date_bound).date(): logger.info(f"Skipping {folder} (after {upp_date_bound})") return tile = os.path.basename(source_dir) out_dir = join(out_path, tile + "_stack") safe_stack_single_image_hls( source_dir, out_dir, data_info, logger, folder, low_date_bound, upp_date_bound ) logger.info("Stacking %s completed!", folder)
script_dir = os.path.dirname(os.path.abspath(__file__)) @click.command() @click.option("--source_path", type=str, default=None, help="HLS data directory") @click.option("--yaml_path", type=str, help="Configuration file path") @click.option("--rank", type=int, default=1, help="Current process rank") @click.option("--n_cores", type=int, default=1, help="Parallel cores number") @click.option("--out_path", type=str, default=None, help="Output directory") @click.option( "--low_date_bound", type=str, default=None, help="the lower bound of the date range of user interest. Example: 2019-01-01", ) @click.option( "--upp_date_bound", type=str, default=None, help="the upper bound of the date range of user interest. Example: 2024-12-31", ) def main( source_path, yaml_path, rank, n_cores, out_path, low_date_bound, upp_date_bound, ): """Main processing function""" st = time.time() # Load configuration with open(yaml_path, encoding="utf-8") as yaml_obj: config_general = yaml.safe_load(yaml_obj) data_info = class_from_dict(DatasetInfo, config_general["DATASETINFO"]) folder_list = [] tile_name = os.path.basename(source_path) if exists(source_path): folder_list = [ join(source_path, f) for f in os.listdir(source_path) if os.path.isdir(join(source_path, f)) and f.strip() ] # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", handlers=[logging.StreamHandler()], ) logger = logging.getLogger(__name__) logger.info(f"Processing tile: {tile_name}") logger.info(f"Number of HLS folders found: {len(folder_list)}") # Prepare output directories if rank == 1: tile_dir = join(out_path, tile_name + "_stack") Path(tile_dir).mkdir(exist_ok=True) for i in range(data_info.n_block_y): for j in range(data_info.n_block_x): block_dir = join(tile_dir, f"block_x{j+1}_y{i+1}") if not exists(block_dir): os.mkdir(block_dir) # Create completion marker file with open(join(out_path, f"tmp_creation_finished_{tile_name}.txt"), "w"): pass # Wait for directory preparation to complete while not checkfinished(join(out_path, f"tmp_creation_finished_{tile_name}.txt")): time.sleep(5) # Parallel processing with multiprocessing.Pool(n_cores) as pool: pool.map( functools.partial( process_folder, out_path=out_path, logger=logger, data_info=data_info, low_date_bound=low_date_bound, upp_date_bound=upp_date_bound, ), folder_list, ) # Cleanup and reporting if rank == 1: logger.info("Stacking procedure finished") os.remove(join(out_path, f"tmp_creation_finished_{tile_name}.txt")) logger.info("Total running time: %.2f hours", (time.time() - st) / 3600) if __name__ == "__main__": main()