Source code for pyxccd.pyclassifier

import os
import datetime as dt
from os.path import join, exists
import joblib
import time
import logging
from logging import Logger
import sys
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# from osgeo import gdal_array
from typing import Optional
from pyxccd.app import defaults
from pyxccd.utils import (
    get_block_y,
    get_block_x,
    get_col_index,
    get_row_index,
    assemble_array,
    rio_loaddata,
    extract_features,
)
from pyxccd.common import DatasetInfo


[docs] def generate_sample_num(label: np.ndarray, sample_parameters: dict) -> np.ndarray: """generate sample number for each land cover category using the method from the paper 'Optimizing selection of training and auxiliary data for operational land cover classification for the LCMAP initiative' Parameters ---------- label : np.ndarray a label map sample_parameters : dict a dictionary must include "total_landcover_category", "total_samples", "max_category_samples", and "min_category_samples" Returns ------- np.ndarray 1-d array that stores the sample number for each label """ unique_category, unique_counts_category = np.unique( label[label <= sample_parameters["total_landcover_category"]], return_counts=True, ) counts_category = np.array([0] * sample_parameters["total_landcover_category"]) for x in range(len(unique_counts_category)): counts_category[int(unique_category[x] - 1)] = unique_counts_category[x] percate_samples = np.array( [ round(x / sum(counts_category) * sample_parameters["total_samples"]) for x in counts_category ] ) percate_samples[percate_samples > sample_parameters["max_category_samples"]] = ( sample_parameters["max_category_samples"] ) percate_samples[percate_samples < sample_parameters["min_category_samples"]] = ( sample_parameters["min_category_samples"] ) # needs to check not exceed the total category pixels percate_samples = np.minimum(percate_samples, counts_category) return percate_samples
[docs] def get_features(path: str) -> np.ndarray: """get block-based features Parameters ---------- path : str Path for feature output Returns ------- np.ndarray 2-d array for block-based features, (block_width*block_height, total feature) """ return np.load(path)
[docs] class PyClassifier: def __init__( self, dataset_info: DatasetInfo, feature_outputs: list = ["a0", "a1", "b1"], logger: Optional[Logger] = None, band_num: int = 7, ): """_summary_ Parameters ---------- dataset_info : :py:type:`~pyxccd.common.DatasetInfo` time-series basic dataset info feature_outputs : list, optional Indicate which features to be outputted. They must be within [a0, c1, a1, b1,a2, b2, a3, b3, rmse], by default ["a0", "a1", "b1"] logger : Optional[Logger], optional log handler, by default None band_num : int, optional the band number to be processed, by default 7 """ self.dataset_info = dataset_info # self.dataset_info.block_width = int(self.dataset_info.n_cols / self.dataset_info.n_block_x) # self.dataset_info.block_height = int(self.dataset_info.n_rows / self.dataset_info.n_block_y) # self.dataset_info.nblocks = self.dataset_info.n_block_x * self.dataset_info.n_block_y for feature in feature_outputs: assert feature in ["a0", "c1", "a1", "b1", "a2", "b2", "a3", "b3", "rmse"] self.n_features = band_num * len(feature_outputs) self.feature_outputs = feature_outputs if logger is None: logging.basicConfig( level=logging.DEBUG, format="%(asctime)s |%(levelname)s| %(funcName)-15s| %(message)s", stream=sys.stdout, ) self.logger = logging.getLogger(__name__) else: self.logger = logger self.band_num = band_num
[docs] def predict_features( self, block_id: int, cold_block: np.ndarray, year_list_to_predict: list ) -> np.ndarray: """ Parameters ---------- block_id: int Block id, started from 1 cold_block: np.ndarray Block-based :py:type:`~pyxccd.common.cold_rec_cg` produced by COLD algorithms year_list_to_predict: list A list of the years to extract features Note that the reason for not parsing cold_block to get year bounds is that the year range of blocks may vary from each other, so the year bounds are required to be defined from the tile level, not block level such as from 'starting_end_date.txt' Returns ------- np.ndarray 3d array, (len(year_list_to_predict), block_width*block_height, n_features) """ block_features = np.full( ( len(year_list_to_predict), self.dataset_info.block_width * self.dataset_info.block_height, self.n_features, ), defaults["COMMON"]["NAN_VAL"], dtype=np.float32, ) ordinal_day_list = [ dt.date(year, 7, 1).toordinal() for year in year_list_to_predict ] if len(cold_block) == 0: self.logger.warning( "the rec_cg file for block_id={} has no records".format(block_id) ) return block_features cold_block_split = np.split( cold_block, np.argwhere(np.diff(cold_block["pos"]) != 0)[:, 0] + 1 ) for element in cold_block_split: # the relative column number in the block i_col = get_col_index( element[0]["pos"], self.dataset_info.n_cols, get_block_x(block_id, self.dataset_info.n_block_x), self.dataset_info.block_width, ) i_row = get_row_index( element[0]["pos"], self.dataset_info.n_cols, get_block_y(block_id, self.dataset_info.n_block_x), self.dataset_info.block_height, ) for band in range(self.band_num): feature_row = extract_features( element, band, ordinal_day_list, defaults["COMMON"]["NAN_VAL"], feature_outputs=self.feature_outputs, ) for index in range(int(self.n_features / self.band_num)): block_features[ :, i_row * self.dataset_info.block_width + i_col, int(band * self.n_features / self.band_num) + index, ] = feature_row[index] return block_features
[docs] def train_rfmodel(self, full_feature_array: np.ndarray, label: np.ndarray): """training a random forest model based on feature layers and a label map Parameters ---------- full_feature_array : np.ndarray 3-d array for full feature layers, (nrows, ncols, feature_number) label : np.ndarray 1-d array for a label map Returns ------- class A sklearn random forest model """ assert label.shape == (self.dataset_info.n_rows, self.dataset_info.n_cols) samplecount = generate_sample_num(label, defaults["CLASSIFIER"]) index_list = [] label_list = [] for i in range(defaults["CLASSIFIER"]["total_landcover_category"]): index = np.argwhere(label == i + 1) np.random.seed(42) # set random seed to reproduce the same result index_sample = index[ np.random.choice(len(index), int(samplecount[i]), replace=False) ] index_list.append(index_sample) label_list.append(np.array([i + 1] * len(index_sample))) index_list = np.vstack(index_list) label_list = np.hstack(label_list) feature_extraction = np.array( [full_feature_array[tuple(x)] for x in index_list] ).astype(np.float32) rf_model = RandomForestClassifier(random_state=42, max_depth=20) rf_model.fit(feature_extraction, label_list) return rf_model
[docs] def classify_block(self, rf_model, tmp_feature: np.ndarray) -> np.ndarray: """classify feature block for a single year Parameters ---------- rf_model : dict sklearn random forest model tmp_feature : np.ndarray 2-d array for features, (block_width*block_height, n_features) Returns ------- np.ndarray 2d array, a classification map, (block_height, block_width) """ cmap = rf_model.predict(tmp_feature).reshape( self.dataset_info.block_height, self.dataset_info.block_width ) mask = np.all(tmp_feature == defaults["COMMON"]["NAN_VAL"], axis=1).reshape( self.dataset_info.block_height, self.dataset_info.block_width ) cmap[mask] = 255 return cmap
# def _assemble_covermap(self, blocklist_yearlyclass, year): # full_yearlyclass_array = assemble_array(blocklist_yearlyclass, self.dataset_info.n_block_x) # if (full_yearlyclass_array.shape[1] != self.dataset_info.n_cols) or (full_yearlyclass_array.shape[0] != # self.dataset_info.n_rows): # logger.error('The assemble category map is incomplete for {}'.format(year)) # return full_yearlyclass_array[:, :, 0] # we only return the first channel
[docs] class PyClassifierHPC(PyClassifier): """ this class adds IO functions based on the HPC environment for the base class """ def __init__( self, dataset_info: DatasetInfo, record_path: str, band_num: int = 7, year_list_to_predict=list(range(1982, 2022)), tmp_path: Optional[str] = None, output_path: Optional[str] = None, feature_outputs: list = ["a0", "a1", "b1"], seedmap_path: Optional[str] = None, rf_path: Optional[str] = None, logger: Optional[Logger] = None, ): """_summary_ Parameters ---------- dataset_info : DatasetInfo Time-series dataset information data class record_path : str Path that are source folder for the COLD results band_num : int, optional The band number, by default 7 year_list_to_predict : _type_, optional A list of years for classification, by default list(range(1982, 2022)) tmp_path : Optional[str], optional Path for saving temporal files, by default None. if None, will set /record_path/feature_maps output_path : Optional[str], optional Path to save classification outputS, by default None feature_outputs : list, optional A list of outputted feature name, it must be within [a0, c1, a1, b1,a2, b2, a3, b3, rmse]. by default ["a0", "a1", "b1"] seedmap_path : Optional[str], optional Path for the seed map that provides labels to produce rf model, by default None rf_path : Optional[str], optional Path for existing random forest forest, by default None logger : Optional[Logger], optional Logger handler, by default None Raises ------ e Raise the error when the parameter inputs for initializing classifier are not correct """ try: self._check_inputs_thematic( dataset_info, record_path, tmp_path, seedmap_path, rf_path ) except (ValueError, FileExistsError) as e: raise e self.dataset_info = dataset_info self.record_path = record_path for feature in feature_outputs: assert feature in ["a0", "c1", "a1", "b1", "a2", "b2", "a3", "b3", "rmse"] self.feature_outputs = feature_outputs if tmp_path is None: self.tmp_path = join(record_path, "feature_maps") # default path else: self.tmp_path = tmp_path if output_path is None: self.output_path = join(record_path, "feature_maps") # default path else: self.output_path = self.tmp_path self.n_features = band_num * len(feature_outputs) self.year_list_to_predict = year_list_to_predict self.seedmap_path = seedmap_path if rf_path is None: self.rf_path = join(self.output_path, "rf.model") # default path else: self.rf_path = rf_path if logger is None: logging.basicConfig( level=logging.DEBUG, format="%(asctime)s |%(levelname)s| %(funcName)-15s| %(message)s", stream=sys.stdout, ) self.logger = logging.getLogger(__name__) else: self.logger = logger self.band_num = band_num @staticmethod def _check_inputs_thematic( dataset_info, record_path, tmp_path, seedmap_path, rf_path ): """check the inputs for initializing a classifier Parameters ---------- dataset_info : DatasetInfo Basic information of time-series dataset record_path : str Path that saved COLD results tmp_path : str Path for saving temporal files, by default None. if None, will set /record_path/feature_maps seedmap_path : str Path for the seed map that provides labels to produce rf model, by default None rf_path : str Path for existing random forest forest, by default None """ if (isinstance(dataset_info.n_rows, int) is False) or (dataset_info.n_rows < 0): raise ValueError("n_rows must be positive integer") if (isinstance(dataset_info.n_cols, int) is False) or (dataset_info.n_cols < 0): raise ValueError("n_cols must be positive integer") if (isinstance(dataset_info.n_block_x, int) is False) or ( dataset_info.n_block_x < 0 ): raise ValueError("n_block_x must be positive integer") if (isinstance(dataset_info.n_block_y, int) is False) or ( dataset_info.n_block_y < 0 ): raise ValueError("n_block_y must be positive integer") if (isinstance(dataset_info.n_block_y, int) is False) or ( dataset_info.n_block_y < 0 ): raise ValueError("n_block_y must be positive integer") if os.path.isdir(record_path) is False: raise FileExistsError("No such directory: {}".format(record_path)) if seedmap_path is not None: if os.path.isfile(seedmap_path) is False: raise FileExistsError("No such file: {}".format(seedmap_path)) if rf_path is not None: if os.path.isfile(rf_path) is False: raise FileExistsError("No such file: {}".format(rf_path)) def _save_features(self, block_id, block_features): """ Parameters ---------- block_id: integer block_features Returns ------- """ for id, year in enumerate(self.year_list_to_predict): np.save( os.path.join(self.tmp_path, "tmp_feature_year{}_block{}.npy").format( year, block_id ), block_features[id, :, :], )
[docs] def is_finished_step1_predict_features(self): for iblock in range(self.dataset_info.nblocks): if not exists( join( self.tmp_path, "tmp_step1_predict_{}_finished.txt".format(iblock + 1), ) ): return False return True
@staticmethod def _save_rf_model(rf_model, rf_path): joblib.dump(rf_model, rf_path, compress=3) def _is_finished_step2_train_rfmodel(self): return exists(self.rf_path) def _get_rf_model(self): return joblib.load(self.rf_path) def _save_yearlyclassification_maps(self, block_id, year, cmap): outfile = join( self.tmp_path, "tmp_yearlyclassification{}_block{}.npy".format(year, block_id), ) np.save(outfile, cmap) def _is_finished_step3_classification(self): """ :return: True or false """ for iblock in range(self.dataset_info.nblocks): if not exists( join( self.tmp_path, "tmp_step3_classification_{}_finished.txt".format(iblock + 1), ) ): return False return True def _save_covermaps(self, full_yearlyclass_array, year): np.save( join(self.output_path, "yearlyclassification_{}.npy".format(year)), full_yearlyclass_array, ) def _clean(self): tmp_yearlyclass_filenames = [ file for file in os.listdir(self.tmp_path) if file.startswith("tmp_") ] for file in tmp_yearlyclass_filenames: os.remove(join(self.tmp_path, file)) def _get_fullclassification_forcertainyear(self, year): tmp_yearlyclass_filenames = [ file for file in os.listdir(self.tmp_path) if file.startswith("tmp_yearlyclassification{}".format(year)) ] # sort to guarantee order follows low to high rows tmp_yearlyclass_filenames.sort( key=lambda t: int(t[t.find("block") + 5 : t.find(".npy")]) ) return [ np.load(join(self.tmp_path, file)).reshape( self.dataset_info.block_height, self.dataset_info.block_width, 1 ) for file in tmp_yearlyclass_filenames ]
[docs] def get_fullfeature_forcertainyear(self, year: int) -> list: """get Parameters ---------- year : int Year to get a feature Returns ------- list A list of all blocks as 3-d array (block_height, block_width, n_features) """ tmp_feature_filenames = [ file for file in os.listdir(self.tmp_path) if file.startswith("tmp_feature_year{}".format(year)) ] if len(tmp_feature_filenames) < self.dataset_info.nblocks: self.logger.warning( "tmp features are incomplete! should have {}; but actually have {} feature images".format( self.dataset_info.nblocks, len(tmp_feature_filenames) ) ) tmp_feature_filenames.sort( key=lambda t: int(t[t.find("block") + 5 : t.find(".npy")]) ) # sorted by row number return [ np.load(join(self.tmp_path, file)).reshape( self.dataset_info.block_height, self.dataset_info.block_width, self.n_features, ) for file in tmp_feature_filenames ]
# full_feature_array = assemble_array(, self.dataset_info.n_block_x) # if (full_feature_array.shape[1] != self.dataset_info.n_cols) or (full_feature_array.shape[0] != # self.dataset_info.n_rows): # logger.error('The feature image is incomplete for {}'.format(year)) # return full_feature_array
[docs] def hpc_preparation(self): if exists(self.tmp_path) is False: try: os.mkdir(self.tmp_path) except IOError as e: raise e if exists(self.output_path) is False: try: os.mkdir(self.output_path) except IOError as e: raise e
[docs] def step1_feature_generation(self, block_id: int): """generate feature based on COLD results Parameters ---------- block_id : int the id of block """ if os.path.exists( join(self.record_path, "record_change_x{}_y{}_cold.npy").format( get_block_x(block_id, self.dataset_info.n_block_x), get_block_y(block_id, self.dataset_info.n_block_x), ) ): cold_block = np.load( join(self.record_path, "record_change_x{}_y{}_cold.npy").format( get_block_x(block_id, self.dataset_info.n_block_x), get_block_y(block_id, self.dataset_info.n_block_x), ) ) block_features = self.predict_features( block_id, cold_block, self.year_list_to_predict ) else: block_features = np.full( ( len(self.year_list_to_predict), self.dataset_info.block_width * self.dataset_info.block_height, self.n_features, ), defaults["COMMON"]["NAN_VAL"], dtype=np.float32, ) self._save_features(block_id, block_features) with open( join(self.tmp_path, "tmp_step1_predict_{}_finished.txt".format(block_id)), "w", ): pass
[docs] def step2_train_rf( self, ref_year: Optional[int] = None, rf_path: Optional[str] = None ): """training a random forest model, and save it into rf_path Parameters ---------- ref_year : int, optional The year to provide features which will be further connected to seed map, by default None rf_path : str, optional Path to save random forest model , by default None. If none, will saved to rf_path Raises ------ ValueError couldn't locate seedmap that provide label maps """ while not self.is_finished_step1_predict_features(): time.sleep(5) if ref_year is None: ref_year = defaults["CLASSIFIER"]["training_year"] # if ref_year not in self.year_list_to_predict: # raise Exception("Ref_year {} is not in year_list_to_predict {}. " # "PLease included it and re-run step1_feature_generation".format(ref_year, # self.year_list_to_predict)) full_feature_array = assemble_array( self.get_fullfeature_forcertainyear(ref_year), self.dataset_info.n_block_x ) if self.seedmap_path is None: raise ValueError("seedmap_path is not assigned") label_array = rio_loaddata(os.fspath(self.seedmap_path)) rf_model = self.train_rfmodel(full_feature_array, label_array) if rf_path is None: self._save_rf_model(rf_model, self.rf_path) else: self._save_rf_model(rf_model, rf_path)
[docs] def step3_classification(self, block_id: int): """classify a block Parameters ---------- block_id : int_ the block id Raises ------ IOError _description_ """ while not self._is_finished_step2_train_rfmodel(): time.sleep(5) time.sleep( 5 ) # wait for 5 more seconds to guarantee all data of rf.model was down to the disk try: rf_model = self._get_rf_model() except IOError as e: raise IOError( "Please double check your rf model file directory or generate random forest model first" ) from e for year in self.year_list_to_predict: tmp_feature_block = get_features( join( self.tmp_path, "tmp_feature_year{}_block{}.npy".format(year, block_id), ) ) cmap = self.classify_block(rf_model, tmp_feature_block) self._save_yearlyclassification_maps(block_id, year, cmap) with open( join( self.tmp_path, "tmp_step3_classification_{}_finished.txt".format(block_id), ), "w", ): pass
[docs] def step3_classification_sccd(self, block_id: int): """classifying based on sccd output Parameters ---------- block_id : int block id Raises ------ IOError couldn't locate rf model file """ while not self._is_finished_step2_train_rfmodel(): time.sleep(5) try: rf_model = self._get_rf_model() except IOError as e: raise IOError( "Please double check your rf model file directory or generate random forest model first" ) from e # for year in self.year_list_to_predict: # tmp_feature_block = get_features(join(self.tmp_path, 'tmp_feature_year{}_block{}.npy'.format(year, # block_id))) # cmap = self.classify_block(rf_model, tmp_feature_block) # self._save_yearlyclassification_maps(block_id, year, cmap) tmp_feature_block = get_features( join(self.tmp_path, "tmp_feature_now_block{}.npy".format(block_id)) ) if exists( join( self.tmp_path, "tmp_step3_classification_{}_finished.txt".format(block_id), ) ): return cmap = self.classify_block(rf_model, tmp_feature_block) self._save_yearlyclassification_maps(block_id, "now", cmap) with open( join( self.tmp_path, "tmp_step3_classification_{}_finished.txt".format(block_id), ), "w", ): pass
[docs] def step4_assemble(self, clean=True): """assesmble all block-based classification results into one map Parameters ---------- clean : bool, optional _description_, by default True """ while not self._is_finished_step3_classification(): time.sleep(5) for year in self.year_list_to_predict: full_yearlyclass_array = assemble_array( self._get_fullclassification_forcertainyear(year), self.dataset_info.n_block_x, )[:, :, 0] self._save_covermaps(full_yearlyclass_array, year) if clean: self._clean() # _clean all temp files
[docs] def step4_assemble_sccd(self, clean=True): """assessmble all block-based classification results based upon sccd into one map Parameters ---------- clean : bool, optional _description_, by default True """ while not self._is_finished_step3_classification(): time.sleep(5) full_yearlyclass_array = assemble_array( self._get_fullclassification_forcertainyear("now"), self.dataset_info.n_block_x, )[:, :, 0] self._save_covermaps(full_yearlyclass_array.astype(np.uint8), "now") # for year in self.year_list_to_predict: # full_yearlyclass_array = assemble_array(self._get_fullclassification_forcertainyear(year), # self.dataset_info.n_block_x)[:, :, 0] # self._save_covermaps(full_yearlyclass_array, year) if clean: self._clean() # _clean all temp files
[docs] def is_finished_step4_assemble(self) -> bool: """check if step is finished Returns ------- bool """ for year in self.year_list_to_predict: if not os.path.exists( join(self.output_path, "yearlyclassification_{}.npy".format(year)) ): return False else: return True