KernelADASYN过采样方法
方法的伪代码:
KADASYN Python代码实现:
class KernelADASYN(OverSampling):
    """Kernel-density-based ADASYN oversampler.

    Notes:
        * The method of sampling was not specified in the paper;
          Markov Chain Monte Carlo has been implemented.
        * Ill-conditioned covariance matrices are handled by a
          PCA-based dimensionality-reduction fallback, but the
          handling is best-effort only.
    """

    categories = [OverSampling.cat_density_estimation,
                  OverSampling.cat_extensive,
                  OverSampling.cat_borderline]

    def __init__(self, proportion=1.0, k=5, h=1.0, n_jobs=1):
        """
        Constructor of the sampling object.

        Args:
            proportion (float): proportion of the difference of n_maj and
                n_min to sample; e.g. 1.0 means that after sampling the
                number of minority samples will be equal to the number of
                majority samples
            k (int): number of neighbors in the nearest neighbors component
            h (float): kernel bandwidth
            n_jobs (int): number of parallel jobs
        """
        super().__init__()
        self.check_greater_or_equal(proportion, "proportion", 0)
        self.check_greater_or_equal(k, 'k', 1)
        self.check_greater(h, 'h', 0)
        self.check_n_jobs(n_jobs, 'n_jobs')

        self.proportion = proportion
        self.k = k
        self.h = h
        self.n_jobs = n_jobs

    @classmethod
    def parameter_combinations(cls):
        """
        Generates reasonable parameter combinations.

        Returns:
            list(dict): a list of meaningful parameter combinations
        """
        return cls.generate_parameter_combinations(
            {'proportion': [0.1, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0],
             'k': [5, 7, 9],
             'h': [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 10.0]})

    def sample(self, X, y):
        """
        Does the sample generation according to the class parameters.

        Args:
            X (np.ndarray): training set
            y (np.array): target labels

        Returns:
            (np.ndarray, np.array): the extended training set and target
                labels
        """
        _logger.info(self.__class__.__name__ + ": " +
                     "Running sampling via %s" % self.descriptor())

        self.class_label_statistics(X, y)

        num_to_sample = self.number_of_instances_to_sample(
            self.proportion,
            self.class_stats[self.majority_label],
            self.class_stats[self.minority_label])
        if num_to_sample == 0:
            _logger.warning(self.__class__.__name__ +
                            ": " + "Sampling is not needed")
            return X.copy(), y.copy()

        X_min = X[y == self.minority_label]

        # fitting the nearest neighbors model; n_neighbors is passed by
        # keyword because it is keyword-only in recent scikit-learn releases
        nn = NearestNeighbors(n_neighbors=min(len(X_min), self.k + 1),
                              n_jobs=self.n_jobs)
        nn.fit(X)
        distances, indices = nn.kneighbors(X_min)

        # majority score: number of majority neighbors of each minority
        # point (index 0 is the query point itself, hence the [1:])
        r = np.array([np.sum(y[indices[i][1:]] == self.majority_label)
                      for i in range(len(X_min))])

        if np.sum(r > 0) < 2:
            _logger.info(self.__class__.__name__ + ": " +
                         "majority score is 0 for all or all but one minority samples")
            return X.copy(), y.copy()

        # normalize the scores into kernel weights
        r = r / np.sum(r)

        def p_x(x):
            """
            Returns the r-weighted minority kernel density estimate at x.

            Args:
                x (np.array): feature vector

            Returns:
                float: density value
            """
            result = 1.0 / (len(X_min) * self.h)
            result = result * (1.0 / (np.sqrt(2 * np.pi) * self.h)**len(X[0]))
            # NOTE(review): the exponent divides by h rather than h**2,
            # matching the original implementation — confirm intent
            exponents = -0.5 * np.linalg.norm(x - X_min, axis=1)**2 / self.h
            return result * np.inner(r, np.exp(exponents))

        samples = []
        it = 0

        # parameters of the Monte Carlo sampling
        burn_in = 1000
        periods = 50

        # covariance is used to generate a random sample in the neighborhood;
        # np.atleast_2d guards the single-feature case, where np.cov returns
        # a 0-d array and len()/cholesky below would fail
        covariance = np.atleast_2d(np.cov(X_min[r > 0], rowvar=False))

        if len(covariance) > 1 and np.linalg.cond(covariance) > 10000:
            _logger.info(self.__class__.__name__ + ": " +
                         "reducing dimensions due to inproperly conditioned covariance matrix")

            if len(X[0]) <= 2:
                _logger.info(self.__class__.__name__ +
                             ": " + "matrix ill-conditioned")
                return X.copy(), y.copy()

            # fall back to sampling in a PCA-reduced space, then map the
            # generated samples back into the original feature space
            n_components = int(np.rint(len(covariance) / 2))
            pca = PCA(n_components=n_components)
            X_trans = pca.fit_transform(X)

            # n_jobs is forwarded so the fallback uses the same parallelism
            ka = KernelADASYN(proportion=self.proportion, k=self.k,
                              h=self.h, n_jobs=self.n_jobs)
            X_samp, y_samp = ka.sample(X_trans, y)
            return pca.inverse_transform(X_samp), y_samp

        # starting Markov-Chain Monte Carlo for sampling
        x_old = X_min[np.random.choice(np.where(r > 0)[0])]
        p_old = p_x(x_old)

        # Cholesky factor of the covariance drives the proposal distribution
        L = np.linalg.cholesky(covariance)

        while len(samples) < num_to_sample:
            x_new = x_old + np.dot(np.random.normal(size=len(x_old)), L)
            p_new = p_x(x_new)

            # Metropolis-Hastings acceptance; if the current density has
            # underflowed to 0, accept unconditionally instead of dividing
            # by zero (which yields NaN and stalls the chain)
            if p_old == 0 or np.random.random() < p_new / p_old:
                x_old = x_new
                p_old = p_new

            it = it + 1
            # record a state every `periods` steps once burn-in has passed
            if it % periods == 0 and it > burn_in:
                samples.append(x_old)

        return (np.vstack([X, np.vstack(samples)]),
                np.hstack([y, np.repeat(self.minority_label, len(samples))]))

    def get_params(self):
        """
        Returns:
            dict: the parameters of the current sampling object
        """
        return {'proportion': self.proportion,
                'k': self.k,
                'h': self.h,
                'n_jobs': self.n_jobs}

更多精彩