Stxxl  1.4.0
include/stxxl/bits/containers/priority_queue.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/containers/priority_queue.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 1999 Peter Sanders <sanders@mpi-sb.mpg.de>
00007  *  Copyright (C) 2003, 2004, 2007 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00008  *  Copyright (C) 2007-2009 Johannes Singler <singler@ira.uka.de>
00009  *  Copyright (C) 2007-2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00010  *
00011  *  Distributed under the Boost Software License, Version 1.0.
00012  *  (See accompanying file LICENSE_1_0.txt or copy at
00013  *  http://www.boost.org/LICENSE_1_0.txt)
00014  **************************************************************************/
00015 
00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER
00017 #define STXXL_PRIORITY_QUEUE_HEADER
00018 
00019 #include <vector>
00020 
00021 #include <stxxl/bits/deprecated.h>
00022 #include <stxxl/bits/mng/typed_block.h>
00023 #include <stxxl/bits/mng/block_alloc.h>
00024 #include <stxxl/bits/mng/read_write_pool.h>
00025 #include <stxxl/bits/mng/prefetch_pool.h>
00026 #include <stxxl/bits/mng/write_pool.h>
00027 #include <stxxl/bits/common/tmeta.h>
00028 #include <stxxl/bits/algo/sort_base.h>
00029 #include <stxxl/bits/parallel.h>
00030 #include <stxxl/bits/common/is_sorted.h>
00031 
00032 #if STXXL_PARALLEL
00033 
00034 #if defined(STXXL_PARALLEL_MODE) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400)
00035 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00036 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00037 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00038 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0
00039 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0
00040 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0
00041 #endif
00042 
00043 // enable/disable parallel merging for certain cases, for performance tuning
00044 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00045 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
00046 #endif
00047 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00048 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
00049 #endif
00050 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00051 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
00052 #endif
00053 
00054 #endif //STXXL_PARALLEL
00055 
00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00057 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences
00058 #else
00059 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1
00060 #endif
00061 
00062 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00063 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences
00064 #else
00065 #define STXXL_PQ_INTERNAL_LOSER_TREE 1
00066 #endif
00067 
00068 #define STXXL_VERBOSE_PQ(msg) STXXL_VERBOSE2("[" << static_cast<void *>(this) << "] priority_queue::" << msg)
00069 
00070 #include <stxxl/bits/containers/pq_helpers.h>
00071 #include <stxxl/bits/containers/pq_mergers.h>
00072 #include <stxxl/bits/containers/pq_ext_merger.h>
00073 #include <stxxl/bits/containers/pq_losertree.h>
00074 
00075 __STXXL_BEGIN_NAMESPACE
00076 
00077 /*
00078    KNBufferSize1 = 32;
00079    KNN = 512; // length of group 1 sequences
00080    KNKMAX = 64;  // maximal arity
00081    LogKNKMAX = 6;  // ceil(log KNKMAX)
00082    KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
00083  */
00084 
00085 //! \brief Compile-time parameter bundle for stxxl::priority_queue; internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
00086 //! Each type argument is re-exported as a typedef and each numeric argument as an enumerator.
00087 template <
00088     class Tp_,                                     // exported as value_type
00089     class Cmp_,                                    // exported as comparator_type
00090     unsigned BufferSize1_ = 32,                    // equalize procedure call overheads etc.
00091     unsigned N_ = 512,                             // length of group 1 sequences
00092     unsigned IntKMAX_ = 64,                        // maximal arity for internal mergers
00093     unsigned IntLevels_ = 4,                       // number of internal groups
00094     unsigned BlockSize_ = (2 * 1024 * 1024),       // external block size
00095     unsigned ExtKMAX_ = 64,                        // maximal arity for external mergers
00096     unsigned ExtLevels_ = 2,                       // number of external groups
00097     class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY // exported as alloc_strategy_type
00098     >
00099 struct priority_queue_config
00100 {
00101     typedef AllocStr_ alloc_strategy_type;
00102     typedef Cmp_ comparator_type;
00103     typedef Tp_ value_type;
00104     enum
00105     {
00106         element_size = sizeof(Tp_),
00107         delete_buffer_size = BufferSize1_,
00108         N = N_,
00109         IntKMAX = IntKMAX_,
00110         ExtKMAX = ExtKMAX_,
00111         BlockSize = BlockSize_,
00112         num_int_groups = IntLevels_,
00113         num_ext_groups = ExtLevels_
00114     };
00115 };
00116 
00117 __STXXL_END_NAMESPACE
00118 
00119 namespace std
00120 {
00121     //! swap for ext_merger, forwarding to its member swap. NOTE(review): overloading (rather than specializing) a function template inside namespace std is not sanctioned by the C++ standard.
00122     template <class Block, class Compare, unsigned Arity, class Alloc>
00123     void swap(stxxl::priority_queue_local::ext_merger<Block, Compare, Arity, Alloc> & lhs,
00124               stxxl::priority_queue_local::ext_merger<Block, Compare, Arity, Alloc> & rhs)
00125     {
00126         lhs.swap(rhs);
00127     }
00128 
00129     //! swap for loser_tree, forwarding to its member swap (same caveat as for the ext_merger overload).
00130     template <class Value, class Compare, unsigned MaxArity>
00131     void swap(stxxl::priority_queue_local::loser_tree<Value, Compare, MaxArity> & lhs,
00132               stxxl::priority_queue_local::loser_tree<Value, Compare, MaxArity> & rhs)
00133     {
00134         lhs.swap(rhs);
00135     }
00136 }
00137 
00138 __STXXL_BEGIN_NAMESPACE
00139 
00140 //! \brief External priority queue data structure (all sizes and types are taken from \c Config_)
00141 template <class Config_>
00142 class priority_queue : private noncopyable
00143 {
00144 public:
00145     typedef Config_ Config;
00146     enum
00147     {
00148         delete_buffer_size = Config::delete_buffer_size,
00149         N = Config::N,
00150         IntKMAX = Config::IntKMAX,
00151         num_int_groups = Config::num_int_groups,
00152         num_ext_groups = Config::num_ext_groups,
00153         total_num_groups = Config::num_int_groups + Config::num_ext_groups, // internal plus external groups
00154         BlockSize = Config::BlockSize,
00155         ExtKMAX = Config::ExtKMAX
00156     };
00157 
00158     //! \brief The type of object stored in the \b priority_queue
00159     typedef typename Config::value_type value_type;
00160     //! \brief Comparison object type
00161     typedef typename Config::comparator_type comparator_type;
00162     typedef typename Config::alloc_strategy_type alloc_strategy_type;
00163     //! \brief An unsigned integral type (64 bit)
00164     typedef stxxl::uint64 size_type;
00165     typedef typed_block<BlockSize, value_type> block_type; // block type handled by the pool and the external mergers
00166     typedef read_write_pool<block_type> pool_type;         // pool type accepted by the first constructor
00167 
00168 protected:
00169     typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type>
00170     insert_heap_type;
00171 
00172     typedef priority_queue_local::loser_tree<
00173         value_type,
00174         comparator_type,
00175         IntKMAX> int_merger_type;
00176 
00177     typedef priority_queue_local::ext_merger<
00178         block_type,
00179         comparator_type,
00180         ExtKMAX,
00181         alloc_strategy_type> ext_merger_type;
00182 
00183 
00184     int_merger_type int_mergers[num_int_groups]; // one internal merger per internal group
00185     pool_type * pool;                            // block pool, also handed to every external merger in init()
00186     bool pool_owned;                             // true if a constructor allocated pool; the destructor then deletes it
00187     ext_merger_type ** ext_mergers;              // array of num_ext_groups heap-allocated mergers, created in init()
00188 
00189     // one delete buffer for each tree => group buffer
00190     value_type group_buffers[total_num_groups][N + 1];          // tree->group_buffers->delete_buffer (extra space for sentinel)
00191     value_type * group_buffer_current_mins[total_num_groups];   // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
00192 
00193     // overall delete buffer
00194     value_type delete_buffer[delete_buffer_size + 1];
00195     value_type * delete_buffer_current_min;                     // current start of delete_buffer
00196     value_type * delete_buffer_end;                             // end of delete_buffer
00197 
00198     comparator_type cmp;                                        // default-constructed; cmp.min_value() supplies the sentinel value
00199 
00200     // insert buffer
00201     insert_heap_type insert_heap;
00202 
00203     // how many groups are active
00204     unsigned_type num_active_groups;
00205 
00206     // total size not counting insert_heap and delete_buffer
00207     size_type size_;
00208 
00209 private:
00210     void init();                                        // common tail of all constructors
00211 
00212     void refill_delete_buffer();                        // merges the fronts of the active group buffers into delete_buffer
00213     unsigned_type refill_group_buffer(unsigned_type k); // tops up group_buffers[k]; returns the number of elements then in it
00214 
00215     unsigned_type make_space_available(unsigned_type level); // returns the level where space was finally available
00216     void empty_insert_heap();                                // empties the insert heap into the main data structure
00217 
00218     value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
00219     unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }                       // elements currently in delete_buffer
00220     unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; } // elements currently in group_buffers[i]
00221 
00222 public:
00223     //! \brief Constructs external priority queue object
00224     //! \param pool_ pool of blocks that will be used
00225     //! for data writing and prefetching for the disk<->memory transfers
00226     //! happening in the priority queue. Larger pool size
00227     //! helps to speed up operations.
00228     priority_queue(pool_type & pool_);
00229 
00230     //! \brief Constructs external priority queue object
00231     //! \param p_pool_ pool of blocks that will be used
00232     //! for data prefetching for the disk<->memory transfers
00233     //! happening in the priority queue. Larger pool size
00234     //! helps to speed up operations.
00235     //! \param w_pool_ pool of blocks that will be used
00236     //! for writing data for the memory<->disk transfers
00237     //! happening in the priority queue. Larger pool size
00238     //! helps to speed up operations.
00239     _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_));
00240 
00241     //! \brief Constructs external priority queue object
00242     //! \param p_pool_mem memory (in bytes) for prefetch pool that will be used
00243     //! for data prefetching for the disk<->memory transfers
00244     //! happening in the priority queue. Larger pool size
00245     //! helps to speed up operations.
00246     //! \param w_pool_mem memory (in bytes) for buffered write pool that will be used
00247     //! for writing data for the memory<->disk transfers
00248     //! happening in the priority queue. Larger pool size
00249     //! helps to speed up operations.
00250     priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
00251 
00252 #if 0
00253     //! \brief swap this priority queue with another one
00254     //! Implementation correctness is questionable.
00255     void swap(priority_queue & obj)
00256     {
00257         //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
00258         for (unsigned_type i = 0; i < num_int_groups; ++i)
00259             std::swap(int_mergers[i], obj.int_mergers[i]);
00260 
00261         //std::swap(pool,obj.pool);
00262         //std::swap(pool_owned, obj.pool_owned);
00263         std::swap(ext_mergers, obj.ext_mergers);
00264         for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
00265             for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
00266                 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
00267 
00268         STXXL_STATIC_ASSERT(false);
00269         // Shoot yourself in the foot: group_buffer_current_mins contains pointers into group_buffers ...
00270         // either recompute them or add/subtract (&this->group_buffers[0][0] - &obj->group_buffers[0][0])
00271         swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
00272         swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
00273         std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
00274         std::swap(delete_buffer_end, obj.delete_buffer_end);
00275         std::swap(cmp, obj.cmp);
00276         std::swap(insert_heap, obj.insert_heap);
00277         std::swap(num_active_groups, obj.num_active_groups);
00278         std::swap(size_, obj.size_);
00279     }
00280 #endif
00281 
00282     virtual ~priority_queue();
00283 
00284     //! \brief Returns number of elements contained
00285     //! \return number of elements contained
00286     size_type size() const;
00287 
00288     //! \brief Returns true if queue has no elements
00289     //! \return \b true if queue has no elements, \b false otherwise
00290     bool empty() const { return (size() == 0); }
00291 
00292     //! \brief Returns "largest" element
00293     //!
00294     //! Returns a const reference to the element at the
00295     //! top of the priority_queue. The element at the top is
00296     //! guaranteed to be the largest element in the \b priority queue,
00297     //! as determined by the comparison function \b Config_::comparator_type
00298     //! (the same as the second parameter of PRIORITY_QUEUE_GENERATOR utility
00299     //! class). That is,
00300     //! for every other element \b x in the priority_queue,
00301     //! \b Config_::comparator_type(Q.top(), x) is false.
00302     //! Precondition: \c empty() is false.
00303     const value_type & top() const;
00304 
00305     //! \brief Removes the element at the top
00306     //!
00307     //! Removes the element at the top of the priority_queue, that
00308     //! is, the largest element in the \b priority_queue.
00309     //! Precondition: \c empty() is \b false.
00310     //! Postcondition: \c size() will be decremented by 1.
00311     void pop();
00312 
00313     //! \brief Inserts x into the priority_queue.
00314     //!
00315     //! Inserts x into the priority_queue. Postcondition:
00316     //! \c size() will be incremented by 1.
00317     void push(const value_type & obj);
00318 
00319     //! \brief Returns number of bytes consumed by
00320     //! the \b priority_queue
00321     //! \return number of bytes consumed by the \b priority_queue from
00322     //! the internal memory not including pools (see the constructor)
00323     unsigned_type mem_cons() const
00324     {
00325         unsigned_type dynam_alloc_mem = 0;
00326         //dynam_alloc_mem += w_pool.mem_cons();
00327         //dynam_alloc_mem += p_pool.mem_cons();
00328         for (int i = 0; i < num_int_groups; ++i)
00329             dynam_alloc_mem += int_mergers[i].mem_cons();
00330 
00331         for (int i = 0; i < num_ext_groups; ++i)
00332             dynam_alloc_mem += ext_mergers[i]->mem_cons();
00333 
00334 
00335         return (sizeof(*this) +                            // the object itself, including all fixed-size buffers
00336                 sizeof(ext_merger_type) * num_ext_groups + // the external mergers live on the heap (see init())
00337                 dynam_alloc_mem);                          // whatever the mergers report via mem_cons()
00338     }
00339 
00340     void dump_sizes() const;
00341     void dump_params() const;
00342 };
00343 
00344 //! Total element count: what size_ tracks, plus the delete buffer, plus the insert heap without its sentinel.
00345 template <class Config_>
00346 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const
00347 {
00348     const size_type in_delete_buffer = delete_buffer_end - delete_buffer_current_min;
00349     const size_type in_insert_heap = insert_heap.size() - 1; // init() pushes one sentinel into the heap
00350     return size_ + in_insert_heap + in_delete_buffer;
00351 }
00352 
00353 //! Returns the insert heap's top if cmp orders the delete buffer's front before it, otherwise the delete buffer's front.
00354 template <class Config_>
00355 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const
00356 {
00357     // the sentinel pushed in init() keeps the insert heap non-empty
00358     assert(!insert_heap.empty());
00359 
00360     const value_type & heap_top = insert_heap.top();
00361     const value_type & buffer_front = *delete_buffer_current_min;
00362     // both operands are lvalues of the same type, so a reference (not a copy) is returned
00363     return cmp(buffer_front, heap_top) ? heap_top : buffer_front;
00364 }
00365 //! Removes the element top() would return: the insert heap's top, or else the front of the delete buffer.
00366 template <class Config_>
00367 inline void priority_queue<Config_>::pop()
00368 {
00369     //STXXL_VERBOSE1("priority_queue::pop()");
00370     assert(!insert_heap.empty());
00371 
00372     if (cmp(*delete_buffer_current_min, insert_heap.top()))
00373     {
00374         insert_heap.pop();    // same decision as in top(): the heap element is the one to remove
00375         return;
00376     }
00377     // otherwise consume the front of the delete buffer and refill it as soon as it runs empty
00378     assert(delete_buffer_current_min < delete_buffer_end);
00379     if (++delete_buffer_current_min == delete_buffer_end)
00380         refill_delete_buffer();
00381 }
00382 //! Adds obj to the insert heap; a heap holding N elements plus the sentinel is first flushed via empty_insert_heap().
00383 template <class Config_>
00384 inline void priority_queue<Config_>::push(const value_type & obj)
00385 {
00386     //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
00387     assert(int_mergers[0].not_sentinel(obj));
00388 
00389     const bool heap_is_full = (insert_heap.size() == N + 1); // N elements + sentinel
00390     if (heap_is_full)
00391         empty_insert_heap();
00392 
00393     assert(!insert_heap.empty());
00394     insert_heap.push(obj);
00395 }
00396 
00397 
00398 ////////////////////////////////////////////////////////////////
00399 //! Builds the queue on a caller-provided pool; pool_owned is false, so the destructor leaves the pool alone.
00400 template <class Config_>
00401 priority_queue<Config_>::priority_queue(pool_type & pool_)
00402     : pool(&pool_), pool_owned(false),
00403       delete_buffer_end(delete_buffer + delete_buffer_size),   // the slot that init() fills with the sentinel
00404       insert_heap(N + 2),
00405       num_active_groups(0),
00406       size_(0)
00407 {
00408     STXXL_VERBOSE_PQ("priority_queue(pool)");
00409     init();
00410 }
00411 
00412 // DEPRECATED: wraps the two given pools in a newly allocated pool_type, which this queue owns and deletes.
00413 template <class Config_>
00414 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_)
00415     : pool(new pool_type(p_pool_, w_pool_)), pool_owned(true),
00416       delete_buffer_end(delete_buffer + delete_buffer_size),   // the slot that init() fills with the sentinel
00417       insert_heap(N + 2),
00418       num_active_groups(0),
00419       size_(0)
00420 {
00421     STXXL_VERBOSE_PQ("priority_queue(p_pool, w_pool)");
00422     init();
00423 }
00424 //! Allocates an owned pool, passing p_pool_mem / BlockSize and w_pool_mem / BlockSize (integer division) to pool_type.
00425 template <class Config_>
00426 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem)
00427     : pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)), pool_owned(true),
00428       delete_buffer_end(delete_buffer + delete_buffer_size),   // the slot that init() fills with the sentinel
00429       insert_heap(N + 2),
00430       num_active_groups(0),
00431       size_(0)
00432 {
00433     STXXL_VERBOSE_PQ("priority_queue(pool sizes)");
00434     init();
00435 }
00436 //! Common constructor tail: creates the external mergers and puts the sentinel cmp.min_value() into the insert heap and behind every buffer.
00437 template <class Config_>
00438 void priority_queue<Config_>::init()
00439 {
00440     assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
00441     ext_mergers = new ext_merger_type*[num_ext_groups];
00442     for (unsigned_type e = 0; e < num_ext_groups; ++e)
00443     {
00444         ext_merger_type * merger = new ext_merger_type;
00445         merger->set_pool(pool);                            // every external merger uses the queue's pool
00446         ext_mergers[e] = merger;
00447     }
00448     const value_type sentinel = cmp.min_value();
00449     insert_heap.push(sentinel);                            // always keep the sentinel
00450     delete_buffer[delete_buffer_size] = sentinel;          // extra slot behind the data slots
00451     delete_buffer_current_min = delete_buffer_end;         // empty
00452     for (unsigned_type g = 0; g < total_num_groups; ++g)
00453     {
00454         group_buffer_current_mins[g] = group_buffers[g] + N; // empty: start coincides with the end
00455         *group_buffer_current_mins[g] = sentinel;            // extra slot behind the N data slots
00456     }
00457 }
00458 //! Deletes the pool if a constructor allocated it, then every external merger, then the array that held them.
00459 template <class Config_>
00460 priority_queue<Config_>::~priority_queue()
00461 {
00462     STXXL_VERBOSE_PQ("~priority_queue()");
00463     if (pool_owned)
00464         delete pool; // NOTE(review): the mergers still hold this pointer (set_pool in init()) - confirm their destructors never touch it
00465     ext_merger_type ** const mergers_end = ext_mergers + num_ext_groups;
00466     for (ext_merger_type ** m = ext_mergers; m != mergers_end; ++m)
00467         delete *m;
00468     delete[] ext_mergers;
00469 }
00470 
00471 //--------------------- Buffer refilling -------------------------------
00472 // Tops up group_buffers[group] from its merger (internal if group < num_int_groups, external otherwise): elements still in the
00473 // buffer are moved to the new start and the rest is merged in behind them. Returns the number of elements then in the buffer.
00474 template <class Config_>
00475 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group)
00476 {
00477     STXXL_VERBOSE_PQ("refill_group_buffer(" << group << ")");
00478 
00479     value_type * target;
00480     unsigned_type length;
00481     size_type group_size = (group < num_int_groups) ?
00482                            int_mergers[group].size() :
00483                            ext_mergers[group - num_int_groups]->size();                        // elements left in segments
00484     unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
00485     if (group_size + left_elements >= size_type(N))
00486     {                                                                                          // buffer will be filled completely
00487         target = group_buffers[group];
00488         length = N - left_elements;
00489     }
00490     else
00491     {                                                                                          // merger runs dry: data stays right-aligned, ending at slot N
00492         target = group_buffers[group] + N - group_size - left_elements;
00493         length = group_size;
00494     }
00495 
00496     if (length > 0)
00497     {
00498         // shift remaining elements to front
00499         memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
00500         group_buffer_current_mins[group] = target;
00501 
00502         // fill remaining space from group
00503         if (group < num_int_groups)
00504             int_mergers[group].multi_merge(target + left_elements, length);
00505         else
00506             ext_mergers[group - num_int_groups]->multi_merge(
00507                 target + left_elements,
00508                 target + left_elements + length);
00509     }
00510 
00511     //STXXL_MSG(length + left_elements);
00512     //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
00513 #if STXXL_CHECK_ORDER_IN_SORTS
00514     priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00515     if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
00516     {
00517         STXXL_VERBOSE_PQ("refill_grp... length: " << length << " left_elements: " << left_elements);
00518         for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffers[group] + N; ++v) // scan exactly the range is_sorted rejected
00519         {
00520             if (inv_cmp(*v, *(v - 1)))
00521             {
00522                 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << "   " << *(v - 1) << " " << *v); // only the offending pair: *(v - 2) may lie in front of the buffer
00523             }
00524         }
00525         assert(false);
00526     }
00527 #endif
00528 
00529     return length + left_elements;
00530 }
00531 
00532 //! Refills the overall delete buffer with up to delete_buffer_size elements merged from the fronts of the active group buffers.
00533 template <class Config_>
00534 void priority_queue<Config_>::refill_delete_buffer()
00535 {
00536     STXXL_VERBOSE_PQ("refill_delete_buffer()");
00537 
00538     size_type total_group_size = 0;
00539     //num_active_groups is <= 4
00540     for (int i = num_active_groups - 1; i >= 0; i--)
00541     {
00542         if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size) // group buffer alone could not fill the delete buffer
00543         {
00544             unsigned_type length = refill_group_buffer(i);
00545             // max active level dry now?
00546             if (length == 0 && unsigned(i) == num_active_groups - 1)
00547                 --num_active_groups;
00548 
00549             total_group_size += length;
00550         }
00551         else
00552             total_group_size += delete_buffer_size;  // actually only a sufficient lower bound
00553     }
00554 
00555     unsigned_type length;
00556     if (total_group_size >= delete_buffer_size)      // buffer can be filled completely
00557     {
00558         length = delete_buffer_size;                 // amount to be copied
00559         size_ -= size_type(delete_buffer_size);      // amount left in group_buffers
00560     }
00561     else
00562     {
00563         length = total_group_size;
00564         assert(size_ == size_type(length)); // trees and group_buffers get empty
00565         size_ = 0;
00566     }
00567 
00568     priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00569 
00570     // now call simplified refill routines
00571     // which can make the assumption that
00572     // they find all they are asked in the buffers
00573     delete_buffer_current_min = delete_buffer_end - length; // data is right-aligned so that it ends directly in front of the sentinel slot
00574     STXXL_VERBOSE_PQ("refill_del... Active groups = " << num_active_groups);
00575     switch (num_active_groups)                              // one hand-written case per supported number of active groups
00576     {
00577     case 0:
00578         break;
00579     case 1:
00580         std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
00581         group_buffer_current_mins[0] += length;
00582         break;
00583     case 2:
00584 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00585         {
00586             std::pair<value_type *, value_type *> seqs[2] =
00587             {
00588                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00589                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
00590             };
00591             parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00592 
00593             group_buffer_current_mins[0] = seqs[0].first;
00594             group_buffer_current_mins[1] = seqs[1].first;
00595         }
00596 #else
00597         priority_queue_local::merge_iterator(
00598             group_buffer_current_mins[0],
00599             group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
00600 #endif
00601         break;
00602     case 3:
00603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00604         {
00605             std::pair<value_type *, value_type *> seqs[3] =
00606             {
00607                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00608                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00609                 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
00610             };
00611             parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00612 
00613             group_buffer_current_mins[0] = seqs[0].first;
00614             group_buffer_current_mins[1] = seqs[1].first;
00615             group_buffer_current_mins[2] = seqs[2].first;
00616         }
00617 #else
00618         priority_queue_local::merge3_iterator(
00619             group_buffer_current_mins[0],
00620             group_buffer_current_mins[1],
00621             group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
00622 #endif
00623         break;
00624     case 4:
00625 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00626         {
00627             std::pair<value_type *, value_type *> seqs[4] =
00628             {
00629                 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00630                 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00631                 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
00632                 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
00633             };
00634             parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
00635 
00636             group_buffer_current_mins[0] = seqs[0].first;
00637             group_buffer_current_mins[1] = seqs[1].first;
00638             group_buffer_current_mins[2] = seqs[2].first;
00639             group_buffer_current_mins[3] = seqs[3].first;
00640         }
00641 #else
00642         priority_queue_local::merge4_iterator(
00643             group_buffer_current_mins[0],
00644             group_buffer_current_mins[1],
00645             group_buffer_current_mins[2],
00646             group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free
00647 #endif
00648         break;
00649     default:
00650         STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
00651                     "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
00652     }
00653 
00654 #if STXXL_CHECK_ORDER_IN_SORTS
00655     if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
00656     {
00657         for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
00658         {
00659             if (inv_cmp(*v, *(v - 1)))
00660             {
00661                 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << "   " << *(v - 1) << " " << *v);
00662             }
00663         }
00664         assert(false);
00665     }
00666 #endif
00667     //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
00668 }
00669 
00670 //--------------------------------------------------------------------
00671 
00672 // Check whether the merger of group `level` can take another segment. If not, first make room in group
00673 // level + 1 (recursive call) and then move the entire content of this group there as a single segment.
00674 // Returns the level where space was finally available; `level` is activated if it was not active yet.
00675 template <class Config_>
00676 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level)
00677 {
00678     STXXL_VERBOSE_PQ("make_space_available(" << level << ")");
00679     unsigned_type finalLevel;
00680     assert(level < total_num_groups);
00681     assert(level <= num_active_groups);
00682 
00683     if (level == num_active_groups) // first use of this level: mark it active
00684         ++num_active_groups;
00685 
00686     const bool spaceIsAvailable_ =
00687         (level < num_int_groups) ? int_mergers[level].is_space_available()
00688         : (ext_mergers[level - num_int_groups]->is_space_available());
00689 
00690     if (spaceIsAvailable_)
00691     {
00692         finalLevel = level;
00693     }
00694     else if (level == total_num_groups - 1) // the last group is full and there is no further level to move its content to
00695     {
00696         size_type capacity = N;             // nominal capacity, only used in the message below
00697         for (int i = 0; i < num_int_groups; ++i)
00698             capacity *= IntKMAX;
00699         for (int i = 0; i < num_ext_groups; ++i)
00700             capacity *= ExtKMAX;
00701         STXXL_ERRMSG("priority_queue OVERFLOW - all groups full, size=" << size() <<
00702                      ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 << "))=" << capacity);
00703         dump_sizes();
00704 
00705         int extLevel = level - num_int_groups;
00706         const size_type segmentSize = ext_mergers[extLevel]->size();
00707         STXXL_VERBOSE1("Inserting segment into last level external: " << level << " " << segmentSize);
00708         ext_merger_type * overflow_merger = new ext_merger_type;              // fresh merger that replaces the full one
00709         overflow_merger->set_pool(pool);
00710         overflow_merger->insert_segment(*ext_mergers[extLevel], segmentSize); // whole content of the full merger becomes one segment
00711         std::swap(ext_mergers[extLevel], overflow_merger);                    // pointer swap: overflow_merger now refers to the old merger
00712         delete overflow_merger;
00713         finalLevel = level;
00714     }
00715     else
00716     {
00717         finalLevel = make_space_available(level + 1);                    // make room one level up before emptying this level
00718 
00719         if (level < num_int_groups - 1)                                  // from internal to internal tree
00720         {
00721             unsigned_type segmentSize = int_mergers[level].size();
00722             value_type * newSegment = new value_type[segmentSize + 1];   // + 1 for the sentinel; not deleted here - presumably insert_segment takes ownership (TODO confirm)
00723             int_mergers[level].multi_merge(newSegment, segmentSize);     // empty this level
00724 
00725             newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
00726             // for queues where size << #inserts
00727             // it might make sense to stay in this level if
00728             // segmentSize < alpha * KNN * k^level for some alpha < 1
00729             int_mergers[level + 1].insert_segment(newSegment, segmentSize);
00730         }
00731         else
00732         {
00733             if (level == num_int_groups - 1) // from internal to external tree
00734             {
00735                 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
00736                 STXXL_VERBOSE_PQ("make_space... Inserting segment into first level external: " << level << " " << segmentSize);
00737                 ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize);
00738             }
00739             else // from external to external tree
00740             {
00741                 const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
00742                 STXXL_VERBOSE_PQ("make_space... Inserting segment into second level external: " << level << " " << segmentSize);
00743                 ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize);
00744             }
00745         }
00746     }
00747     return finalLevel;
00748 }
00749 
00750 
00751 // empty the insert heap into the main data structure
00752 template <class Config_>
00753 void priority_queue<Config_>::empty_insert_heap()
00754 {
00755     STXXL_VERBOSE_PQ("empty_insert_heap()");
00756     assert(insert_heap.size() == (N + 1));
00757 
00758     const value_type sup = get_supremum();
00759 
00760     // build new segment
00761     value_type * newSegment = new value_type[N + 1];
00762     value_type * newPos = newSegment;
00763 
00764     // put the new data there for now
00765     //insert_heap.sortTo(newSegment);
00766     value_type * SortTo = newSegment;
00767 
00768     insert_heap.sort_to(SortTo);
00769 
00770     SortTo = newSegment + N;
00771     insert_heap.clear();
00772     insert_heap.push(*SortTo);
00773 
00774     assert(insert_heap.size() == 1);
00775 
00776     newSegment[N] = sup; // sentinel
00777 
00778     // copy the delete_buffer and group_buffers[0] to temporary storage
00779     // (the temporary can be eliminated using some dirty tricks)
00780     const unsigned_type tempSize = N + delete_buffer_size;
00781     value_type temp[tempSize + 1];
00782     unsigned_type sz1 = current_delete_buffer_size();
00783     unsigned_type sz2 = current_group_buffer_size(0);
00784     value_type * pos = temp + tempSize - sz1 - sz2;
00785     std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
00786     std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
00787     temp[tempSize] = sup; // sentinel
00788 
00789     // refill delete_buffer
00790     // (using more complicated code it could be made somewhat fuller
00791     // in certain circumstances)
00792     priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
00793 
00794     // refill group_buffers[0]
00795     // (as above we might want to take the opportunity
00796     // to make group_buffers[0] fuller)
00797     priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
00798 
00799     // merge the rest to the new segment
00800     // note that merge exactly trips into the footsteps
00801     // of itself
00802     priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
00803 
00804     // and insert it
00805     unsigned_type freeLevel = make_space_available(0);
00806     assert(freeLevel == 0 || int_mergers[0].size() == 0);
00807     int_mergers[0].insert_segment(newSegment, N);
00808 
00809     // get rid of invalid level 2 buffers
00810     // by inserting them into tree 0 (which is almost empty in this case)
00811     if (freeLevel > 0)
00812     {
00813         for (int_type i = freeLevel; i >= 0; i--)
00814         {
00815             // reverse order not needed
00816             // but would allow immediate refill
00817 
00818             newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
00819             std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
00820             int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
00821             group_buffer_current_mins[i] = group_buffers[i] + N;           // empty
00822         }
00823     }
00824 
00825     // update size
00826     size_ += size_type(N);
00827 
00828     // special case if the tree was empty before
00829     if (delete_buffer_current_min == delete_buffer_end)
00830         refill_delete_buffer();
00831 }
00832 
00833 template <class Config_>
00834 void priority_queue<Config_>::dump_sizes() const
00835 {
00836     unsigned_type capacity = N;
00837     STXXL_MSG("pq::size()\t= " << size());
00838     STXXL_MSG("  insert_heap\t= " << insert_heap.size() - 1 << "/" << capacity);
00839     STXXL_MSG("  delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) << "/" << delete_buffer_size);
00840     for (int i = 0; i < num_int_groups; ++i) {
00841         capacity *= IntKMAX;
00842         STXXL_MSG("  grp " << i << " int" <<
00843                 " grpbuf=" << current_group_buffer_size(i) <<
00844                 " size=" << int_mergers[i].size() << "/" << capacity << 
00845                 " space=" << int_mergers[i].is_space_available());
00846     }
00847     for (int i = 0; i < num_ext_groups; ++i) {
00848         capacity *= ExtKMAX;
00849         STXXL_MSG("  grp " << i + num_int_groups << " ext" <<
00850                 " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
00851                 " size=" << ext_mergers[i]->size() << "/" << capacity <<
00852                 " space=" << ext_mergers[i]->is_space_available());
00853     }
00854     dump_params();
00855 }
00856 
00857 template <class Config_>
00858 void priority_queue<Config_>::dump_params() const
00859 {
00860     STXXL_MSG("params: delete_buffer_size=" << delete_buffer_size << " N=" << N << " IntKMAX=" << IntKMAX << " num_int_groups=" << num_int_groups << " ExtKMAX=" << ExtKMAX << " num_ext_groups=" << num_ext_groups << " BlockSize=" << BlockSize);
00861 }
00862 
00863 namespace priority_queue_local
00864 {
00865     struct Parameters_for_priority_queue_not_found_Increase_IntM
00866     {
00867         enum { fits = false };
00868         typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00869     };
00870 
00871     struct dummy
00872     {
00873         enum { fits = false };
00874         typedef dummy result;
00875     };
00876 
00877     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false>
00878     struct find_B_m
00879     {
00880         typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self;
00881         enum {
00882             k = IntM_ / B_,    // number of blocks that fit into M
00883             element_size = E_, // element size
00884             IntM = IntM_,
00885             B = B_,            // block size
00886             m = m_,            // number of blocks fitting into buffers
00887             c = k - m_,
00888             // memory occ. by block must be at least 10 times larger than size of ext sequence
00889             // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2
00890             fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_
00891                    && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128),
00892             step = 1
00893         };
00894 
00895         typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1;
00896         typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2;
00897         typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result;
00898     };
00899 
00900     // specialization for the case when no valid parameters are found
00901     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop>
00902     struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop>
00903     {
00904         enum { fits = false };
00905         typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00906     };
00907 
00908     // to speedup search
00909     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_>
00910     struct find_B_m<E_, IntM_, MaxS_, B_, m_, true>
00911     {
00912         enum { fits = false };
00913         typedef dummy result;
00914     };
00915 
00916     // E_ size of element in bytes
00917     template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_>
00918     struct find_settings
00919     {
00920         // start from block size (8*1024*1024) bytes
00921         typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result;
00922     };
00923 
00924     struct Parameters_not_found_Try_to_change_the_Tune_parameter
00925     {
00926         typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00927     };
00928 
00929 
00930     template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_>
00931     struct compute_N
00932     {
00933         typedef compute_N<AI_, X_, CriticalSize_> Self;
00934         enum
00935         {
00936             X = X_,
00937             AI = AI_,
00938             N = X / (AI * AI) // two stage internal
00939         };
00940         typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result;
00941     };
00942 
00943     template <unsigned_type X_, unsigned_type CriticalSize_>
00944     struct compute_N<1, X_, CriticalSize_>
00945     {
00946         typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00947     };
00948 }
00949 
00950 //! \}
00951 
00952 //! \addtogroup stlcont
00953 //! \{
00954 
00955 //! \brief Priority queue type generator
00956 
00957 //! Implements a data structure from "Peter Sanders. Fast Priority Queues
00958 //! for Cached Memory. ALENEX'99" for external memory.
00959 //! <BR>
00960 //! \tparam type of the contained objects (POD with no references to internal memory)
00961 //! \tparam the comparison type used to determine
00962 //! whether one element is smaller than another element.
00963 //! If Cmp_(x,y) is true, then x is smaller than y. The element
00964 //! returned by Q.top() is the largest element in the priority
00965 //! queue. That is, it has the property that, for every other
00966 //! element \b x in the priority queue, Cmp_(Q.top(), x) is false.
00967 //! Cmp_ must also provide min_value method, that returns value of type Tp_ that is
00968 //! smaller than any element of the queue \b x , i.e. Cmp_(Cmp_.min_value(),x) is
00969 //! always \b true . <BR>
00970 //! <BR>
00971 //! Example: comparison object for priority queue
00972 //! where \b top() returns the \b smallest contained integer:
00973 //! \verbatim
00974 //! struct CmpIntGreater
00975 //! {
00976 //!   bool operator () (const int & a, const int & b) const { return a>b; }
00977 //!   int min_value() const  { return std::numeric_limits<int>::max(); }
00978 //! };
00979 //! \endverbatim
00980 //! Example: comparison object for priority queue
00981 //! where \b top() returns the \b largest contained integer:
00982 //! \verbatim
00983 //! struct CmpIntLess
00984 //! {
00985 //!   bool operator () (const int & a, const int & b) const { return a<b; }
00986 //!   int min_value() const  { return std::numeric_limits<int>::min(); }
00987 //! };
00988 //! \endverbatim
00989 //! Note that Cmp_ must define strict weak ordering.
00990 //! (<A HREF="http://www.sgi.com/tech/stl/StrictWeakOrdering.html">see what it is</A>)
00991 //! - \c IntM_ upper limit for internal memory consumption in bytes.
00992 //! - \c MaxS_ upper limit for number of elements contained in the priority queue (in 1024 units).
00993 //! Example: if you are sure that priority queue contains no more than
00994 //! one million elements in a time, then the right parameter is (1000000/1024)= 976 .
00995 //! - \c Tune_ tuning parameter. Try to play with it if the code does not compile
00996 //! (larger than default values might help). Code does not compile
00997 //! if no suitable internal parameters were found for given IntM_ and MaxS_.
00998 //! It might also happen that given IntM_ is too small for given MaxS_, try larger values.
00999 //! <BR>
01000 //! \c PRIORITY_QUEUE_GENERATOR is template meta program that searches
01001 //! for \b 7 configuration parameters of \b stxxl::priority_queue that both
01002 //! minimize internal memory consumption of the priority queue to
01003 //! match IntM_ and maximize performance of priority queue operations.
01004 //! Actual memory consumption might be larger (use
01005 //! \c stxxl::priority_queue::mem_cons() method to track it), since the search
01006 //! assumes rather optimistic schedule of push'es and pop'es for the
01007 //! estimation of the maximum memory consumption. To keep actual memory
01008 //! requirements low increase the value of MaxS_ parameter.
01009 //! <BR>
01010 //! For functioning a priority queue object requires two pools of blocks
01011 //! (See constructor of \c priority_queue ). To construct \c <stxxl> block
01012 //! pools you might need \b block \b type that will be used by priority queue.
01013 //! Note that block's size and hence it's type is generated by
01014 //! the \c PRIORITY_QUEUE_GENERATOR in compile type from IntM_, MaxS_ and sizeof(Tp_) and
01015 //! not given directly by user as a template parameter. Block type can be extracted as
01016 //! \c PRIORITY_QUEUE_GENERATOR<some_parameters>::result::block_type .
01017 //! For an example see p_queue.cpp .
01018 //! Configured priority queue type is available as \c PRIORITY_QUEUE_GENERATOR<>::result. <BR> <BR>
01019 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6>
01020 class PRIORITY_QUEUE_GENERATOR
01021 {
01022 public:
01023     // actual calculation of B, m, k and element_size
01024     typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings;
01025     enum {
01026         B = settings::B,
01027         m = settings::m,
01028         X = B * (settings::k - m) / settings::element_size,  // interpretation of result
01029         Buffer1Size = 32                                     // fixed
01030     };
01031     // derivation of N, AI, AE
01032     typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN;
01033     enum
01034     {
01035         N = ComputeN::N,
01036         AI = ComputeN::AI,
01037         AE = (m / 2 < 2) ? 2 : (m / 2)            // at least 2
01038     };
01039     enum {
01040         // Estimation of maximum internal memory consumption (in bytes)
01041         EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
01042     };
01043     /*
01044         unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
01045         unsigned N_ = 512,          // bandwidth
01046         unsigned IntKMAX_ = 64,     // maximal arity for internal mergers
01047         unsigned IntLevels_ = 4,
01048         unsigned BlockSize_ = (2*1024*1024),
01049         unsigned ExtKMAX_ = 64,     // maximal arity for external mergers
01050         unsigned ExtLevels_ = 2,
01051      */
01052     typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result;
01053 };
01054 
01055 //! \}
01056 
01057 __STXXL_END_NAMESPACE
01058 
01059 
01060 namespace std
01061 {
01062     template <class Config_>
01063     void swap(stxxl::priority_queue<Config_> & a,
01064               stxxl::priority_queue<Config_> & b)
01065     {
01066         a.swap(b);
01067     }
01068 }
01069 
01070 #endif // !STXXL_PRIORITY_QUEUE_HEADER
01071 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines