“快排”笔记

  关于排序,我想没有多少程序员会感觉陌生,信手拈来的可能就有不少:冒泡、选择或者归并等等;可在实际开发中,排序又让许多程序员感到很不熟悉,因为在大多数情况下,自己实现一个排序算法都不是一个好主意,相反的,改而使用语言框架内建提供的排序功能才是明智之举,毕竟她又方便又高效……

  久而久之,“排序”便与许多程序员成了“最熟悉的陌生人”,成了我们生活中司空见惯的“工具”:想要实现数组排序?简单,一个 sort(array) 搞定,便捷又高效!什么?它内部是怎么完成排序的?这个……不是特别清楚了……不过……管他呢……嘿嘿,不知你是否属于这种情况,反正我是 :)

   这些天自己正在看些书籍,上面便提到了不少排序的算法,本以为自己对这些算法早已深谙于心,没想实际实现之时便窘态毕现,我想大抵可能便如上所述,“娇惯纵容”多了,以前只要简单的调调 sort,而今真刀实枪起来便不胜招架了,也罢,有了些许教训,也算进一步认识到“知其然不知其所以然”的道理,在此简单笔记一番,引以为戒吧 ~

  而“快排”(快速排序)便是这次笔记的主题,话说在各类排序算法中,“快排”应该算是“明星”算法了,因其时间、空间复杂度俱佳,而被广泛运用于实际程序开发中(也许上面那个 sort 便是 :)),网上已有非常多优秀的教程说明(譬如这里这里),在此我也无需过多费舌,主要的内容还是实现源码以及心得体会,仅此而已 :)

  

  快速排序的基本思想便是分而治之,大体上分为三个主要步骤:

  1. 在数据集中选取一个pivot元素

  2. 根据选定的pivot,将数据集划分为左右两部分,左部皆小于pivot,右部皆大于pivot

  3. 循环12两步于上述所划分的两部分数据之上,直到部分只剩下一个数据元素为止

  根据上述的算法步骤,一个典型的快排程序,大抵便是这个样子:

  

/*!
    \param array number array pointer
    \param left left number range([left, right])
    \param right right number range([left, right])
    \return pivot index
*/
int GetPivot(int* array, int left, int right)
{
    // simple pivot get
    return left;
}

/*!
    \param array number array pointer
    \param left left number range([left, right])
    \param right right number range([left, right])
    \return pivot index after partition
*/
int Partition(int* array, int left, int right)
{
    int pivot = GetPivot(array, left, right);
    swap(array[left], array[pivot]);

    // now left have the pivot value
    int val = array[left];
    int lp = left;
    int rp = right + 1;

    // preprocessing for avoiding some boder problems
    do { ++lp; } while (array[lp] <= val && lp < rp);
    do { --rp; } while (array[rp] > val);

    // do loop partition
    while (lp < rp)
    {
        swap(array[lp], array[rp]);
        do { ++lp; } while (array[lp] <= val);
        do { --rp; } while (array[rp] > val);
    }

    // swap back pivot value
    swap(array[left], array[rp]);

    return rp;
}

/*!
    \param array number array pointer
    \param left left number range([left, right])
    \param right right number range([left, right])
*/
void QuickSort(int* array, int left, int right)
{
    if (left < right)
    {
        int pivot = Partition(array, left, right);
        QuickSort(array, left, pivot - 1);
        QuickSort(array, pivot + 1, right);
    }
}

  当然,针对上述的三个步骤还有其他的实现方法,譬如这样:


/*!
    \param array number array pointer
    \param left left number range([left, right])
    \param right right number range([left, right])
    \return pivot index
*/
int GetPivot(int* array, int left, int right)
{
    // random chioce pivot
    int count = right - left + 1;
    return left + rand() % count;
}

/*!
    \param array number array pointer
    \param left left number range([left, right])
    \param right right number range([left, right])
    \return pivot index after partition
*/
int Partition(int* array, int left, int right)
{
    int pivot = GetPivot(array, left, right);
    swap(array[left], array[pivot]);

    // now left have the pivot value
    int val = array[left];
    int searchIndex = left + 1;
    int sortedIndex = searchIndex;

    while (searchIndex <= right)
    {
        if (array[searchIndex] <= val)
        {
            swap(array[sortedIndex], array[searchIndex]);
            ++sortedIndex;
        }
        ++searchIndex;
    }

    // swap back pivot value
    swap(array[left], array[sortedIndex - 1]);

    return sortedIndex - 1;
}

/*!
    \param array number array pointer
    \param left left number range([left, right])
    \param right right number range([left, right])
*/
void QuickSort(int* array, int left, int right)
{
    int pivot = Partition(array, left, right);
    if (pivot - 1 > left)
    {
        QuickSort(array, left, pivot - 1);
    }
    if (pivot + 1 < right)
    {
        QuickSort(array, pivot + 1, right);
    }
}

虽说上面的两种实现细节上有所差异,但是基本都属于递归形式,并且递归形式也是快排算法(或者说对于很多二分(甚至多分)算法)实现的一般方法,有趣的是,上面提到的书籍中也说到了另一种实现快排算法的“循环”方式,颇有趣味:


//! sort range structure
struct SortRange
{
    int l;
    int r;
    SortRange(int l_, int r_):l(l_),r(r_) {};
    SortRange(const SortRange& si):l(si.l),r(si.r) {};
    SortRange& operator = (const SortRange& si)
    {
        l = si.l;
        r = si.r;
        return *this;
    }
};

/*!
    \param array number array pointer
    \param l left number range([left, right])
    \param r right number range([left, right])
*/
void QuickSortIter(int* array, int l, int r)
{
    std::queue<SortRange> q;
    q.push(SortRange(l, r));

    while (!q.empty())
    {
        // get front sort index and pop it
        SortRange si = q.front();
        q.pop();

        int left = si.l;
        int right = si.r;

        if (left < right)
        {
            // do partition
            int pivot = Partition(array, left, right);

            // push back sub sort range
            if (pivot - 1 > left)
            {
                q.push(SortRange(left, pivot - 1));
            }
            if (pivot + 1 < right)
            {
                q.push(SortRange(pivot + 1, right));
            }
        }
    }
}

  代码的思路其实非常简单,大体上便是模拟原先递归实现的调用过程,上面的源码中使用 std::queue 模拟了这个过程,当然,使用 std::stack(或者其他类似结构)同样也可以,两者只在子问题的解决顺序上存在差异,并不影响快速排序的正确性。

  接着,书中又顺势提到了快排的各类并行实现方法,其中最直接的一个想法可能就是延承上面的递归算法,为每一次的Partition操作都生成一个线程,由于各个线程之间所操作的数据基本独立,数据竞争问题并不存在(暂不考虑“伪共享”之类的问题),但是由于后期数据粒度过细(基本上每一个数据项都产生了一个线程),往往导致线程调度的开销过大,而对于一些线程数量受限的平台,以上的方法甚至都无法实现,所以总的来看,简单延伸递归来实现并行的想法并不十分“靠谱”……

  但是如果我们扩展思路,并不通过数据分解,而是通过任务分解来看待快排问题的话,那么快排的并行实现就会变的相对明晰,而这个任务分解,其实就是上面快排“循环”实现的一个延伸:


struct SortParam
{
    int* a;
    int l;
    int r;
    SortParam(int* a_, int l_, int r_):a(a_),l(l_),r(r_) {};
    SortParam(const SortParam& sp):a(sp.a),l(sp.l),r(sp.r) {};
    SortParam& operator = (const SortParam& sp)
    {
        a = sp.a;
        l = sp.l;
        r = sp.r;
        return *this;
    }
};

queue<SortParam> g_queue;
LONG g_count = 0;
bool g_done = false;
HANDLE g_sem = NULL;
HANDLE g_signal = NULL;

//! quick sort thread function, param is array number
unsigned __stdcall QuickSort(LPVOID pArg)
{
    int N = *((int*)pArg);

    while (true)
    {
        WaitForSingleObject(g_sem, INFINITE);
        if (g_done) break;
        SortParam sp = g_queue.front();
        g_queue.pop();

        int* a = sp.a;
        int l = sp.l;
        int r = sp.r;
        if (l < r)
        {
            int p = Partition(a, l, r);
            // NOTE: since every partiton will cause pivot set, so we increment sort count here
            InterlockedIncrement(&g_count); 
            g_queue.push(SortParam(a, l, p - 1));
            g_queue.push(SortParam(a, p + 1, r));
            ReleaseSemaphore(g_sem, 2, NULL);
        }
        else if (l == r) // single element, already sorted
        {
            LONG t = InterlockedIncrement(&g_count);
            if (t == N) SetEvent(g_signal); // all sorted
        }
    }

    return 0;
}

int main()
{
    int array[] = { 5, 4, 3, 2, 1, 7, 8, 9, 13, 0, 0, 2 };
    int arrayNum = sizeof(array) / sizeof(arrayNum);

    g_queue.push(SortParam(array, 0, arrayNum - 1));

    g_sem = CreateSemaphore(NULL, 1, arrayNum, NULL);
    g_signal = CreateEvent(NULL, true, false, NULL);

    const int NUM_THREADS = 4;
    for (int i = 0; i < NUM_THREADS; ++i)
    {
        _beginthreadex(NULL, 0, QuickSort, &arrayNum, 0, NULL);
    }

    WaitForSingleObject(g_signal, INFINITE);

    g_done = true;
    ReleaseSemaphore(g_sem, NUM_THREADS, NULL);

    // NOTE: now we just sleep for a while to wait for threads quit
    // TODO: join all
  Sleep(100);
  
    return 0;
}

  基本思路便是通过信号量来获取可能存在的待排序的数据分段(即任务),并通过一个原子操作控制的计数器(g_count)来检测是否排序完毕,并退出相应排序线程,实际开发时甚至可以引入线程池来优化实现,当然,其中涉及的细节还有不少,有兴趣的朋友可以找来原书来看 :)

  网上同样也有很多优秀的代码范例,虽说其中所列的大部分语言我本人都未曾听说过(稍囧),但是对于那些自己稍有了解的语言,范例所给的源码还是非常不错的,其中的一个Python实现令人印象深刻:

def qsort(L):

return (qsort([y for y in L[1:] if y <  L[0]]) + L[:1] + qsort([y for y in L[1:] if y >= L[0]])) if len(L) > 1 else L


  OKThat's It :)




原文地址:https://www.cnblogs.com/dyllove98/p/3137576.html