Source code for pyxccd.imagetool.tile_processing

# Author: Su Ye
# This script is an example for running pyscold in UCONN job array environment
# Due to the independence features of job array, we use writing disk files for communication
# Two types of logging files are used: 1) print() to save block-based  processing info into individual slurm file;
# 2) tile_processing_report.log records config for each tile
# 2) is only called when rank == 1
# python tile_processing.py --stack_path=/data/results/204_22_stack --result_path=/data/results/204_22_ccdc --yaml_path=/home/colory666/pyxccd_imagetool/config.yaml --n_cores=30 "--source_dir=/data/landsat_c2/204_22"
import yaml
import os
from os.path import join
import pandas as pd
import datetime as dt
import numpy as np
import pyxccd
from datetime import datetime
from pytz import timezone
import click
import time
import pickle
from dateutil.parser import parse
from pyxccd import cold_detect, sccd_detect

from pyxccd.utils import (
    get_rowcol_intile,
    unindex_sccdpack,
    class_from_dict,
)
import functools
import multiprocessing
from pyxccd.common import DatasetInfo

TZ = timezone("Asia/Shanghai")


[docs] def tileprocessing_report( result_log_path, stack_path, version, algorithm, dataset_info, startpoint, cold_timepoint, tz, n_cores, probability_threshold, conse, ): """ output tile-based processing report Parameters ---------- result_log_path: string outputted log path stack_path: string stack path version: string algorithm: string dataset_info: dictionary structure startpoint: a time point, when the program starts tz: string, time zone n_cores: the core number used probability_threshold: change probability threshold conse: number of consecutive observations """ endpoint = datetime.now(TZ) file = open(result_log_path, "w") file.write("pyxccd V{} \n".format(version)) file.write("Author: Su Ye(remoteseningsuy@gmail.com)\n") file.write("Algorithm: {} \n".format(algorithm)) file.write("Starting_time: {}\n".format(startpoint.strftime("%Y-%m-%d %H:%M:%S"))) file.write("Change probability threshold: {}\n".format(probability_threshold)) file.write("Conse: {}\n".format(conse)) file.write("stack_path: {}\n".format(stack_path)) file.write("The number of requested cores: {}\n".format(n_cores)) file.write( "The program starts at {}\n".format(startpoint.strftime("%Y-%m-%d %H:%M:%S")) ) file.write( "The processing ends at {}\n".format( cold_timepoint.strftime("%Y-%m-%d %H:%M:%S") ) ) file.write( "The program ends at {}\n".format(endpoint.strftime("%Y-%m-%d %H:%M:%S")) ) file.write( "The program lasts for {:.2f}mins\n".format( (endpoint - startpoint) / dt.timedelta(minutes=1) ) ) file.close()
[docs] def is_finished_cold_blockfinished(result_path, nblocks): """ check if the algorithm finishes all blocks Parameters ---------- result_path: the path that save results nblocks: the block number Returns: boolean ------- True -> all block finished """ for n in range(nblocks): if not os.path.exists( os.path.join(result_path, "COLD_block{}_finished.txt".format(n + 1)) ): return False return True
[docs] def get_stack_date( dataset_info: DatasetInfo, block_x: int, block_y: int, stack_path: str, low_datebound: int = 0, high_datebound: int = 999999, nband: int = 8, ): """ :param dataset_info: :param block_x: block id at x axis :param block_y: block id at y axis :param stack_path: stack path :param low_datebound: ordinal data of low bounds of selection date range :param high_datebound: ordinal data of upper bounds of selection date range :return: img_tstack, img_dates_sorted img_tstack - 3-d array (dataset_info.block_width * dataset_info.block_height, nband, nimage) """ dataset_info.block_width = int( dataset_info.n_cols / dataset_info.n_block_x ) # width of a block dataset_info.block_height = int( dataset_info.n_rows / dataset_info.n_block_y ) # height of a block block_folder = join(stack_path, "block_x{}_y{}".format(block_x, block_y)) img_files = [ f for f in os.listdir(block_folder) if f.startswith("L") or f.startswith("S") ] if len(img_files) == 0: return None, None # sort image files by dates img_dates = [ pd.Timestamp.toordinal( dt.datetime(int(folder_name[9:13]), 1, 1) + dt.timedelta(int(folder_name[13:16]) - 1) ) for folder_name in img_files ] img_dates_selected = [ img_dates[i] for i, date in enumerate(img_dates) if img_dates[i] >= low_datebound and img_dates[i] <= high_datebound ] img_files_selected = [ img_files[i] for i, date in enumerate(img_dates) if img_dates[i] >= low_datebound and img_dates[i] <= high_datebound ] files_date_zip = sorted(zip(img_dates_selected, img_files_selected)) img_files_sorted = [x[1] for x in files_date_zip] img_dates_sorted = np.asarray([x[0] for x in files_date_zip]) img_tstack = np.dstack( [ np.load(join(block_folder, f)).reshape( dataset_info.block_width * dataset_info.block_height, nband ) for f in img_files_sorted ] ) return img_tstack, img_dates_sorted
[docs] def block_tile_processing( dataset_info, stack_path, result_path, method, probability_threshold, conse, b_c2, low_datebound, upper_datebound, block_id, ): if block_id > dataset_info.n_block_x * dataset_info.n_block_y: return # note that block_x and block_y start from 1 block_y = int((block_id - 1) / dataset_info.n_block_x) + 1 block_x = int((block_id - 1) % dataset_info.n_block_x) + 1 finished_file = ( "COLD_block{}_finished.txt".format(block_id) if method == "COLD" else "SCCD_block{}_finished.txt".format(block_id) ) if os.path.exists(join(result_path, finished_file)): print( "Per-pixel {} processing is finished for block_x{}_y{} ({})".format( method, block_x, block_y, datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") ) ) return img_tstack, img_dates_sorted = get_stack_date( dataset_info, block_x, block_y, stack_path, low_datebound, upper_datebound ) if img_tstack is None: # empty block print( "Empty block block_x{}_y{} ({})".format( block_x, block_y, datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") ) ) with open(join(result_path, finished_file), "w"): pass return # for sccd, as the output is heterogeneous, we continuously save the sccd pack for each pixel if method == "SCCDOFFLINE": result_file = join( result_path, "record_change_x{}_y{}_sccd.npy".format(block_x, block_y), ) # start looping every pixel in the block try: with open(result_file, "wb+") as f: for pos in range(dataset_info.block_width * dataset_info.block_height): original_row, original_col = get_rowcol_intile( pos, dataset_info.block_width, dataset_info.block_height, block_x, block_y, ) try: sccd_result = sccd_detect( img_dates_sorted, img_tstack[pos, 0, :].astype(np.int64), img_tstack[pos, 1, :].astype(np.int64), img_tstack[pos, 2, :].astype(np.int64), img_tstack[pos, 3, :].astype(np.int64), img_tstack[pos, 4, :].astype(np.int64), img_tstack[pos, 5, :].astype(np.int64), img_tstack[pos, 6, :].astype(np.int64), p_cg=probability_threshold, conse=conse, b_c2=b_c2, pos=dataset_info.n_cols * (original_row - 1) + original_col, ) except RuntimeError: print( "S-CCD fails at original_row {}, original_col {} ({})".format( original_row, original_col, datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S"), ) ) else: pickle.dump(unindex_sccdpack(sccd_result), f) with open(join(result_path, finished_file), "w"): pass print( "Per-pixel SCCD processing is finished for block_x{}_y{} ({})".format( block_x, block_y, datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") ) ) except Exception as e: print( "SCCD processing failed for block_x{}_y{}: {} ({})".format( block_x, block_y, str(e), datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S"), ) ) if os.path.exists(result_file): try: os.remove(result_file) except: pass else: result_collect = [] result_file = join( result_path, "record_change_x{}_y{}_cold.npy".format(block_x, block_y), ) try: for pos in range(dataset_info.block_width * dataset_info.block_height): original_row, original_col = get_rowcol_intile( pos, dataset_info.block_width, dataset_info.block_height, block_x, block_y, ) try: cold_result = cold_detect( img_dates_sorted, img_tstack[pos, 0, :].astype(np.int64), img_tstack[pos, 1, :].astype(np.int64), img_tstack[pos, 2, :].astype(np.int64), img_tstack[pos, 3, :].astype(np.int64), img_tstack[pos, 4, :].astype(np.int64), img_tstack[pos, 5, :].astype(np.int64), img_tstack[pos, 6, :].astype(np.int64), img_tstack[pos, 7, :].astype(np.int64), p_cg=probability_threshold, conse=conse, b_c2=b_c2, pos=dataset_info.n_cols * (original_row - 1) + original_col, ) except RuntimeError: print( "COLD fails at original_row {}, original_col {} ({})".format( original_row, original_col, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) ) except Exception as e: print( "COLD error at original_row {}, original_col {}: {} ({})".format( original_row, original_col, str(e), datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) ) else: result_collect.append(cold_result) # Save COLD results if len(result_collect) > 0: np.save(result_file, np.hstack(result_collect)) with open(join(result_path, finished_file), "w"): pass print( "Per-pixel COLD processing is finished for block_x{}_y{} ({})".format( block_x, block_y, datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") ) ) except Exception as e: print( "COLD processing failed for block_x{}_y{}: {} ({})".format( block_x, block_y, str(e), datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S"), ) ) if os.path.exists(result_file): try: os.remove(result_file) except: pass
@click.command() @click.option("--rank", type=int, default=1, help="the rank id") @click.option("--n_cores", type=int, default=1, help="the total cores assigned") @click.option("--stack_path", type=str, help="the path for stack data") @click.option("--result_path", type=str, help="the path for storing results") @click.option("--yaml_path", type=str, help="YAML path") @click.option( "--method", type=click.Choice(["COLD", "SCCDOFFLINE"]), default="COLD", help="COLD or SCCD-OFFLINE", ) @click.option( "--low_datebound", type=str, default=None, help="low date bound of image selection for processing. Example - 2019-01-01", ) @click.option( "--upper_datebound", type=str, default=None, help="upper date bound of image selection for processing. Example - 2024-12-31", ) def main( rank, n_cores, stack_path, result_path, yaml_path, method, low_datebound, upper_datebound, ): start_time = datetime.now(TZ) if low_datebound is None: low_datebound = 0 else: low_datebound = parse(low_datebound).toordinal() if upper_datebound is None: upper_datebound = 999999 else: upper_datebound = parse(upper_datebound).toordinal() # Reading config with open(yaml_path, "r") as yaml_obj: config_general = yaml.safe_load(yaml_obj) dataset_info = class_from_dict(DatasetInfo, config_general["DATASETINFO"]) conse = int(config_general["ALGORITHMINFO"]["conse"]) probability_threshold = config_general["ALGORITHMINFO"]["probability_threshold"] if (dataset_info.n_cols % dataset_info.block_width != 0) or ( dataset_info.n_rows % dataset_info.block_height != 0 ): print( "n_cols, n_rows must be divisible respectively by dataset_info.block_width, dataset_info.block_height! Please check your config yaml" ) exit() # logging and folder preparation if rank == 1: if not os.path.exists(result_path): os.makedirs(result_path) print( "The per-pixel time series processing begins: {}".format( start_time.strftime("%Y-%m-%d %H:%M:%S") ) ) if not os.path.exists(stack_path): print( "Failed to locate stack folders. The program ends: {}".format( datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") ) ) return ######################################################################### # per-pixel processing procedure # ######################################################################### block_list = list(range(1, dataset_info.n_block_x * dataset_info.n_block_y + 1, 1)) pool = multiprocessing.Pool(n_cores) partial_func = functools.partial( block_tile_processing, dataset_info, stack_path, result_path, method, probability_threshold, conse, True, low_datebound, upper_datebound, ) pool.map(partial_func, block_list) pool.close() pool.join() # wait for all cores to be finished while not is_finished_cold_blockfinished( result_path, dataset_info.n_block_x * dataset_info.n_block_y ): time.sleep(30) if rank == 1: cold_timepoint = datetime.now(TZ) tileprocessing_report( join(result_path, "tile_processing_report.log"), stack_path, pyxccd.__version__, method, dataset_info, start_time, cold_timepoint, TZ, n_cores, probability_threshold, conse, ) print( "The whole procedure finished: {}".format( datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") ) ) if __name__ == "__main__": main()