方法的伪代码见原文插图(此处未能显示)。

 KernelADASYN过采样方法 随笔

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

 

KernelADASYN 的 Python 代码实现:

class KernelADASYN(OverSampling):
    """
    Kernel-density-based variant of the ADASYN oversampling method.

    New minority samples are drawn from a weighted kernel density estimate of
    the minority class, where each minority point is weighted by how many
    majority points sit among its nearest neighbors (its "difficulty").

    Notes:
        * The method of sampling was not specified, Markov Chain Monte Carlo
          has been implemented.
        * Not prepared for improperly conditioned covariance matrix.
    """

    categories= [OverSampling.cat_density_estimation,
                 OverSampling.cat_extensive,
                 OverSampling.cat_borderline]

    def __init__(self, proportion= 1.0, k= 5, h= 1.0, n_jobs= 1):
        """
        Constructor of the sampling object

        Args:
            proportion (float): proportion of the difference of n_maj and n_min
                                    to sample e.g. 1.0 means that after sampling
                                    the number of minority samples will be equal
                                    to the number of majority samples
            k (int): number of neighbors in the nearest neighbors component
            h (float): kernel bandwidth
            n_jobs (int): number of parallel jobs
        """
        super().__init__()
        self.check_greater_or_equal(proportion, "proportion", 0)
        self.check_greater_or_equal(k, 'k', 1)
        self.check_greater(h, 'h', 0)
        self.check_n_jobs(n_jobs, 'n_jobs')

        self.proportion= proportion
        self.k= k
        self.h= h
        self.n_jobs= n_jobs

    @classmethod
    def parameter_combinations(cls):
        """
        Generates reasonable parameter combinations.

        Returns:
            list(dict): a list of meaningful parameter combinations
        """
        return cls.generate_parameter_combinations({'proportion': [0.1, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0],
                                                    'k': [5, 7, 9],
                                                    'h': [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 10.0]})

    def sample(self, X, y):
        """
        Does the sample generation according to the class parameters.

        Args:
            X (np.ndarray): training set
            y (np.array): target labels

        Returns:
            (np.ndarray, np.array): the extended training set and target labels
        """
        _logger.info(self.__class__.__name__ + ": " +"Running sampling via %s" % self.descriptor())

        self.class_label_statistics(X, y)

        num_to_sample= self.number_of_instances_to_sample(self.proportion,
                                                          self.class_stats[self.majority_label],
                                                          self.class_stats[self.minority_label])

        if num_to_sample == 0:
            _logger.warning(self.__class__.__name__ + ": " + "Sampling is not needed")
            return X.copy(), y.copy()

        X_min= X[y == self.minority_label]

        # fitting the nearest neighbors model
        # FIX: pass n_neighbors as a keyword argument -- the positional form
        # was deprecated and then removed in recent scikit-learn releases
        nn= NearestNeighbors(n_neighbors= min([len(X_min), self.k+1]), n_jobs= self.n_jobs)
        nn.fit(X)
        distances, indices= nn.kneighbors(X_min)

        # majority score: number of majority points among the neighbors of
        # each minority sample (index 0 is the query point itself, skip it)
        r= np.array([np.sum(y[indices[i][1:]] == self.majority_label)
                     for i in range(len(X_min))])

        if np.sum(r > 0) < 2:
            _logger.info(self.__class__.__name__ + ": " + "majority score is 0 for all or all but one minority samples")
            return X.copy(), y.copy()

        # normalize the scores into KDE component weights
        r= r/np.sum(r)

        # kernel density function
        def p_x(x):
            """
            Returns minority density value at x

            Args:
                x (np.array): feature vector

            Returns:
                float: density value
            """
            result= 1.0/(len(X_min)*self.h)
            result= result*(1.0/(np.sqrt(2*np.pi)*self.h)**len(X[0]))
            # r acts as per-component weight of the Gaussian mixture
            return result*np.inner(r, np.exp(-0.5*np.linalg.norm(x - X_min, axis= 1)**2/self.h))

        samples= []
        it= 0

        # parameters of the Monte Carlo sampling
        burn_in= 1000
        periods= 50

        # covariance is used to generate a random sample in the neighborhood
        # FIX: np.cov returns a 0-d array for single-feature data, which would
        # crash len() and cholesky() below -- promote it to a 1x1 matrix
        covariance= np.atleast_2d(np.cov(X_min[r > 0], rowvar= False))

        if len(covariance) > 1 and np.linalg.cond(covariance) > 10000:
            _logger.info(self.__class__.__name__ + ": " + "reducing dimensions due to inproperly conditioned covariance matrix")
            if len(X[0]) <= 2:
                _logger.info(self.__class__.__name__ + ": " + "matrix ill-conditioned")
                return X.copy(), y.copy()
            # retry in a lower-dimensional PCA space and map the result back
            n_components= int(np.rint(len(covariance)/2))
            pca= PCA(n_components= n_components)
            X_trans= pca.fit_transform(X)
            ka= KernelADASYN(proportion= self.proportion, k= self.k, h= self.h)
            X_samp, y_samp= ka.sample(X_trans, y)
            return pca.inverse_transform(X_samp), y_samp

        # starting Markov-Chain Monte Carlo for sampling
        x_old= X_min[np.random.choice(np.where(r > 0)[0])]
        p_old= p_x(x_old)

        # Cholesky decomposition: covariance == L L^T
        L= np.linalg.cholesky(covariance)

        while len(samples) < num_to_sample:
            # FIX: L @ z has covariance L L^T == covariance; the former z @ L
            # sampled with covariance L^T L, which is not the fitted matrix
            x_new= x_old + np.dot(L, np.random.normal(size= len(x_old)))
            p_new= p_x(x_new)

            # Metropolis acceptance step
            # FIX: guard against p_old underflowing to 0.0, which made the
            # ratio inf/NaN; treat it as certain acceptance instead
            alpha= 1.0 if p_old == 0.0 else p_new/p_old
            if np.random.random() < alpha:
                x_old= x_new
                p_old= p_new

            it= it + 1
            # keep every periods-th state of the chain once past burn-in
            if it % periods == 0 and it > burn_in:
                samples.append(x_old)

        return (np.vstack([X, np.vstack(samples)]),
                np.hstack([y, np.repeat(self.minority_label, len(samples))]))

    def get_params(self):
        """
        Returns:
            dict: the parameters of the current sampling object
        """
        return {'proportion': self.proportion,
                'k': self.k,
                'h': self.h,
                'n_jobs': self.n_jobs}

 

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄