<[email protected]>
<http://www.gnu.org/licenses/>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <vector>
#include "../tools/debug.h"
#include "../tools/contest.h"
#include "../tools/stringtools.h"
#include "../tools/jobqueue.h"
#undef DBGX
#define DBGX DBGX_OMP
#include "../sequential/inssort.h"
extern size_t g_smallsort;
namespace bingmann_parallel_radix_sort {
using namespace stringtools;
using namespace jobqueue;
static const bool debug_jobs = false;
static const bool use_work_sharing = true;
typedef unsigned char* string;
static const size_t g_inssort_threshold = 64;
size_t g_totalsize;             
size_t g_sequential_threshold;  
size_t g_threadnum;             
template <typename bigsort_key_type>
void Enqueue(JobQueue& jobqueue, const StringPtr& strings, size_t n, size_t depth);
void EnqueueSmallsortJob8(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth);
template <typename BktSizeType>
struct SmallsortJob8 : public Job
{
    StringPtr   strptr;
    size_t      n, depth;
    typedef BktSizeType bktsize_type;
    SmallsortJob8(JobQueue& jobqueue, const StringPtr& _strptr, size_t _n, size_t _depth)
        : strptr(_strptr), n(_n), depth(_depth)
    {
        jobqueue.enqueue(this);
    }
    struct RadixStep8_CI
    {
        string* str;
        size_t idx;
        bktsize_type bktsize[256];
        RadixStep8_CI(string* strings, size_t n, size_t depth, uint8_t* charcache)
        {
            
#if 1
            
            
            for (size_t i=0; i < n; ++i)
                charcache[i] = strings[i][depth];
            memset(bktsize, 0, sizeof(bktsize));
            for (size_t i=0; i < n; ++i)
                ++bktsize[ charcache[i] ];
#else
            memset(bktsize, 0, sizeof(bktsize));
            for (size_t i=0; i < n; ++i) {
                ++bktsize[ (charcache[i] = strings[i][depth]) ];
            }
#endif
            
            bktsize_type bkt[256];
            bkt[0] = bktsize[0];
            bktsize_type last_bkt_size = bktsize[0];
            for (unsigned int i=1; i < 256; ++i) {
                bkt[i] = bkt[i-1] + bktsize[i];
                if (bktsize[i]) last_bkt_size = bktsize[i];
            }
            
            for (size_t i=0, j; i < n - last_bkt_size; )
            {
                string perm = strings[i];
                uint8_t permch = charcache[i];
                while ( (j = --bkt[ permch ]) > i )
                {
                    std::swap(perm, strings[j]);
                    std::swap(permch, charcache[j]);
                }
                strings[i] = perm;
                i += bktsize[ permch ];
            }
            str = strings + bktsize[0];
            idx = 0; 
        }
    };
    virtual void run(JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Process SmallsortJob8 " << this << " of size " << n);
        string* strings = strptr.to_original(n);
        if (n < g_inssort_threshold)
            return inssort::inssort(strings,n,depth);
        uint8_t* charcache = new uint8_t[n];
        
        size_t pop_front = 0;
        std::vector<RadixStep8_CI> radixstack;
        radixstack.push_back( RadixStep8_CI(strings,n,depth,charcache) );
        while ( radixstack.size() > pop_front )
        {
            while ( radixstack.back().idx < 255 )
            {
                RadixStep8_CI& rs = radixstack.back();
                ++rs.idx; 
                if (rs.bktsize[rs.idx] == 0)
                    continue;
                else if (rs.bktsize[rs.idx] < g_inssort_threshold)
                {
                    inssort::inssort(rs.str, rs.bktsize[rs.idx], depth + radixstack.size());
                    rs.str += rs.bktsize[ rs.idx ];
                }
                else
                {
                    rs.str += rs.bktsize[rs.idx];
                    radixstack.push_back( RadixStep8_CI(rs.str - rs.bktsize[rs.idx],
                                                        rs.bktsize[rs.idx],
                                                        depth + radixstack.size(),
                                                        charcache) );
                    
                }
                if (use_work_sharing && jobqueue.has_idle())
                {
                    
                    DBG(debug_jobs, "Freeing top level of SmallsortJob8's radixsort stack");
                    RadixStep8_CI& rt = radixstack[pop_front];
                    while ( rt.idx < 255 )
                    {
                        ++rt.idx; 
                        if (rt.bktsize[rt.idx] == 0) continue;
                        EnqueueSmallsortJob8(jobqueue, StringPtr(rt.str), rt.bktsize[rt.idx], depth + pop_front);
                        rt.str += rt.bktsize[rt.idx];
                    }
                    
                    ++pop_front;
                }
            }
            radixstack.pop_back();
        }
        delete [] charcache;
    }
};
void EnqueueSmallsortJob8(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth)
{
    if (n < ((uint64_t)1 << 32))
        new SmallsortJob8<uint32_t>(jobqueue, strptr, n, depth);
    else
        new SmallsortJob8<uint64_t>(jobqueue, strptr, n, depth);
}
void EnqueueSmallsortJob16(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth);
template <typename BktSizeType>
struct SmallsortJob16 : public Job
{
    StringPtr   strptr;
    size_t      n, depth;
    typedef BktSizeType bktsize_type;
    SmallsortJob16(JobQueue& jobqueue, const StringPtr& _strptr, size_t _n, size_t _depth)
        : strptr(_strptr), n(_n), depth(_depth)
    {
        jobqueue.enqueue(this);
    }
    struct RadixStep16_CI
    {
        typedef uint16_t key_type;
        static const size_t numbkts = key_traits<key_type>::radix;
        string* str;
        size_t idx;
        bktsize_type bktsize[numbkts];
        RadixStep16_CI(string* strings, size_t n, size_t depth, key_type* charcache)
        {
            
            for (size_t i=0; i < n; ++i)
                charcache[i] = get_char<key_type>(strings[i], depth);
            
            memset(bktsize, 0, sizeof(bktsize));
            for (size_t i=0; i < n; ++i)
                ++bktsize[ charcache[i] ];
            
            bktsize_type bkt[numbkts];
            bkt[0] = bktsize[0];
            bktsize_type last_bkt_size = bktsize[0];
            for (unsigned int i=1; i < numbkts; ++i) {
                bkt[i] = bkt[i-1] + bktsize[i];
                if (bktsize[i]) last_bkt_size = bktsize[i];
            }
            
            for (size_t i=0, j; i < n - last_bkt_size; )
            {
                string perm = strings[i];
                key_type permch = charcache[i];
                while ( (j = --bkt[ permch ]) > i )
                {
                    std::swap(perm, strings[j]);
                    std::swap(permch, charcache[j]);
                }
                strings[i] = perm;
                i += bktsize[ permch ];
            }
            idx = 0x0100; 
            str = strings + bkt[idx+1];
        }
    };
    virtual void run(JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Process SmallsortJob16 " << this << " of size " << n);
        string* strings = strptr.to_original(n);
        if (n < g_inssort_threshold)
            return inssort::inssort(strings,n,depth);
        typedef uint16_t key_type;
        static const size_t numbkts = key_traits<key_type>::radix;
        key_type* charcache = new key_type[n];
        
        size_t pop_front = 0;
        std::vector<RadixStep16_CI> radixstack;
        radixstack.push_back( RadixStep16_CI(strings,n,depth,charcache) );
        while ( radixstack.size() > pop_front )
        {
            while ( radixstack.back().idx < numbkts-1 )
            {
                RadixStep16_CI& rs = radixstack.back();
                ++rs.idx; 
                if ( (rs.idx & 0xFF) == 0 ) { 
                    rs.str += rs.bktsize[rs.idx];
                    ++rs.idx;
                }
                if (rs.bktsize[rs.idx] == 0)
                    continue;
                else if (rs.bktsize[rs.idx] < g_inssort_threshold)
                {
                    inssort::inssort(rs.str, rs.bktsize[rs.idx], depth + 2*radixstack.size());
                    rs.str += rs.bktsize[rs.idx];
                }
                else
                {
                    rs.str += rs.bktsize[rs.idx];
                    radixstack.push_back( RadixStep16_CI(rs.str - rs.bktsize[rs.idx],
                                                         rs.bktsize[rs.idx],
                                                         depth + 2*radixstack.size(),
                                                         charcache) );
                    
                }
                if (use_work_sharing && jobqueue.has_idle())
                {
                    
                    DBG(debug_jobs, "Freeing top level of SmallsortJob16's radixsort stack");
                    RadixStep16_CI& rt = radixstack[pop_front];
                    while ( rt.idx < numbkts-1 )
                    {
                        ++rt.idx; 
                        if (rt.bktsize[rt.idx] == 0) continue;
                        EnqueueSmallsortJob16(jobqueue, StringPtr(rt.str), rt.bktsize[rt.idx], depth + 2*pop_front);
                        rt.str += rt.bktsize[rt.idx];
                    }
                    
                    ++pop_front;
                }
            }
            radixstack.pop_back();
        }
        delete [] charcache;
    }
};
void EnqueueSmallsortJob16(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth)
{
    if (n < ((uint64_t)1 << 32))
        new SmallsortJob16<uint32_t>(jobqueue, strptr, n, depth);
    else
        new SmallsortJob16<uint64_t>(jobqueue, strptr, n, depth);
}
template <typename KeyType>
struct RadixStepCE
{
    typedef KeyType key_type;
    static const size_t numbkts = key_traits<key_type>::radix; 
    StringPtr           strptr;
    size_t              n, depth;
    unsigned int        parts;
    size_t              psize;
    std::atomic<unsigned int> pwork;
    size_t*             bkt;
    key_type*           charcache;
    RadixStepCE(JobQueue& jobqueue, const StringPtr& _strptr, size_t _n, size_t _depth);
    void count(unsigned int p, JobQueue& jobqueue);
    void count_finished(JobQueue& jobqueue);
    void distribute(unsigned int p, JobQueue& jobqueue);
    void distribute_finished(JobQueue& jobqueue);
};
template <typename key_type>
struct CountJob : public Job
{
    RadixStepCE<key_type>*      step;
    unsigned int                p;
    CountJob(RadixStepCE<key_type>* _step, unsigned int _p)
        : step(_step), p(_p) { }
    virtual void run(JobQueue& jobqueue)
    {
        step->count(p, jobqueue);
    }
};
template <typename key_type>
struct DistributeJob : public Job
{
    RadixStepCE<key_type>*      step;
    unsigned int                p;
    DistributeJob(RadixStepCE<key_type>* _step, unsigned int _p)
        : step(_step), p(_p) { }
    virtual void run(JobQueue& jobqueue)
    {
        step->distribute(p, jobqueue);
    }
};
template <typename key_type>
RadixStepCE<key_type>::RadixStepCE(JobQueue& jobqueue, const StringPtr& _strptr, size_t _n, size_t _depth)
    : strptr(_strptr), n(_n), depth(_depth)
{
    parts = (n + g_sequential_threshold-1) / g_sequential_threshold;
    if (parts == 0) parts = 1;
    psize = (n + parts-1) / parts;
    DBG(debug_jobs, "Area split into " << parts << " parts of size " << psize);
    bkt = new size_t[numbkts * parts + 1];
    charcache = new key_type[n];
    
    pwork = parts;
    for (unsigned int p = 0; p < parts; ++p)
        jobqueue.enqueue( new CountJob<key_type>(this, p) );
}
template <typename key_type>
void RadixStepCE<key_type>::count(unsigned int p, JobQueue& jobqueue)
{
    DBG(debug_jobs, "Process CountJob " << p << " @ " << this);
    string* strB = strptr.active() + p * psize;
    string* strE = strptr.active() + std::min((p+1) * psize, n);
    if (strE < strB) strE = strB;
    key_type* mycache = charcache + p * psize;
    key_type* mycacheE = mycache + (strE - strB);
    
    
    
#if 0
    for (string* str = strB; str != strE; ++str, ++mycache)
        *mycache = get_char<key_type>(*str, depth);
    size_t* mybkt = bkt + p * numbkts;
    memset(mybkt, 0, numbkts * sizeof(size_t));
    for (mycache = charcache + p * psize; mycache != mycacheE; ++mycache)
        ++mybkt[ *mycache ];
#else
    for (string* str = strB; str != strE; ++str, ++mycache)
        *mycache = get_char<key_type>(*str, depth);
    size_t mybkt[numbkts] = { 0 };
    for (mycache = charcache + p * psize; mycache != mycacheE; ++mycache)
        ++mybkt[ *mycache ];
    memcpy(bkt + p * numbkts, mybkt, sizeof(mybkt));
#endif
    if (--pwork == 0)
        count_finished(jobqueue);
}
template <typename key_type>
void RadixStepCE<key_type>::count_finished(JobQueue& jobqueue)
{
    DBG(debug_jobs, "Finishing CountJob " << this << " with prefixsum");
    
    size_t sum = 0;
    for (unsigned int i = 0; i < numbkts; ++i)
    {
        for (unsigned int p = 0; p < parts; ++p)
        {
            bkt[p * numbkts + i] = (sum += bkt[p * numbkts + i]);
        }
    }
    assert(sum == n);
    
    pwork = parts;
    for (unsigned int p = 0; p < parts; ++p)
        jobqueue.enqueue( new DistributeJob<key_type>(this, p) );
}
template <typename key_type>
void RadixStepCE<key_type>::distribute(unsigned int p, JobQueue& jobqueue)
{
    DBG(debug_jobs, "Process DistributeJob " << p << " @ " << this);
    string* strB = strptr.active() + p * psize;
    string* strE = strptr.active() + std::min((p+1) * psize, n);
    if (strE < strB) strE = strB;
    string* sorted = strptr.shadow(); 
    key_type* mycache = charcache + p * psize;
    
    
    
#if 0
    size_t* mybkt = bkt + p * numbkts;
    for (string* str = strB; str != strE; ++str, ++mycache)
        sorted[ --mybkt[ *mycache ] ] = *str;
#else
    size_t mybkt[numbkts];
    memcpy(mybkt, bkt + p * numbkts, sizeof(mybkt));
    for (string* str = strB; str != strE; ++str, ++mycache)
        sorted[ --mybkt[ *mycache ] ] = *str;
    if (p == 0) 
        memcpy(bkt, mybkt, sizeof(mybkt));
#endif
    if (--pwork == 0)
        distribute_finished(jobqueue);
}
template <>
void RadixStepCE<uint8_t>::distribute_finished(JobQueue& jobqueue)
{
    DBG(debug_jobs, "Finishing DistributeJob " << this << " with enqueuing subjobs");
    delete [] charcache;
    
    assert(bkt[0] == 0);
    bkt[numbkts] = n;
    for (unsigned int i = 1; i < numbkts; ++i)
    {
        if (bkt[i] == bkt[i+1])
            continue;
        else if (bkt[i] + 1 == bkt[i+1]) 
            strptr.flip_ptr(bkt[i]).to_original(1);
        else
            Enqueue<key_type>(jobqueue, strptr.flip_ptr(bkt[i]), bkt[i+1] - bkt[i],
                              depth + key_traits<key_type>::add_depth);
    }
    
    strptr.flip_ptr(0).to_original(bkt[1] - bkt[0]);
    delete [] bkt;
    delete this;
}
template <>
void RadixStepCE<uint16_t>::distribute_finished(JobQueue& jobqueue)
{
    DBG(debug_jobs, "Finishing DistributeJob " << this << " with enqueuing subjobs");
    delete [] charcache;
    
    assert(bkt[0] == 0);
    bkt[numbkts] = n;
    for (unsigned int i = 0x0101; i < numbkts; ++i)
    {
        if ((i & 0xFF) == 0) ++i; 
        if (bkt[i] == bkt[i+1])
            continue;
        else if (bkt[i] + 1 == bkt[i+1]) 
            strptr.flip_ptr(bkt[i]).to_original(1);
        else
            Enqueue<key_type>(jobqueue, strptr.flip_ptr(bkt[i]), bkt[i+1] - bkt[i],
                              depth + key_traits<key_type>::add_depth);
    }
    
    if (!strptr.flipped()) 
    {
        strptr.flip_ptr(0).to_original(bkt[0x0100] - bkt[0]);
        for (unsigned int i = 0x0100; i < numbkts; i += 0x0100)
        {
            if (bkt[i] == bkt[i+1]) continue;
            strptr.flip_ptr(bkt[i]).to_original(bkt[i+1] - bkt[i]);
        }
    }
    delete [] bkt;
    delete this;
}
void EnqueueSmall(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth)
{
    EnqueueSmallsortJob8(jobqueue, strptr, n, depth);
}
template <typename bigsort_key_type>
void Enqueue(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth)
{
    if (n > g_sequential_threshold)
        new RadixStepCE<bigsort_key_type>(jobqueue, strptr, n, depth);
    else
        EnqueueSmallsortJob8(jobqueue, strptr, n, depth);
}
static void parallel_radix_sort_8bit(string* strings, size_t n)
{
    g_totalsize = n;
    g_threadnum = omp_get_max_threads();
    g_sequential_threshold = std::max(g_inssort_threshold, g_totalsize / g_threadnum);
    string* shadow = new string[n]; 
    JobQueue jobqueue;
    Enqueue<uint8_t>(jobqueue, StringPtr(strings, shadow), n, 0);
    jobqueue.loop();
    delete [] shadow;
}
CONTESTANT_REGISTER_PARALLEL(parallel_radix_sort_8bit,
                             "bingmann/parallel_radix_sort_8bit",
                             "Parallel MSD Radix sort with load balancing, 8-bit BigSorts")
static void parallel_radix_sort_16bit(string* strings, size_t n)
{
    g_totalsize = n;
    g_threadnum = omp_get_max_threads();
    g_sequential_threshold = std::max(g_inssort_threshold, g_totalsize / g_threadnum);
    string* shadow = new string[n]; 
    JobQueue jobqueue;
    Enqueue<uint16_t>(jobqueue, StringPtr(strings, shadow), n, 0);
    jobqueue.loop();
    delete [] shadow;
}
CONTESTANT_REGISTER_PARALLEL(parallel_radix_sort_16bit,
                             "bingmann/parallel_radix_sort_16bit",
                             "Parallel MSD Radix sort with load balancing, 16-bit BigSorts")
}