C++ Multithreading: K-means

C++ Multithreading: K-means

这也是我第一次写并行计算程序,感觉就是好高深 ( ゚∀。)

关于 K-means 算法

K-means 几乎是最直接的聚落分析算法了,它还可以用于解决矢量量化问题。

在本篇文章中,我们研究一个具体的问题:
在平面上给定 (large n) 个点 (large P_1, dots, P_n),按距离远近将这些点分类到 (large k) 个不同的聚落中去。

我们首先随机地在平面上取一些点 (mu_1, dots, mu_k) 作为聚落的中心点,接下来:

  1. 考察对于每一个 (large i),计算 (large k = arg min_j ||p_i-mu_j||)
    于是我们将点 (large p_i) 归类到以 (large mu_k) 为中心点的聚落中去。

  2. 之后考虑更新每个聚落的中心点。具体地,对于每个聚类 (j) 中的点 (large P_{a_1}, dots, P_{a_{Lj}}) 取新的中心点

    [large mu_j = frac{sum_{k=1}^{L_j} P_{a_k} }{L_j} ]

    其中 (L_j) 是该聚类中点的个数。

不断迭代该过程,直到新取的聚类中心点与原点重合为止。

先看看代码:

double CalcuDistance(const Point& a, const Point& b) {
    double deltaX = a.x - b.x, deltaY = a.y - b.y;
    return deltaX * deltaX + deltaY * deltaY;
}

std::mutex mtx;
void runThrough(std::vector<int>& clusterSize,
    std::vector<int>& assignment,
    const std::vector<Point>& m_points,
    const std::vector<Point>& m_centers,
    int lef, int rig, int m_numCenters) {
    std::vector<int> minVec;
    for (int i = lef; i != rig; ++i) {
        int minInd = 0;
        double minDis = 0x7f7f7f7f, curDis;
        for (short j = 0; j != m_numCenters; ++j) {
            curDis = CalcuDistance(m_points[i], m_centers[j]);
            if (curDis < minDis) {
                minInd = j;
                minDis = curDis;
            }
        }
        minVec.push_back(minInd);
    }
    std::lock_guard<std::mutex> lock(mtx);
    for (int i = lef; i != rig; ++i) {
        --clusterSize[assignment[i]];
        assignment[i] = minVec[i - lef];
        ++clusterSize[minVec[i - lef]];
    }
}

std::vector<index_t> Kmeans::Run(int maxIterations) {
    std::vector<index_t> assignment(m_numPoints, 0);
    int currIteration = 0;
    std::cout << "Running kmeans with num points = " << m_numPoints 
              << ", num centers = " << m_numCenters 
              << ", max iterations = " << maxIterations << "...
";

    const double eps = 1e-8;
    double newX[m_numCenters], newY[m_numCenters];
    int interval = m_numPoints / 4;
    std::vector<int> clusterSize(m_numCenters, 0);
    clusterSize[0] = m_numPoints;

    while (currIteration < maxIterations) {
        std::vector<std::thread> thdVec;
        for (int ind = 0; ind < m_numPoints; ind += interval) {
            thdVec.push_back(std::thread(runThrough, 
                std::ref(clusterSize), std::ref(assignment), 
                std::ref(m_points), std::ref(m_centers),
                ind, std::min(m_numPoints, ind + interval), m_numCenters));
        }

        for (auto& thd: thdVec) thd.join();
        for (short i = 0; i != m_numCenters; ++i) {
            newX[i] = newY[i] = 0;
        }
        for (int i = 0; i != m_numPoints; ++i) {
            newX[assignment[i]] += m_points[i].x;
            newY[assignment[i]] += m_points[i].y;
        }
        bool flag = false;
        for (short i = 0; i != m_numCenters; ++i) {
            newX[i] /= clusterSize[i];
            newY[i] /= clusterSize[i];
            Point newCentroid(newX[i], newY[i]);
            if (CalcuDistance(newCentroid, m_centers[i]) > eps) {
                m_centers[i] = newCentroid;
                flag = true;
            }
        }
        if (!flag) break;
        ++currIteration;
    }

    std::cout << "Finished in " << currIteration << " iterations." << std::endl;
    return assignment;
}

关于并发计算

感谢钦霖哥哥提供的海量帮助。

实现 K-means 时,每个样本点 (large P_i) 在运算时不会互相影响,我们可以考虑在几个进程中分别进行运算。

比如可以创建四个进程 ({ m thd}_{1,2,3,4}),用 ( m thd_i) 计算点 ({p_k|k equiv i mod 4})
或可以将点列分成四个连续的区间段,分别交给对应的进程计算。
由于缓存的问题,实践上第二种方法的效率更高。

钦霖哥哥说:

任何不加锁的共享数据,除了只读,基本上都是不安全的
不是很懂并发的话,还是无脑给共享数据加锁吧,

并发想要提高性能,关键就是减少数据共享。
少量共享的小数据、按阶段写回的数据,完全可以复制给每个线程读取计算,然后再一起写回。
数据经过复制私有后,cache 命中率就大大提高了。

(快去上 SICP 吧!结课半年了助教还能热心解答各种问题)

在实践上,稍微改变加锁的顺序就能带来很大的优化:

// code A
void runThrough(/*...*/) {
    for (int i = lef; i != rig; ++i) {
  		/*...*/
        for (short j = 0; j != m_numCenters; ++j) {
           /*...*/
        }
        std::lock_guard<std::mutex> lock(mtx);
        --clusterSize[assignment[i]];
        assignment[i] = minInd;
        ++clusterSize[minInd];
    }
}

// code B
void runThrough(/*...*/) {
    std::vector<int> minVec;
    for (int i = lef; i != rig; ++i) {
		/*...*/
        for (short j = 0; j != m_numCenters; ++j) {
        	/*...*/
        }
        minVec.push_back(minInd);
    }
    std::lock_guard<std::mutex> lock(mtx);
    for (int i = lef; i != rig; ++i) {
        --clusterSize[assignment[i]];
        assignment[i] = minVec[i - lef];
        ++clusterSize[minVec[i - lef]];
    }
}

运行可以发现写法 B 比写法 A 要快一倍多。

钦霖哥哥还说:

连着我上一条消息,比如 让 (t) 个线程把它们各自经手的点先聚成 (k) 个中心 然后让主线程把 (kt) 个中心聚合成 (k) 个中心,这样全程都不用锁了。

由于这个不是一梭子就能实现的、且现在已经够快了,我就没有写(
感兴趣的读者可以尝试。

原文地址:https://www.cnblogs.com/Shimarin/p/14864343.html