<[email protected]>
<http://www.gnu.org/licenses/>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <iostream>
#include <vector>
#include <algorithm>
#include "../tools/debug.h"
#include "../tools/lcgrandom.h"
#include "../tools/contest.h"
#include "../tools/stringtools.h"
#include "../tools/jobqueue.h"
#include "../tools/logfloor.h"
#undef DBGX
#define DBGX DBGX_OMP
#include "../sequential/inssort.h"
extern size_t g_smallsort;
namespace bingmann_parallel_sample_sortBTC {
using namespace stringtools;
using namespace jobqueue;
static const bool debug_jobs = false;
static const bool debug_splitter = false;
static const bool debug_bucketsize = false;
static const bool debug_recursion = false;
static const bool debug_splitter_tree = false;
static const bool use_work_sharing = true;
static const size_t MAXPROCS = 64+1; 
static const size_t l2cache = 256*1024;
static const size_t g_smallsort_threshold = 64*1024;
static const size_t g_inssort_threshold = 64;
size_t g_totalsize;             
size_t g_sequential_threshold;  
size_t g_threadnum;             
size_t g_para_ss_steps, g_seq_ss_steps, g_bs_steps;  
typedef uint64_t key_type;
template <size_t numsplitters>
struct TreeBuilder
{
    key_type*       m_splitter;
    key_type*       m_tree;
    unsigned char*  m_lcp_iter;
    key_type*       m_samples;
    TreeBuilder(key_type* splitter, key_type* splitter_tree, unsigned char* splitter_lcp, key_type* samples, size_t samplesize)
        : m_splitter( splitter ),
          m_tree( splitter_tree ),
          m_lcp_iter( splitter_lcp ),
          m_samples( samples )
    {
        key_type sentinel = 0;
        recurse(samples, samples + samplesize, 1, sentinel);
        assert(m_splitter == splitter + numsplitters);
        assert(m_lcp_iter == splitter_lcp + numsplitters);
        splitter_lcp[0] &= 0x80; 
        splitter_lcp[numsplitters] = 0; 
    }
    ptrdiff_t snum(key_type* s) const
    {
        return (ptrdiff_t)(s - m_samples);
    }
    key_type recurse(key_type* lo, key_type* hi, unsigned int treeidx,
                     key_type& rec_prevkey)
    {
        DBG(debug_splitter, "rec_buildtree(" << snum(lo) << "," << snum(hi)
            << ", treeidx=" << treeidx << ")");
        
        key_type* mid = lo + (ptrdiff_t)(hi - lo) / 2;
        DBG(debug_splitter, "tree[" << treeidx << "] = samples[" << snum(mid) << "] = "
            << toHex(*mid));
        key_type mykey = m_tree[treeidx] = *mid;
#if 1
        key_type* midlo = mid;
        while (lo < midlo && *(midlo-1) == mykey) midlo--;
        key_type* midhi = mid;
        while (midhi+1 < hi && *midhi == mykey) midhi++;
        if (midhi - midlo > 1)
            DBG(0, "key range = [" << snum(midlo) << "," << snum(midhi) << ")");
#else
        key_type *midlo = mid, *midhi = mid+1;
#endif
        if (2 * treeidx < numsplitters)
        {
            key_type prevkey = recurse(lo, midlo, 2 * treeidx + 0, rec_prevkey);
            key_type xorSplit = prevkey ^ mykey;
            DBG(debug_splitter, "    lcp: " << toHex(prevkey) << " XOR " << toHex(mykey) << " = "
                << toHex(xorSplit) << " - " << count_high_zero_bits(xorSplit) << " bits = "
                << count_high_zero_bits(xorSplit) / 8 << " chars lcp");
            *m_splitter++ = mykey;
            *m_lcp_iter++ = (count_high_zero_bits(xorSplit) / 8)
                | ((mykey & 0xFF) ? 0 : 0x80); 
            return recurse(midhi, hi, 2 * treeidx + 1, mykey);
        }
        else
        {
            key_type xorSplit = rec_prevkey ^ mykey;
            DBG(debug_splitter, "    lcp: " << toHex(rec_prevkey) << " XOR " << toHex(mykey) << " = "
                << toHex(xorSplit) << " - " << count_high_zero_bits(xorSplit) << " bits = "
                << count_high_zero_bits(xorSplit) / 8 << " chars lcp");
            *m_splitter++ = mykey;
            *m_lcp_iter++ = (count_high_zero_bits(xorSplit) / 8)
                | ((mykey & 0xFF) ? 0 : 0x80); 
            return mykey;
        }
    }
};
struct ClassifySimple
{
    
    static inline unsigned int
    find_bkt_tree(const key_type& key, const key_type* splitter, const key_type* splitter_tree, size_t numsplitters)
    {
        
        unsigned int i = 1;
        while ( i <= numsplitters )
        {
#if 0
            
            i = 2 * i + (key <= splitter_tree[i] ? 0 : 1);
#else
            
            if (key <= splitter_tree[i])
                i = 2*i + 0;
            else 
                i = 2*i + 1;
#endif
        }
        i -= numsplitters+1;
        size_t b = i * 2;                                   
        if (i < numsplitters && splitter[i] == key) b += 1; 
        return b;
    }
    
    static inline void
    classify(string* strB, string* strE, uint16_t* bktout,
             const key_type* splitter, const key_type* splitter_tree, size_t numsplitters,
             size_t , size_t depth)
    {
        for (string* str = strB; str != strE; )
        {
            key_type key = get_char<key_type>(*str++, depth);
            unsigned int b = find_bkt_tree(key, splitter, splitter_tree, numsplitters);
            *bktout++ = b;
        }
    }
};
struct ClassifyUnrollTree
{
    
    __attribute__((optimize("unroll-all-loops"))) static inline unsigned int
    find_bkt_tree(const key_type& key, const key_type* splitter, const key_type* splitter_tree, const size_t treebits, const size_t numsplitters)
    {
        
        unsigned int i = 1;
        for (size_t l = 0; l < treebits; ++l)
        {
#if 0
            
            i = 2 * i + (key <= splitter_tree[i] ? 0 : 1);
#else
            
            if (key <= splitter_tree[i])
                i = 2*i + 0;
            else 
                i = 2*i + 1;
#endif
        }
        i -= numsplitters+1;
        size_t b = i * 2;                                   
        if (i < numsplitters && splitter[i] == key) b += 1; 
        return b;
    }
    
    static inline void
    classify(string* strB, string* strE, uint16_t* bktout,
             const key_type* splitter, const key_type* splitter_tree, const size_t numsplitters,
             const size_t treebits, size_t depth)
    {
        for (string* str = strB; str != strE; )
        {
            key_type key = get_char<key_type>(*str++, depth);
            unsigned int b = find_bkt_tree(key, splitter, splitter_tree, treebits, numsplitters);
            *bktout++ = b;
        }
    }
};
struct ClassifyUnrollBoth
{
    
    template <int U>
    __attribute__((optimize("unroll-all-loops"))) static inline void
    find_bkt_tree_unroll(const key_type key[U], const key_type* splitter, const key_type* splitter_tree,
                         const size_t treebits, const size_t numsplitters, uint16_t obkt[U])
    {
        
        unsigned int i[U];
        std::fill(i+0, i+U, 1);
        for (size_t l = 0; l < treebits; ++l)
        {
            for (int u = 0; u < U; ++u)
            {
#if 0
                
                i[u] = 2 * i[u] + (key[u] <= splitter_tree[i[u]] ? 0 : 1);
#else
                
                if (key[u] <= splitter_tree[i[u]])
                    i[u] = 2*i[u] + 0;
                else 
                    i[u] = 2*i[u] + 1;
#endif
            }
        }
        for (int u = 0; u < U; ++u)
            i[u] -= numsplitters+1;
        for (int u = 0; u < U; ++u)
            obkt[u] = i[u] * 2; 
        for (int u = 0; u < U; ++u)
        {
            if (i[u] < numsplitters && splitter[i[u]] == key[u]) obkt[u] += 1; 
        }
    }
    
    static inline void
    classify(string* strB, string* strE, uint16_t* bktout,
             const key_type* splitter, const key_type* splitter_tree, size_t numsplitters,
             const size_t treebits, size_t depth)
    {
        for (string* str = strB; str != strE; )
        {
            static const int rollout = 4;
            if (str + rollout < strE)
            {
                key_type key[rollout];
                key[0] = get_char<key_type>(str[0], depth);
                key[1] = get_char<key_type>(str[1], depth);
                key[2] = get_char<key_type>(str[2], depth);
                key[3] = get_char<key_type>(str[3], depth);
                find_bkt_tree_unroll<rollout>(key, splitter, splitter_tree, treebits, numsplitters, bktout);
                str += rollout;
                bktout += rollout;
            }
            else
            {
                
                key_type key = get_char<key_type>(*str++, depth);
                unsigned int b = ClassifySimple::find_bkt_tree(key, splitter, splitter_tree, numsplitters);
                *bktout++ = b;
            }
        }
    }
};
template <typename Classify>
void EnqueueSmallsortJobBTC(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth);
template <typename Classify, typename BktSizeType>
struct SmallsortJobBTC : public Job
{
    StringPtr   strptr;
    size_t      n, depth;
    typedef BktSizeType bktsize_type;
    SmallsortJobBTC(JobQueue& jobqueue, const StringPtr& _strptr, size_t _n, size_t _depth)
        : strptr(_strptr), n(_n), depth(_depth)
    {
        jobqueue.enqueue(this);
    }
    struct SeqSampleSortStep
    {
#if 0
        static const size_t numsplitters = 2*16;
#else
        
        
        
        static const size_t numsplitters2 = (l2cache - sizeof(size_t)) / (2 * sizeof(size_t));
        static const size_t treebits = logfloor_<numsplitters2>::value;
        static const size_t numsplitters = (1 << treebits) - 1;
#endif
        static const size_t bktnum = 2 * numsplitters + 1;
        string* str;
        size_t idx;
        size_t depth;
        bktsize_type bktsize[bktnum];
        unsigned char splitter_lcp[numsplitters+1];
        SeqSampleSortStep(string* strings, size_t n, size_t depth, uint16_t* bktcache)
        {
            
            const size_t oversample_factor = 2;
            const size_t samplesize = oversample_factor * numsplitters;
            key_type samples[ samplesize ];
            LCGRandom rng(&strings);
            for (unsigned int i = 0; i < samplesize; ++i)
            {
                samples[i] = get_char<key_type>(strings[ rng() % n ], depth);
            }
            std::sort(samples, samples + samplesize);
            key_type splitter[numsplitters];
            key_type splitter_tree[numsplitters+1];
            TreeBuilder<numsplitters>(splitter, splitter_tree, splitter_lcp, samples, samplesize);
            
            Classify::classify(strings, strings+n, bktcache,
                               splitter, splitter_tree, numsplitters,
                               treebits, depth);
            
            memset(bktsize, 0, bktnum * sizeof(bktsize_type));
            for (size_t si = 0; si < n; ++si)
                ++bktsize[ bktcache[si] ];
            
            bktsize_type bktindex[bktnum];
            bktindex[0] = bktsize[0];
            bktsize_type last_bkt_size = bktsize[0];
            for (unsigned int i=1; i < bktnum; ++i) {
                bktindex[i] = bktindex[i-1] + bktsize[i];
                if (bktsize[i]) last_bkt_size = bktsize[i];
            }
            assert(bktindex[bktnum-1] == n);
            
            for (size_t i=0, j; i < n - last_bkt_size; )
            {
                string perm = strings[i];
                uint16_t permbkt = bktcache[i];
                while ( (j = --bktindex[ permbkt ]) > i )
                {
                    std::swap(perm, strings[j]);
                    std::swap(permbkt, bktcache[j]);
                }
                strings[i] = perm;
                i += bktsize[ permbkt ];
            }
            str = strings;
            idx = 0;
            this->depth = depth;
            ++g_seq_ss_steps;
        }
    };
    
    uint16_t* bktcache;
    size_t bktcache_size;
    size_t ss_pop_front;
    std::vector<SeqSampleSortStep> ss_stack;
    virtual void run(JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Process SmallsortJobBTC " << this << " of size " << n);
        bktcache = NULL;
        bktcache_size = 0;
        ss_pop_front = 0;
        ms_pop_front = 0;
        if (n >= g_smallsort_threshold)
        {
            bktcache = new uint16_t[n];
            bktcache_size = n * sizeof(uint16_t);
            sort_sample_sort(jobqueue);
        }
        else
        {
            sort_mkqs_cache(jobqueue, strptr.to_original(n), n, depth);
        }
        delete [] bktcache;
    }
    void sort_sample_sort(JobQueue& jobqueue)
    {
        string* strings = strptr.to_original(n);
        typedef SeqSampleSortStep Step;
        
        ss_pop_front = 0;
        ss_stack.push_back( Step(strings,n,depth,bktcache) );
        
        while ( ss_stack.size() > ss_pop_front )
        {
            while ( ss_stack.back().idx < Step::bktnum )
            {
                Step& s = ss_stack.back();
                size_t i = s.idx++; 
                
                if ((i & 1) == 0)
                {
                    if (s.bktsize[i] == 0)
                        ;
                    else if (s.bktsize[i] < g_smallsort_threshold)
                    {
                        assert(i/2 <= Step::numsplitters);
                        if (i == Step::bktnum-1)
                            DBG(debug_recursion, "Recurse[" << s.depth << "]: > bkt " << i << " size " << s.bktsize[i] << " no lcp");
                        else
                            DBG(debug_recursion, "Recurse[" << s.depth << "]: < bkt " << i << " size " << s.bktsize[i] << " lcp " << int(s.splitter_lcp[i/2] & 0x7F));
                        s.str += s.bktsize[i];
                        sort_mkqs_cache(jobqueue, s.str - s.bktsize[i], s.bktsize[i], s.depth + (s.splitter_lcp[i/2] & 0x7F));
                    }
                    else
                    {
                        if (i == Step::bktnum-1)
                            DBG(debug_recursion, "Recurse[" << s.depth << "]: > bkt " << i << " size " << s.bktsize[i] << " no lcp");
                        else
                            DBG(debug_recursion, "Recurse[" << s.depth << "]: < bkt " << i << " size " << s.bktsize[i] << " lcp " << int(s.splitter_lcp[i/2] & 0x7F));
                        s.str += s.bktsize[i];
                        ss_stack.push_back( Step(s.str - s.bktsize[i], s.bktsize[i], s.depth + (s.splitter_lcp[i/2] & 0x7F), bktcache) );
                        
                    }
                }
                
                else
                {
                    if (s.bktsize[i] == 0)
                        ;
                    else if ( s.splitter_lcp[i/2] & 0x80 ) { 
                        DBG(debug_recursion, "Recurse[" << s.depth << "]: = bkt " << i << " size " << s.bktsize[i] << " is done!");
                        s.str += s.bktsize[i];
                    }
                    else if (s.bktsize[i] < g_smallsort_threshold)
                    {
                        DBG(debug_recursion, "Recurse[" << s.depth << "]: = bkt " << i << " size " << s.bktsize[i] << " lcp keydepth!");
                        s.str += s.bktsize[i];
                        sort_mkqs_cache(jobqueue, s.str - s.bktsize[i], s.bktsize[i], s.depth + sizeof(key_type));
                    }
                    else
                    {
                        DBG(debug_recursion, "Recurse[" << s.depth << "]: = bkt " << i << " size " << s.bktsize[i] << " lcp keydepth!");
                        s.str += s.bktsize[i];
                        ss_stack.push_back( Step(s.str - s.bktsize[i], s.bktsize[i], s.depth + sizeof(key_type), bktcache) );
                        
                    }
                }
                if (use_work_sharing && jobqueue.has_idle())
                    sample_sort_free_work(jobqueue);
            }
            ss_stack.pop_back();
        }
    }
    void sample_sort_free_work(JobQueue& jobqueue)
    {
        assert(ss_stack.size() >= ss_pop_front);
        if (ss_stack.size() == ss_pop_front) {
            
            
            return mkqs_free_work(jobqueue);
        }
        
        DBG(debug_jobs, "Freeing top level of SmallsortJobBTC's sample_sort stack");
        typedef SeqSampleSortStep Step;
        Step& s = ss_stack[ss_pop_front];
        while ( s.idx < Step::bktnum )
        {
            size_t i = s.idx++; 
            
            if ((i & 1) == 0)
            {
                if (s.bktsize[i] == 0)
                    ;
                else
                {
                    if (i == Step::bktnum-1)
                        DBG(debug_recursion, "Recurse[" << s.depth << "]: > bkt " << i << " size " << s.bktsize[i] << " no lcp");
                    else
                        DBG(debug_recursion, "Recurse[" << s.depth << "]: < bkt " << i << " size " << s.bktsize[i] << " lcp " << int(s.splitter_lcp[i/2] & 0x7F));
                    EnqueueSmallsortJobBTC<Classify>( jobqueue, StringPtr(s.str), s.bktsize[i], s.depth + (s.splitter_lcp[i/2] & 0x7F) );
                }
                s.str += s.bktsize[i];
            }
            
            else
            {
                if (s.bktsize[i] == 0)
                    ;
                else if ( s.splitter_lcp[i/2] & 0x80 ) { 
                    DBG(debug_recursion, "Recurse[" << s.depth << "]: = bkt " << i << " size " << s.bktsize[i] << " is done!");
                }
                else
                {
                    DBG(debug_recursion, "Recurse[" << s.depth << "]: = bkt " << i << " size " << s.bktsize[i] << " lcp keydepth!");
                    EnqueueSmallsortJobBTC<Classify>( jobqueue, StringPtr(s.str), s.bktsize[i], s.depth + sizeof(key_type) );
                }
                s.str += s.bktsize[i];
            }
        }
        
        ++ss_pop_front;
    }
#if 0
    
    
    struct RadixStep8_CI
    {
        string* str;
        size_t idx;
        bktsize_type bktsize[256];
        RadixStep8_CI(string* strings, size_t n, size_t depth, uint8_t* charcache)
        {
            
#if 1
            
            
            for (size_t i=0; i < n; ++i)
                charcache[i] = strings[i][depth];
            memset(bktsize, 0, sizeof(bktsize));
            for (size_t i=0; i < n; ++i)
                ++bktsize[ charcache[i] ];
#else
            memset(bktsize, 0, sizeof(bktsize));
            for (size_t i=0; i < n; ++i) {
                ++bktsize[ (charcache[i] = strings[i][depth]) ];
            }
#endif
            
            size_t bkt[256];
            bkt[0] = bktsize[0];
            size_t last_bkt_size = bktsize[0];
            for (unsigned int i=1; i < 256; ++i) {
                bkt[i] = bkt[i-1] + bktsize[i];
                if (bktsize[i]) last_bkt_size = bktsize[i];
            }
            
            for (size_t i=0, j; i < n - last_bkt_size; )
            {
                string perm = strings[i];
                uint8_t permch = charcache[i];
                while ( (j = --bkt[ permch ]) > i )
                {
                    std::swap(perm, strings[j]);
                    std::swap(permch, charcache[j]);
                }
                strings[i] = perm;
                i += bktsize[ permch ];
            }
            str = strings + bktsize[0];
            idx = 0; 
            ++g_bs_steps;
        }
    };
    size_t rs_pop_front;
    size_t rs_depth; 
    std::vector<RadixStep8_CI> radixstack;
    void sort_radix_sort(JobQueue& jobqueue, string* strings, size_t n, size_t depth)
    {
        uint8_t* charcache = (uint8_t*)bktcache;
        
        rs_pop_front = 0;
        rs_depth = depth;
        radixstack.clear();
        radixstack.push_back( RadixStep8_CI(strings,n,rs_depth,charcache) );
        while ( radixstack.size() > rs_pop_front )
        {
            while ( radixstack.back().idx < 255 )
            {
                RadixStep8_CI& rs = radixstack.back();
                size_t i = ++rs.idx; 
                if (rs.bktsize[i] == 0)
                    continue;
                else if (rs.bktsize[i] < g_inssort_threshold)
                {
                    inssort::inssort(rs.str, rs.bktsize[i], depth + radixstack.size());
                    rs.str += rs.bktsize[i];
                }
                else
                {
                    rs.str += rs.bktsize[i];
                    radixstack.push_back( RadixStep8_CI(rs.str - rs.bktsize[i],
                                                        rs.bktsize[i],
                                                        depth + radixstack.size(),
                                                        charcache) );
                    
                }
                if (use_work_sharing && jobqueue.has_idle())
                    sample_sort_free_work(jobqueue);
            }
            radixstack.pop_back();
        }
        rs_pop_front = 0; 
        radixstack.clear();
    }
    void radix_sort_free_work(JobQueue& jobqueue)
    {
        assert(radixstack.size() >= rs_pop_front);
        if (radixstack.size() == rs_pop_front) {
            return;
        }
        
        DBG(debug_jobs, "Freeing top level of SmallsortJobBTC's radixsort stack");
        RadixStep8_CI& rt = radixstack[rs_pop_front];
        while ( rt.idx < 255 )
        {
            ++rt.idx; 
            if (rt.bktsize[rt.idx] == 0) continue;
            EnqueueSmallsortJobBTC<Classify>(jobqueue, StringPtr(rt.str), rt.bktsize[rt.idx], rs_depth + rs_pop_front);
            rt.str += rt.bktsize[rt.idx];
        }
        
        ++rs_pop_front;
    }
#endif
    
    
    static inline int cmp(const key_type& a, const key_type& b)
    {
        return (a > b) ? 1 : (a < b) ? -1 : 0;
    }
    template <typename Type>
    static inline size_t
    med3(Type* A, size_t i, size_t j, size_t k)
    {
        if (cmp(A[i], A[j]) == 0)                         return i;
        if (cmp(A[k], A[i]) == 0 || cmp(A[k], A[j]) == 0) return k;
        if (cmp(A[i], A[j]) < 0) {
            if (cmp(A[j], A[k]) < 0) return j;
            if (cmp(A[i], A[k]) < 0) return k;
            return i;
        }
        if (cmp(A[j], A[k]) > 0) return j;
        if (cmp(A[i], A[k]) < 0) return i;
        return k;
    }
    
    static inline void
    insertion_sort_cache_block(string* strings, key_type* cache, int n)
    {
        unsigned int pi, pj;
        for (pi = 1; --n > 0; ++pi) {
            string tmps = strings[pi];
            key_type tmpc = cache[pi];
            for (pj = pi; pj > 0; --pj) {
                if (cmp(cache[pj-1], tmpc) <= 0)
                    break;
                strings[pj] = strings[pj-1];
                cache[pj] = cache[pj-1];
            }
            strings[pj] = tmps;
            cache[pj] = tmpc;
        }
    }
    template <bool CacheDirty>
    static inline void
    insertion_sort(string* strings, key_type* cache, int n, size_t depth)
    {
        if (n == 0) return;
        if (CacheDirty) return inssort::inssort(strings, n, depth);
        insertion_sort_cache_block(strings, cache, n);
        size_t start=0, cnt=1;
        for (int i=0; i < n-1; ++i) {
            if (cmp(cache[i], cache[i+1]) == 0) {
                ++cnt;
                continue;
            }
            if (cnt > 1 && cache[start] & 0xFF)
                inssort::inssort(strings + start, cnt, depth + sizeof(key_type));
            cnt = 1;
            start = i+1;
        }
        if (cnt > 1 && cache[start] & 0xFF)
            inssort::inssort(strings+start, cnt, depth + sizeof(key_type));
    }
    struct MKQSStep
    {
        string* strings;
        key_type* cache;
        size_t num_lt, num_eq, num_gt, n, depth;
        size_t idx;
        bool eq_recurse;
        MKQSStep(string* strings, key_type* cache, size_t n, size_t depth, bool CacheDirty)
        {
            if (CacheDirty) {
                for (size_t i=0; i < n; ++i) {
                    cache[i] = get_char<key_type>(strings[i], depth);
                }
            }
            
            
            size_t p = med3(cache,
                            med3(cache, 0,       n/8,     n/4),
                            med3(cache, n/2-n/8, n/2,     n/2+n/8),
                            med3(cache, n-1-n/4, n-1-n/8, n-3)
                );
            std::swap(strings[0], strings[p]);
            std::swap(cache[0], cache[p]);
            key_type pivot = cache[0];
            size_t first   = 1;
            size_t last    = n-1;
            size_t beg_ins = 1;
            size_t end_ins = n-1;
            while (true) {
                while (first <= last) {
                    const int res = cmp(cache[first], pivot);
                    if (res > 0) {
                        break;
                    } else if (res == 0) {
                        std::swap(strings[beg_ins], strings[first]);
                        std::swap(cache[beg_ins], cache[first]);
                        beg_ins++;
                    }
                    ++first;
                }
                while (first <= last) {
                    const int res = cmp(cache[last], pivot);
                    if (res < 0) {
                        break;
                    } else if (res == 0) {
                        std::swap(strings[end_ins], strings[last]);
                        std::swap(cache[end_ins], cache[last]);
                        end_ins--;
                    }
                    --last;
                }
                if (first > last)
                    break;
                std::swap(strings[first], strings[last]);
                std::swap(cache[first], cache[last]);
                ++first;
                --last;
            }
            
            const size_t num_eq_beg = beg_ins;
            const size_t num_eq_end = n-1-end_ins;
            num_eq     = num_eq_beg+num_eq_end;
            num_lt     = first-beg_ins;
            num_gt     = end_ins-last;
            assert(num_lt + num_eq + num_gt == n);
            
            const size_t size1 = std::min(num_eq_beg, num_lt);
            std::swap_ranges(strings, strings+size1, strings+first-size1);
            std::swap_ranges(cache, cache+size1, cache+first-size1);
            
            const size_t size2 = std::min(num_eq_end, num_gt);
            std::swap_ranges(strings+first, strings+first+size2, strings+n-size2);
            std::swap_ranges(cache+first, cache+first+size2, cache+n-size2);
            
            this->strings = strings;
            this->cache = cache;
            this->n = n;
            this->depth = depth;
            this->idx = 0;
            this->eq_recurse = (pivot & 0xFF);
            ++g_bs_steps;
        }
    };
    size_t ms_pop_front;
    size_t ms_depth; 
    std::vector<MKQSStep> ms_stack;
    void sort_mkqs_cache(JobQueue& jobqueue, string* strings, size_t n, size_t depth)
    {
        if (n < g_inssort_threshold)
            return inssort::inssort(strings, n, depth);
        if (bktcache_size < n * sizeof(key_type)) {
            delete [] bktcache;
            bktcache = (uint16_t*)new key_type[n];
            bktcache_size = n * sizeof(key_type);
        }
        key_type* cache = (key_type*)bktcache;
        
        ms_pop_front = 0;
        ms_depth = depth;
        ms_stack.clear();
        ms_stack.push_back( MKQSStep(strings,cache,n,ms_depth,true) );
    jumpout:
        while ( ms_stack.size() > ms_pop_front )
        {
            while ( ms_stack.back().idx < 3 )
            {
                if (use_work_sharing && jobqueue.has_idle()) {
                    sample_sort_free_work(jobqueue);
                    goto jumpout;
                }
                MKQSStep& ms = ms_stack.back();
                ++ms.idx; 
                
                if (ms.idx == 1)
                {
                    if (ms.num_lt == 0)
                        ;
                    else if (ms.num_lt < g_inssort_threshold)
                        insertion_sort<false>(ms.strings, ms.cache, ms.num_lt, ms.depth);
                    else
                        ms_stack.push_back( MKQSStep(ms.strings, ms.cache, ms.num_lt, ms.depth, false) );
                }
                
                else if (ms.idx == 2)
                {
                    if (!ms.eq_recurse || ms.num_eq == 0)
                        ;
                    else if (ms.num_eq < g_inssort_threshold)
                        insertion_sort<true>(ms.strings + ms.num_lt,
                                             ms.cache + ms.num_lt,
                                             ms.num_eq, ms.depth + sizeof(key_type));
                    else
                        ms_stack.push_back( MKQSStep(ms.strings + ms.num_lt,
                                                     ms.cache + ms.num_lt,
                                                     ms.num_eq, ms.depth + sizeof(key_type), true) );
                }
                
                else
                {
                    assert(ms.idx == 3);
                    if (ms.num_gt == 0)
                        ;
                    else if (ms.num_gt < g_inssort_threshold)
                        insertion_sort<false>(ms.strings + ms.num_lt + ms.num_eq,
                                              ms.cache + ms.num_lt + ms.num_eq,
                                              ms.num_gt,
                                              ms.depth);
                    else
                        ms_stack.push_back( MKQSStep(ms.strings + ms.num_lt + ms.num_eq,
                                                     ms.cache + ms.num_lt + ms.num_eq,
                                                     ms.num_gt, ms.depth, false) );
                }
            }
            ms_stack.pop_back();
        }
        ms_pop_front = 0; 
        ms_stack.clear();
    }
    void mkqs_free_work(JobQueue& jobqueue)
    {
        assert(ms_stack.size() >= ms_pop_front);
        if (ms_stack.size() == ms_pop_front) {
            return;
        }
        DBG(debug_jobs, "Freeing top level of SmallsortJobBTC's mkqs stack");
        
        MKQSStep& st = ms_stack[ms_pop_front];
        if (st.idx == 0 && st.num_lt != 0)
        {
            EnqueueSmallsortJobBTC<Classify>(jobqueue, StringPtr(st.strings), st.num_lt, st.depth);
        }
        if (st.idx <= 1 && st.num_eq != 0 && st.eq_recurse)
        {
            EnqueueSmallsortJobBTC<Classify>(jobqueue, StringPtr(st.strings + st.num_lt),
                                             st.num_eq, st.depth + sizeof(key_type));
        }
        if (st.idx <= 2 && st.num_gt != 0)
        {
            EnqueueSmallsortJobBTC<Classify>(jobqueue, StringPtr(st.strings + st.num_lt + st.num_eq),
                                             st.num_gt, st.depth);
        }
        
        ++ms_pop_front;
    }
};
template <typename Classify>
void EnqueueSmallsortJobBTC(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth)
{
    if (n < ((uint64_t)1 << 32))
        new SmallsortJobBTC<Classify,uint32_t>(jobqueue, strptr, n, depth);
    else
        new SmallsortJobBTC<Classify,uint64_t>(jobqueue, strptr, n, depth);
}
template <typename Classify>
struct SampleSortStep
{
#if 0
    static const size_t numsplitters = 2*16;       
#else
    
    static const size_t numsplitters2 = (l2cache - sizeof(size_t)) / (2 * sizeof(size_t));
    static const size_t treebits = logfloor_<numsplitters2>::value;
    static const size_t numsplitters = (1 << treebits) - 1;
#endif
    static const size_t bktnum = 2 * numsplitters + 1;
    StringPtr           strptr;
    size_t              n, depth;
    unsigned int        parts;
    size_t              psize;
    std::atomic<unsigned int> pwork;
    key_type            splitter[numsplitters];
    key_type            splitter_tree[numsplitters+1];
    unsigned char       splitter_lcp[numsplitters+1];
    size_t*             bkt;
    uint16_t*           bktcache[MAXPROCS];
    
    struct SampleJob : public Job
    {
        SampleSortStep*     step;
        SampleJob(SampleSortStep* _step)
            : step(_step) { }
        virtual void run(JobQueue& jobqueue)
        {
            step->sample(jobqueue);
        }
    };
    struct CountJob : public Job
    {
        SampleSortStep*     step;
        unsigned int        p;
        CountJob(SampleSortStep* _step, unsigned int _p)
            : step(_step), p(_p) { }
        virtual void run(JobQueue& jobqueue)
        {
            step->count(p, jobqueue);
        }
    };
    struct DistributeJob : public Job
    {
        SampleSortStep*     step;
        unsigned int        p;
        DistributeJob(SampleSortStep* _step, unsigned int _p)
            : step(_step), p(_p) { }
        virtual void run(JobQueue& jobqueue)
        {
            step->distribute(p, jobqueue);
        }
    };
    
    SampleSortStep(JobQueue& jobqueue, const StringPtr& _strptr, size_t _n, size_t _depth)
        : strptr(_strptr), n(_n), depth(_depth)
    {
        parts = (n + g_sequential_threshold-1) / g_sequential_threshold;
        if (parts == 0) parts = 1;
        psize = (n + parts-1) / parts;
        jobqueue.enqueue( new SampleJob(this) );
        ++g_para_ss_steps;
    }
    
    void sample(JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Process SampleJob @ " << this);
        const size_t oversample_factor = 2;
        size_t samplesize = oversample_factor * numsplitters;
        string* strings = strptr.active();
        key_type samples[ samplesize ];
        LCGRandom rng(&samples);
        for (unsigned int i = 0; i < samplesize; ++i)
        {
            samples[i] = get_char<key_type>(strings[ rng() % n ], depth);
        }
        std::sort(samples, samples + samplesize);
        TreeBuilder<numsplitters>(splitter, splitter_tree, splitter_lcp, samples, samplesize);
        bkt = new size_t[bktnum * parts + 1];
        
        pwork = parts;
        for (unsigned int p = 0; p < parts; ++p)
            jobqueue.enqueue( new CountJob(this, p) );
    }
    
    void count(unsigned int p, JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Process CountJob " << p << " @ " << this);
        string* strB = strptr.active() + p * psize;
        string* strE = strptr.active() + std::min((p+1) * psize, n);
        if (strE < strB) strE = strB;
        uint16_t* mybktcache = bktcache[p] = new uint16_t[strE-strB];
        uint16_t* bktout = mybktcache;
        Classify::classify(strB, strE, bktout,
                           splitter, splitter_tree, numsplitters,
                           treebits, depth);
        size_t* mybkt = bkt + p * bktnum;
        memset(mybkt, 0, bktnum * sizeof(size_t));
        for (uint16_t* bc = mybktcache; bc != mybktcache + (strE-strB); ++bc)
            ++mybkt[ *bc ];
        if (--pwork == 0)
            count_finished(jobqueue);
    }
    void count_finished(JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Finishing CountJob " << this << " with prefixsum");
        
        size_t sum = 0;
        for (unsigned int i = 0; i < bktnum; ++i)
        {
            for (unsigned int p = 0; p < parts; ++p)
            {
                bkt[p * bktnum + i] = (sum += bkt[p * bktnum + i]);
            }
        }
        assert(sum == n);
        
        pwork = parts;
        for (unsigned int p = 0; p < parts; ++p)
            jobqueue.enqueue( new DistributeJob(this, p) );
    }
    
    void distribute(unsigned int p, JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Process DistributeJob " << p << " @ " << this);
        string* strB = strptr.active() + p * psize;
        string* strE = strptr.active() + std::min((p+1) * psize, n);
        if (strE < strB) strE = strB;
        string* sorted = strptr.shadow(); 
        uint16_t* mybktcache = bktcache[p];
        size_t mybkt[bktnum]; 
        memcpy(mybkt, bkt + p * bktnum, sizeof(mybkt));
        for (string* str = strB; str != strE; ++str, ++mybktcache)
            sorted[ --mybkt[ *mybktcache ] ] = *str;
        if (p == 0) 
            memcpy(bkt, mybkt, sizeof(mybkt));
        delete [] bktcache[p];
        if (--pwork == 0)
            distribute_finished(jobqueue);
    }
    void distribute_finished(JobQueue& jobqueue)
    {
        DBG(debug_jobs, "Finishing DistributeJob " << this << " with enqueuing subjobs");
        
        assert(bkt[0] == 0);
        bkt[bktnum] = n;
        size_t i = 0;
        while (i < bktnum-1)
        {
            
            size_t bktsize = bkt[i+1] - bkt[i];
            if (bktsize == 0)
                ;
            else if (bktsize == 1) 
                strptr.flip_ptr(bkt[i]).to_original(1);
            else
            {
                DBG(debug_recursion, "Recurse[" << depth << "]: < bkt " << bkt[i] << " size " << bktsize << " lcp " << int(splitter_lcp[i/2] & 0x7F));
                Enqueue(jobqueue, strptr.flip_ptr(bkt[i]), bktsize, depth + (splitter_lcp[i/2] & 0x7F));
            }
            ++i;
            
            bktsize = bkt[i+1] - bkt[i];
            if (bktsize == 0)
                ;
            else if (bktsize == 1) 
                strptr.flip_ptr(bkt[i]).to_original(1);
            else
            {
                if ( splitter_lcp[i/2] & 0x80 ) { 
                    
                    DBG(debug_recursion, "Recurse[" << depth << "]: = bkt " << bkt[i] << " size " << bktsize << " is done!");
                    strptr.flip_ptr(bkt[i]).to_original(bktsize);
                }
                else {
                    DBG(debug_recursion, "Recurse[" << depth << "]: = bkt " << bkt[i] << " size " << bktsize << " lcp keydepth!");
                    Enqueue(jobqueue, strptr.flip_ptr(bkt[i]), bktsize, depth + sizeof(key_type));
                }
            }
            ++i;
        }
        size_t bktsize = bkt[i+1] - bkt[i];
        if (bktsize == 0)
            ;
        else if (bktsize == 1) 
            strptr.flip_ptr(bkt[i]).to_original(1);
        else
        {
            DBG(debug_recursion, "Recurse[" << depth << "]: > bkt " << bkt[i] << " size " << bktsize << " no lcp");
            Enqueue(jobqueue, strptr.flip_ptr(bkt[i]), bktsize, depth);
        }
        delete [] bkt;
        delete this;
    }
    
    static void Enqueue(JobQueue& jobqueue, const StringPtr& strptr, size_t n, size_t depth)
    {
        if (n > g_sequential_threshold) {
            new SampleSortStep(jobqueue, strptr, n, depth);
        }
        else {
            EnqueueSmallsortJobBTC<Classify>(jobqueue, strptr, n, depth);
        }
   }
    static inline void put_stats()
    {
        g_statscache >> "l2cache" << size_t(l2cache)
                     >> "splitter_treebits" << size_t(treebits)
                     >> "numsplitters" << size_t(numsplitters);
    }
};
void parallel_sample_sortBTC(string* strings, size_t n)
{
    g_totalsize = n;
    g_threadnum = omp_get_max_threads();
    g_sequential_threshold = std::max(g_inssort_threshold, g_totalsize / g_threadnum);
    g_para_ss_steps = g_seq_ss_steps = g_bs_steps = 0;
    SampleSortStep<ClassifySimple>::put_stats();
    string* shadow = new string[n]; 
    JobQueue jobqueue;
    SampleSortStep<ClassifySimple>::Enqueue(jobqueue, StringPtr(strings, shadow), n, 0);
    jobqueue.loop();
    delete [] shadow;
    g_statscache >> "steps_para_sample_sort" << g_para_ss_steps
                 >> "steps_seq_sample_sort" << g_seq_ss_steps
                 >> "steps_base_sort" << g_bs_steps;
}
CONTESTANT_REGISTER_PARALLEL(parallel_sample_sortBTC,
                             "bingmann/parallel_sample_sortBTC",
                             "bingmann/parallel_sample_sortBTC: binary tree, bktcache")
void parallel_sample_sortBTCU1(string* strings, size_t n)
{
    g_totalsize = n;
    g_threadnum = omp_get_max_threads();
    g_sequential_threshold = std::max(g_inssort_threshold, g_totalsize / g_threadnum);
    g_para_ss_steps = g_seq_ss_steps = g_bs_steps = 0;
    SampleSortStep<ClassifyUnrollTree>::put_stats();
    string* shadow = new string[n]; 
    JobQueue jobqueue;
    SampleSortStep<ClassifyUnrollTree>::Enqueue(jobqueue, StringPtr(strings, shadow), n, 0);
    jobqueue.loop();
    delete [] shadow;
    g_statscache >> "steps_para_sample_sort" << g_para_ss_steps
                 >> "steps_seq_sample_sort" << g_seq_ss_steps
                 >> "steps_base_sort" << g_bs_steps;
}
CONTESTANT_REGISTER_PARALLEL(parallel_sample_sortBTCU1,
                             "bingmann/parallel_sample_sortBTCU1",
                             "bingmann/parallel_sample_sortBTCU1: binary tree, bktcache, unroll tree")
void parallel_sample_sortBTCU2(string* strings, size_t n)
{
    g_totalsize = n;
    g_threadnum = omp_get_max_threads();
    g_sequential_threshold = std::max(g_inssort_threshold, g_totalsize / g_threadnum);
    g_para_ss_steps = g_seq_ss_steps = g_bs_steps = 0;
    SampleSortStep<ClassifyUnrollBoth>::put_stats();
    string* shadow = new string[n]; 
    JobQueue jobqueue;
    SampleSortStep<ClassifyUnrollBoth>::Enqueue(jobqueue, StringPtr(strings, shadow), n, 0);
    jobqueue.loop();
    delete [] shadow;
    g_statscache >> "steps_para_sample_sort" << g_para_ss_steps
                 >> "steps_seq_sample_sort" << g_seq_ss_steps
                 >> "steps_base_sort" << g_bs_steps;
}
CONTESTANT_REGISTER_PARALLEL(parallel_sample_sortBTCU2,
                             "bingmann/parallel_sample_sortBTCU2",
                             "bingmann/parallel_sample_sortBTCU2: binary tree, bktcache, unroll tree and strings")
}