Stxxl  1.4.0
include/stxxl/bits/algo/stable_ksort.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/stable_ksort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00008  *
00009  *  Distributed under the Boost Software License, Version 1.0.
00010  *  (See accompanying file LICENSE_1_0.txt or copy at
00011  *  http://www.boost.org/LICENSE_1_0.txt)
00012  **************************************************************************/
00013 
00014 #ifndef STXXL_STABLE_KSORT_HEADER
00015 #define STXXL_STABLE_KSORT_HEADER
00016 
// This is a first attempt: a distribution sort without sampling.
// stable_ksort should be reworked when time permits.
00019 
00020 
#include <algorithm>
#include <vector>

#include <stxxl/bits/mng/mng.h>
#include <stxxl/bits/mng/buf_istream.h>
#include <stxxl/bits/mng/buf_ostream.h>
#include <stxxl/bits/common/simple_vector.h>
#include <stxxl/bits/algo/intksort.h>
#include <stxxl/bits/algo/sort_base.h>
#include <stxxl/bits/common/utils.h>
00028 
00029 
00030 #ifndef STXXL_VERBOSE_STABLE_KSORT
00031 #define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
00032 #endif
00033 
00034 
00035 __STXXL_BEGIN_NAMESPACE
00036 
00037 //! \addtogroup stlalgo
00038 //! \{
00039 
00040 /*! \internal
00041  */
00042 namespace stable_ksort_local
00043 {
00044     template <class type_, class type_key>
00045     void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
00046     {
00047         for (type_ * p = begin; p < end; p++, out++) // count & create references
00048         {
00049             out->ptr = p;
00050             typename type_::key_type key = p->key();
00051             int_type ibucket = (key - offset) >> shift;
00052             out->key = key;
00053             bucket[ibucket]++;
00054         }
00055     }
00056 
00057     template <typename type>
00058     struct type_key
00059     {
00060         typedef typename type::key_type key_type;
00061         key_type key;
00062         type * ptr;
00063 
00064         type_key() { }
00065         type_key(key_type k, type * p) : key(k), ptr(p)
00066         { }
00067     };
00068 
00069     template <typename type>
00070     bool operator < (const type_key<type> & a, const type_key<type> & b)
00071     {
00072         return a.key < b.key;
00073     }
00074 
00075     template <typename type>
00076     bool operator > (const type_key<type> & a, const type_key<type> & b)
00077     {
00078         return a.key > b.key;
00079     }
00080 
00081 
00082     template <typename BIDType_, typename AllocStrategy_>
00083     class bid_sequence
00084     {
00085     public:
00086         typedef BIDType_ bid_type;
00087         typedef bid_type & reference;
00088         typedef AllocStrategy_ alloc_strategy;
00089         typedef typename simple_vector<bid_type>::size_type size_type;
00090         typedef typename simple_vector<bid_type>::iterator iterator;
00091 
00092     protected:
00093         simple_vector<bid_type> * bids;
00094         alloc_strategy alloc_strategy_;
00095 
00096     public:
00097         bid_sequence() { }
00098         bid_sequence(size_type size_)
00099         {
00100             bids = new simple_vector<bid_type>(size_);
00101             block_manager * mng = block_manager::get_instance();
00102             mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00103         }
00104         void init(size_type size_)
00105         {
00106             bids = new simple_vector<bid_type>(size_);
00107             block_manager * mng = block_manager::get_instance();
00108             mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
00109         }
00110         reference operator [] (size_type i)
00111         {
00112             size_type size_ = size();             // cache size in a register
00113             if (i < size_)
00114                 return *(bids->begin() + i);
00115 
00116             block_manager * mng = block_manager::get_instance();
00117             simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
00118             std::copy(bids->begin(), bids->end(), larger_bids->begin());
00119             mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
00120             delete bids;
00121             bids = larger_bids;
00122             return *(larger_bids->begin() + i);
00123         }
00124         size_type size() { return bids->size(); }
00125         iterator begin() { return bids->begin(); }
00126         ~bid_sequence()
00127         {
00128             block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
00129             delete bids;
00130         }
00131     };
00132 
00133     template <typename ExtIterator_>
00134     void distribute(
00135         bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
00136                      typename ExtIterator_::vector_type::alloc_strategy_type> * bucket_bids,
00137         int64 * bucket_sizes,
00138         const int_type nbuckets,
00139         const int_type lognbuckets,
00140         ExtIterator_ first,
00141         ExtIterator_ last,
00142         const int_type nread_buffers,
00143         const int_type nwrite_buffers)
00144     {
00145         typedef typename ExtIterator_::vector_type::value_type value_type;
00146         typedef typename value_type::key_type key_type;
00147         typedef typename ExtIterator_::block_type block_type;
00148         typedef typename block_type::bid_type bid_type;
00149         typedef buf_istream<typename ExtIterator_::block_type,
00150                             typename ExtIterator_::bids_container_iterator> buf_istream_type;
00151 
00152         int_type i = 0;
00153 
00154         buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
00155                             nread_buffers);
00156 
00157         buffered_writer<block_type> out(
00158             nbuckets + nwrite_buffers,
00159             nwrite_buffers);
00160 
00161         unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
00162         unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
00163         block_type ** bucket_blocks = new block_type *[nbuckets];
00164 
00165         std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
00166         std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
00167         std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);
00168 
00169         for (i = 0; i < nbuckets; i++)
00170             bucket_blocks[i] = out.get_free_block();
00171 
00172 
00173         ExtIterator_ cur = first - first.block_offset();
00174 
00175         // skip part of the block before first untouched
00176         for ( ; cur != first; cur++)
00177             ++in;
00178 
00179 
00180         const int_type shift = sizeof(key_type) * 8 - lognbuckets;
00181         // search in the the range [_begin,_end)
00182         STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
00183         for ( ; cur != last; cur++)
00184         {
00185             key_type cur_key = in.current().key();
00186             int_type ibucket = cur_key >> shift;
00187 
00188             int_type block_offset = bucket_block_offsets[ibucket];
00189             in >> (bucket_blocks[ibucket]->elem[block_offset++]);
00190             if (block_offset == block_type::size)
00191             {
00192                 block_offset = 0;
00193                 int_type iblock = bucket_iblock[ibucket]++;
00194                 bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
00195             }
00196             bucket_block_offsets[ibucket] = block_offset;
00197         }
00198         for (i = 0; i < nbuckets; i++)
00199         {
00200             if (bucket_block_offsets[i])
00201             {
00202                 out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
00203             }
00204             bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
00205                               bucket_block_offsets[i];
00206             STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
00207                                        ", estimated size: " << ((last - first) / int64(nbuckets)));
00208         }
00209 
00210         delete[] bucket_blocks;
00211         delete[] bucket_block_offsets;
00212         delete[] bucket_iblock;
00213     }
00214 }
00215 
00216 //! \brief Sort records with integer keys
00217 //! \param first object of model of \c ext_random_access_iterator concept
00218 //! \param last object of model of \c ext_random_access_iterator concept
00219 //! \param M amount of memory for internal use (in bytes)
00220 //! \remark Elements must provide a method key() which returns the integer key.
00221 //! \remark Not yet fully implemented, it assumes that the keys are uniformly
00222 //! distributed between [0,(std::numeric_limits<key_type>::max)().
00223 template <typename ExtIterator_>
00224 void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
00225 {
00226     STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
00227     typedef typename ExtIterator_::vector_type::value_type value_type;
00228     typedef typename value_type::key_type key_type;
00229     typedef typename ExtIterator_::block_type block_type;
00230     typedef typename block_type::bid_type bid_type;
00231     typedef typename ExtIterator_::vector_type::alloc_strategy_type alloc_strategy;
00232     typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
00233     typedef stable_ksort_local::type_key<value_type> type_key_;
00234 
00235     first.flush();     // flush container
00236 
00237     double begin = timestamp();
00238 
00239     unsigned_type i = 0;
00240     config * cfg = config::get_instance();
00241     const unsigned_type m = M / block_type::raw_size;
00242     assert(2 * block_type::raw_size <= M);
00243     const unsigned_type write_buffers_multiple = 2;
00244     const unsigned_type read_buffers_multiple = 2;
00245     const unsigned_type ndisks = cfg->disks_number();
00246     const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
00247     const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
00248     const unsigned_type lognbuckets = log2_floor(nmaxbuckets);
00249     const unsigned_type nbuckets = 1 << lognbuckets;
00250     const unsigned_type est_bucket_size = div_ceil((last - first) / nbuckets, block_type::size);      //in blocks
00251 
00252     if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
00253         STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
00254                      ", required for r/w buffers: " << min_num_read_write_buffers <<
00255                      ", required for buckets: 2, nbuckets: " << nbuckets);
00256         throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
00257     }
00258     STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
00259     STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
00260     const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
00261     const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
00262 
00263     STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
00264     STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);
00265 
00266     bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
00267     for (i = 0; i < nbuckets; ++i)
00268         bucket_bids[i].init(est_bucket_size);
00269 
00270     int64 * bucket_sizes = new int64[nbuckets];
00271 
00272     disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00273 
00274     stable_ksort_local::distribute(
00275         bucket_bids,
00276         bucket_sizes,
00277         nbuckets,
00278         lognbuckets,
00279         first,
00280         last,
00281         nread_buffers,
00282         nwrite_buffers);
00283 
00284     double dist_end = timestamp(), end;
00285     double io_wait_after_d = stats::get_instance()->get_io_wait_time();
00286 
00287     {
00288         // sort buckets
00289         unsigned_type write_buffers_multiple_bs = 2;
00290         unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2; // in number of blocks
00291         int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;        // in number of records
00292         int64 max_bucket_size_act = 0;                                                   // actual max bucket size
00293         // establish output stream
00294 
00295         for (i = 0; i < nbuckets; i++)
00296         {
00297             max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
00298             if (bucket_sizes[i] > max_bucket_size_rec)
00299             {
00300                 STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
00301                              " records, maximum: " << max_bucket_size_rec);
00302                 STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
00303                 abort();
00304             }
00305         }
00306         // here we can increase write_buffers_multiple_b knowing max(bucket_sizes[i])
00307         // ... and decrease max_bucket_size_bl
00308         const int_type max_bucket_size_act_bl = div_ceil(max_bucket_size_act, block_type::size);
00309         STXXL_VERBOSE_STABLE_KSORT("Reducing required number of required blocks per bucket from " <<
00310                                    max_bucket_size_bl << " to " << max_bucket_size_act_bl);
00311         max_bucket_size_rec = max_bucket_size_act;
00312         max_bucket_size_bl = max_bucket_size_act_bl;
00313         const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
00314         STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);
00315 
00316         typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
00317         buf_ostream_type out(first.bid(), nwrite_buffers_bs);
00318 
00319         disk_queues::get_instance()->set_priority_op(request_queue::READ);
00320 
00321         if (first.block_offset())
00322         {
00323             // has to skip part of the first block
00324             block_type * block = new block_type;
00325             request_ptr req;
00326             req = block->read(*first.bid());
00327             req->wait();
00328 
00329             for (i = 0; i < first.block_offset(); i++)
00330             {
00331                 out << block->elem[i];
00332             }
00333             delete block;
00334         }
00335         block_type * blocks1 = new block_type[max_bucket_size_bl];
00336         block_type * blocks2 = new block_type[max_bucket_size_bl];
00337         request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
00338         request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
00339         type_key_ * refs1 = new type_key_[max_bucket_size_rec];
00340         type_key_ * refs2 = new type_key_[max_bucket_size_rec];
00341 
00342         // submit reading first 2 buckets (Peter's scheme)
00343         unsigned_type nbucket_blocks = div_ceil(bucket_sizes[0], block_type::size);
00344         for (i = 0; i < nbucket_blocks; i++)
00345             reqs1[i] = blocks1[i].read(bucket_bids[0][i]);
00346 
00347 
00348         nbucket_blocks = div_ceil(bucket_sizes[1], block_type::size);
00349         for (i = 0; i < nbucket_blocks; i++)
00350             reqs2[i] = blocks2[i].read(bucket_bids[1][i]);
00351 
00352 
00353         key_type offset = 0;
00354         const unsigned log_k1 = STXXL_MAX<unsigned>(log2_ceil(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE), 1);
00355         unsigned_type k1 = 1 << log_k1;
00356         int_type * bucket1 = new int_type[k1];
00357 
00358         const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
00359         const unsigned shift1 = shift - log_k1;
00360 
00361         STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
00362                                    " block size:" << block_type::size << " log_k1:" << log_k1);
00363 
00364         for (unsigned_type k = 0; k < nbuckets; k++)
00365         {
00366             nbucket_blocks = div_ceil(bucket_sizes[k], block_type::size);
00367             const unsigned log_k1_k = STXXL_MAX<unsigned>(log2_ceil(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE), 1);
00368             assert(log_k1_k <= log_k1);
00369             k1 = 1 << log_k1_k;
00370             std::fill(bucket1, bucket1 + k1, 0);
00371 
00372             STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
00373                                        " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
00374             // classify first nbucket_blocks-1 blocks, they are full
00375             type_key_ * ref_ptr = refs1;
00376             key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
00377             for (i = 0; i < nbucket_blocks - 1; i++)
00378             {
00379                 reqs1[i]->wait();
00380                 stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 /*,k1*/);
00381             }
00382             // last block might be non-full
00383             const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
00384             reqs1[i]->wait();
00385 
00386             //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size);
00387 
00388             classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);
00389 
00390             exclusive_prefix_sum(bucket1, k1);
00391             classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);
00392 
00393             type_key_ * c = refs2;
00394             type_key_ * d = refs1;
00395             for (i = 0; i < k1; i++)
00396             {
00397                 type_key_ * cEnd = refs2 + bucket1[i];
00398                 type_key_ * dEnd = refs1 + bucket1[i];
00399 
00400                 const unsigned log_k2 = log2_floor(bucket1[i]) - 1;        // adaptive bucket size
00401                 const unsigned_type k2 = 1 << log_k2;
00402                 int_type * bucket2 = new int_type[k2];
00403                 const unsigned shift2 = shift1 - log_k2;
00404 
00405                 // STXXL_MSG("Sorting bucket "<<k<<":"<<i);
00406                 l1sort(c, cEnd, d, bucket2, k2,
00407                        offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
00408                        shift2);
00409 
00410                 // write out all
00411                 for (type_key_ * p = d; p < dEnd; p++)
00412                     out << (*(p->ptr));
00413 
00414 
00415                 delete[] bucket2;
00416                 c = cEnd;
00417                 d = dEnd;
00418             }
00419             // submit next read
00420             const unsigned_type bucket2submit = k + 2;
00421             if (bucket2submit < nbuckets)
00422             {
00423                 nbucket_blocks = div_ceil(bucket_sizes[bucket2submit], block_type::size);
00424                 for (i = 0; i < nbucket_blocks; i++)
00425                     reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
00426             }
00427 
00428             std::swap(blocks1, blocks2);
00429             std::swap(reqs1, reqs2);
00430         }
00431 
00432         delete[] bucket1;
00433         delete[] refs1;
00434         delete[] refs2;
00435         delete[] blocks1;
00436         delete[] blocks2;
00437         delete[] reqs1;
00438         delete[] reqs2;
00439         delete[] bucket_bids;
00440         delete[] bucket_sizes;
00441 
00442         if (last.block_offset())
00443         {
00444             // has to skip part of the first block
00445             block_type * block = new block_type;
00446             request_ptr req = block->read(*last.bid());
00447             req->wait();
00448 
00449             for (i = last.block_offset(); i < block_type::size; i++)
00450             {
00451                 out << block->elem[i];
00452             }
00453             delete block;
00454         }
00455 
00456         end = timestamp();
00457     }
00458 
00459     STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Distribution time: " <<
00460                   dist_end - begin << " s");
00461     STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
00462     STXXL_VERBOSE(*stats::get_instance());
00463     STXXL_UNUSED(begin + dist_end + end + io_wait_after_d);
00464 }
00465 
00466 //! \}
00467 
00468 __STXXL_END_NAMESPACE
00469 
00470 #endif // !STXXL_STABLE_KSORT_HEADER
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines