Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/containers/priority_queue.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 1999 Peter Sanders <sanders@mpi-sb.mpg.de> 00007 * Copyright (C) 2003, 2004, 2007 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00008 * Copyright (C) 2007-2009 Johannes Singler <singler@ira.uka.de> 00009 * Copyright (C) 2007-2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00010 * 00011 * Distributed under the Boost Software License, Version 1.0. 00012 * (See accompanying file LICENSE_1_0.txt or copy at 00013 * http://www.boost.org/LICENSE_1_0.txt) 00014 **************************************************************************/ 00015 00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER 00017 #define STXXL_PRIORITY_QUEUE_HEADER 00018 00019 #include <vector> 00020 00021 #include <stxxl/bits/deprecated.h> 00022 #include <stxxl/bits/mng/typed_block.h> 00023 #include <stxxl/bits/mng/block_alloc.h> 00024 #include <stxxl/bits/mng/read_write_pool.h> 00025 #include <stxxl/bits/mng/prefetch_pool.h> 00026 #include <stxxl/bits/mng/write_pool.h> 00027 #include <stxxl/bits/common/tmeta.h> 00028 #include <stxxl/bits/algo/sort_base.h> 00029 #include <stxxl/bits/parallel.h> 00030 #include <stxxl/bits/common/is_sorted.h> 00031 00032 #if STXXL_PARALLEL 00033 00034 #if defined(STXXL_PARALLEL_MODE) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400) 00035 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00036 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00037 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00038 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0 00039 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0 00040 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0 00041 #endif 00042 00043 // enable/disable parallel merging for certain cases, for performance tuning 00044 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00045 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1 00046 #endif 00047 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00048 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1 00049 #endif 00050 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00051 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1 00052 #endif 00053 00054 #endif //STXXL_PARALLEL 00055 00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00057 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences 00058 #else 00059 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1 00060 #endif 00061 00062 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00063 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences 00064 #else 00065 #define STXXL_PQ_INTERNAL_LOSER_TREE 1 00066 #endif 00067 00068 #define STXXL_VERBOSE_PQ(msg) STXXL_VERBOSE2("[" << static_cast<void *>(this) << "] priority_queue::" << msg) 00069 00070 #include <stxxl/bits/containers/pq_helpers.h> 00071 #include <stxxl/bits/containers/pq_mergers.h> 00072 #include <stxxl/bits/containers/pq_ext_merger.h> 00073 #include <stxxl/bits/containers/pq_losertree.h> 00074 00075 __STXXL_BEGIN_NAMESPACE 00076 00077 /* 00078 KNBufferSize1 = 32; 00079 KNN = 512; // length of group 1 sequences 00080 KNKMAX = 64; // maximal arity 00081 LogKNKMAX = 6; // ceil(log KNKMAX) 00082 KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels 00083 */ 00084 00085 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext 00086 00087 template < 00088 class Tp_, 00089 class Cmp_, 00090 unsigned BufferSize1_ = 32, // equalize procedure call overheads etc. 00091 unsigned N_ = 512, // length of group 1 sequences 00092 unsigned IntKMAX_ = 64, // maximal arity for internal mergers 00093 unsigned IntLevels_ = 4, // number of internal groups 00094 unsigned BlockSize_ = (2 * 1024 * 1024), // external block size 00095 unsigned ExtKMAX_ = 64, // maximal arity for external mergers 00096 unsigned ExtLevels_ = 2, // number of external groups 00097 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY 00098 > 00099 struct priority_queue_config 00100 { 00101 typedef Tp_ value_type; 00102 typedef Cmp_ comparator_type; 00103 typedef AllocStr_ alloc_strategy_type; 00104 enum 00105 { 00106 delete_buffer_size = BufferSize1_, 00107 N = N_, 00108 IntKMAX = IntKMAX_, 00109 num_int_groups = IntLevels_, 00110 num_ext_groups = ExtLevels_, 00111 BlockSize = BlockSize_, 00112 ExtKMAX = ExtKMAX_, 00113 element_size = sizeof(Tp_) 00114 }; 00115 }; 00116 00117 __STXXL_END_NAMESPACE 00118 00119 namespace std 00120 { 00121 template <class BlockType_, 00122 class Cmp_, 00123 unsigned Arity_, 00124 class AllocStr_> 00125 void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a, 00126 stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b) 00127 { 00128 a.swap(b); 00129 } 00130 template <class ValTp_, class Cmp_, unsigned KNKMAX> 00131 void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a, 00132 stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b) 00133 { 00134 a.swap(b); 00135 } 00136 } 00137 00138 __STXXL_BEGIN_NAMESPACE 00139 00140 //! \brief External priority queue data structure 00141 template <class Config_> 00142 class priority_queue : private noncopyable 00143 { 00144 public: 00145 typedef Config_ Config; 00146 enum 00147 { 00148 delete_buffer_size = Config::delete_buffer_size, 00149 N = Config::N, 00150 IntKMAX = Config::IntKMAX, 00151 num_int_groups = Config::num_int_groups, 00152 num_ext_groups = Config::num_ext_groups, 00153 total_num_groups = Config::num_int_groups + Config::num_ext_groups, 00154 BlockSize = Config::BlockSize, 00155 ExtKMAX = Config::ExtKMAX 00156 }; 00157 00158 //! \brief The type of object stored in the \b priority_queue 00159 typedef typename Config::value_type value_type; 00160 //! \brief Comparison object 00161 typedef typename Config::comparator_type comparator_type; 00162 typedef typename Config::alloc_strategy_type alloc_strategy_type; 00163 //! \brief An unsigned integral type (64 bit) 00164 typedef stxxl::uint64 size_type; 00165 typedef typed_block<BlockSize, value_type> block_type; 00166 typedef read_write_pool<block_type> pool_type; 00167 00168 protected: 00169 typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type> 00170 insert_heap_type; 00171 00172 typedef priority_queue_local::loser_tree< 00173 value_type, 00174 comparator_type, 00175 IntKMAX> int_merger_type; 00176 00177 typedef priority_queue_local::ext_merger< 00178 block_type, 00179 comparator_type, 00180 ExtKMAX, 00181 alloc_strategy_type> ext_merger_type; 00182 00183 00184 int_merger_type int_mergers[num_int_groups]; 00185 pool_type * pool; 00186 bool pool_owned; 00187 ext_merger_type ** ext_mergers; 00188 00189 // one delete buffer for each tree => group buffer 00190 value_type group_buffers[total_num_groups][N + 1]; // tree->group_buffers->delete_buffer (extra space for sentinel) 00191 value_type * group_buffer_current_mins[total_num_groups]; // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N 00192 00193 // overall delete buffer 00194 value_type delete_buffer[delete_buffer_size + 1]; 00195 value_type * delete_buffer_current_min; // current start of delete_buffer 00196 value_type * delete_buffer_end; // end of delete_buffer 00197 00198 comparator_type cmp; 00199 00200 // insert buffer 00201 insert_heap_type insert_heap; 00202 00203 // how many groups are active 00204 unsigned_type num_active_groups; 00205 00206 // total size not counting insert_heap and delete_buffer 00207 size_type size_; 00208 00209 private: 00210 void init(); 00211 00212 void refill_delete_buffer(); 00213 unsigned_type refill_group_buffer(unsigned_type k); 00214 00215 unsigned_type make_space_available(unsigned_type level); 00216 void empty_insert_heap(); 00217 00218 value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; } 00219 unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; } 00220 unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; } 00221 00222 public: 00223 //! \brief Constructs external priority queue object 00224 //! \param pool_ pool of blocks that will be used 00225 //! for data writing and prefetching for the disk<->memory transfers 00226 //! happening in the priority queue. Larger pool size 00227 //! helps to speed up operations. 00228 priority_queue(pool_type & pool_); 00229 00230 //! \brief Constructs external priority queue object 00231 //! \param p_pool_ pool of blocks that will be used 00232 //! for data prefetching for the disk<->memory transfers 00233 //! happening in the priority queue. Larger pool size 00234 //! helps to speed up operations. 00235 //! \param w_pool_ pool of blocks that will be used 00236 //! for writing data for the memory<->disk transfers 00237 //! happening in the priority queue. Larger pool size 00238 //! helps to speed up operations. 00239 _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_)); 00240 00241 //! \brief Constructs external priority queue object 00242 //! \param p_pool_mem memory (in bytes) for prefetch pool that will be used 00243 //! for data prefetching for the disk<->memory transfers 00244 //! happening in the priority queue. Larger pool size 00245 //! helps to speed up operations. 00246 //! \param w_pool_mem memory (in bytes) for buffered write pool that will be used 00247 //! for writing data for the memory<->disk transfers 00248 //! happening in the priority queue. Larger pool size 00249 //! helps to speed up operations. 00250 priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem); 00251 00252 #if 0 00253 //! \brief swap this priority queue with another one 00254 //! Implementation correctness is questionable. 00255 void swap(priority_queue & obj) 00256 { 00257 //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug? 00258 for (unsigned_type i = 0; i < num_int_groups; ++i) 00259 std::swap(int_mergers[i], obj.int_mergers[i]); 00260 00261 //std::swap(pool,obj.pool); 00262 //std::swap(pool_owned, obj.pool_owned); 00263 std::swap(ext_mergers, obj.ext_mergers); 00264 for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1) 00265 for (unsigned_type i2 = 0; i2 < (N + 1); ++i2) 00266 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]); 00267 00268 STXXL_STATIC_ASSERT(false); 00269 // Shoot yourself in the foot: group_buffer_current_mins contains pointers into group_buffers ... 00270 // either recompute them or add/subtract (&this->group_buffers[0][0] - &obj->group_buffers[0][0]) 00271 swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups); 00272 swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1); 00273 std::swap(delete_buffer_current_min, obj.delete_buffer_current_min); 00274 std::swap(delete_buffer_end, obj.delete_buffer_end); 00275 std::swap(cmp, obj.cmp); 00276 std::swap(insert_heap, obj.insert_heap); 00277 std::swap(num_active_groups, obj.num_active_groups); 00278 std::swap(size_, obj.size_); 00279 } 00280 #endif 00281 00282 virtual ~priority_queue(); 00283 00284 //! \brief Returns number of elements contained 00285 //! \return number of elements contained 00286 size_type size() const; 00287 00288 //! \brief Returns true if queue has no elements 00289 //! \return \b true if queue has no elements, \b false otherwise 00290 bool empty() const { return (size() == 0); } 00291 00292 //! \brief Returns "largest" element 00293 //! 00294 //! Returns a const reference to the element at the 00295 //! top of the priority_queue. The element at the top is 00296 //! guaranteed to be the largest element in the \b priority queue, 00297 //! as determined by the comparison function \b Config_::comparator_type 00298 //! (the same as the second parameter of PRIORITY_QUEUE_GENERATOR utility 00299 //! class). That is, 00300 //! for every other element \b x in the priority_queue, 00301 //! \b Config_::comparator_type(Q.top(), x) is false. 00302 //! Precondition: \c empty() is false. 00303 const value_type & top() const; 00304 00305 //! \brief Removes the element at the top 00306 //! 00307 //! Removes the element at the top of the priority_queue, that 00308 //! is, the largest element in the \b priority_queue. 00309 //! Precondition: \c empty() is \b false. 00310 //! Postcondition: \c size() will be decremented by 1. 00311 void pop(); 00312 00313 //! \brief Inserts x into the priority_queue. 00314 //! 00315 //! Inserts x into the priority_queue. Postcondition: 00316 //! \c size() will be incremented by 1. 00317 void push(const value_type & obj); 00318 00319 //! \brief Returns number of bytes consumed by 00320 //! the \b priority_queue 00321 //! \brief number of bytes consumed by the \b priority_queue from 00322 //! the internal memory not including pools (see the constructor) 00323 unsigned_type mem_cons() const 00324 { 00325 unsigned_type dynam_alloc_mem = 0; 00326 //dynam_alloc_mem += w_pool.mem_cons(); 00327 //dynam_alloc_mem += p_pool.mem_cons(); 00328 for (int i = 0; i < num_int_groups; ++i) 00329 dynam_alloc_mem += int_mergers[i].mem_cons(); 00330 00331 for (int i = 0; i < num_ext_groups; ++i) 00332 dynam_alloc_mem += ext_mergers[i]->mem_cons(); 00333 00334 00335 return (sizeof(*this) + 00336 sizeof(ext_merger_type) * num_ext_groups + 00337 dynam_alloc_mem); 00338 } 00339 00340 void dump_sizes() const; 00341 void dump_params() const; 00342 }; 00343 00344 00345 template <class Config_> 00346 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const 00347 { 00348 return size_ + 00349 insert_heap.size() - 1 + 00350 (delete_buffer_end - delete_buffer_current_min); 00351 } 00352 00353 00354 template <class Config_> 00355 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const 00356 { 00357 assert(!insert_heap.empty()); 00358 00359 const typename priority_queue<Config_>::value_type & t = insert_heap.top(); 00360 if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t)) 00361 return t; 00362 else 00363 return *delete_buffer_current_min; 00364 } 00365 00366 template <class Config_> 00367 inline void priority_queue<Config_>::pop() 00368 { 00369 //STXXL_VERBOSE1("priority_queue::pop()"); 00370 assert(!insert_heap.empty()); 00371 00372 if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top())) 00373 insert_heap.pop(); 00374 else 00375 { 00376 assert(delete_buffer_current_min < delete_buffer_end); 00377 ++delete_buffer_current_min; 00378 if (delete_buffer_current_min == delete_buffer_end) 00379 refill_delete_buffer(); 00380 } 00381 } 00382 00383 template <class Config_> 00384 inline void priority_queue<Config_>::push(const value_type & obj) 00385 { 00386 //STXXL_VERBOSE3("priority_queue::push("<< obj <<")"); 00387 assert(int_mergers->not_sentinel(obj)); 00388 if (insert_heap.size() == N + 1) 00389 empty_insert_heap(); 00390 00391 00392 assert(!insert_heap.empty()); 00393 00394 insert_heap.push(obj); 00395 } 00396 00397 00398 //////////////////////////////////////////////////////////////// 00399 00400 template <class Config_> 00401 priority_queue<Config_>::priority_queue(pool_type & pool_) : 00402 pool(&pool_), 00403 pool_owned(false), 00404 delete_buffer_end(delete_buffer + delete_buffer_size), 00405 insert_heap(N + 2), 00406 num_active_groups(0), size_(0) 00407 { 00408 STXXL_VERBOSE_PQ("priority_queue(pool)"); 00409 init(); 00410 } 00411 00412 // DEPRECATED 00413 template <class Config_> 00414 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_) : 00415 pool(new pool_type(p_pool_, w_pool_)), 00416 pool_owned(true), 00417 delete_buffer_end(delete_buffer + delete_buffer_size), 00418 insert_heap(N + 2), 00419 num_active_groups(0), size_(0) 00420 { 00421 STXXL_VERBOSE_PQ("priority_queue(p_pool, w_pool)"); 00422 init(); 00423 } 00424 00425 template <class Config_> 00426 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) : 00427 pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)), 00428 pool_owned(true), 00429 delete_buffer_end(delete_buffer + delete_buffer_size), 00430 insert_heap(N + 2), 00431 num_active_groups(0), size_(0) 00432 { 00433 STXXL_VERBOSE_PQ("priority_queue(pool sizes)"); 00434 init(); 00435 } 00436 00437 template <class Config_> 00438 void priority_queue<Config_>::init() 00439 { 00440 assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering 00441 00442 ext_mergers = new ext_merger_type*[num_ext_groups]; 00443 for (unsigned_type j = 0; j < num_ext_groups; ++j) { 00444 ext_mergers[j] = new ext_merger_type; 00445 ext_mergers[j]->set_pool(pool); 00446 } 00447 00448 value_type sentinel = cmp.min_value(); 00449 insert_heap.push(sentinel); // always keep the sentinel 00450 delete_buffer[delete_buffer_size] = sentinel; // sentinel 00451 delete_buffer_current_min = delete_buffer_end; // empty 00452 for (unsigned_type i = 0; i < total_num_groups; i++) 00453 { 00454 group_buffers[i][N] = sentinel; // sentinel 00455 group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty 00456 } 00457 } 00458 00459 template <class Config_> 00460 priority_queue<Config_>::~priority_queue() 00461 { 00462 STXXL_VERBOSE_PQ("~priority_queue()"); 00463 if (pool_owned) 00464 delete pool; 00465 00466 for (unsigned_type j = 0; j < num_ext_groups; ++j) 00467 delete ext_mergers[j]; 00468 delete[] ext_mergers; 00469 } 00470 00471 //--------------------- Buffer refilling ------------------------------- 00472 00473 // refill group_buffers[j] and return number of elements found 00474 template <class Config_> 00475 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group) 00476 { 00477 STXXL_VERBOSE_PQ("refill_group_buffer(" << group << ")"); 00478 00479 value_type * target; 00480 unsigned_type length; 00481 size_type group_size = (group < num_int_groups) ? 00482 int_mergers[group].size() : 00483 ext_mergers[group - num_int_groups]->size(); // elements left in segments 00484 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer 00485 if (group_size + left_elements >= size_type(N)) 00486 { // buffer will be filled completely 00487 target = group_buffers[group]; 00488 length = N - left_elements; 00489 } 00490 else 00491 { 00492 target = group_buffers[group] + N - group_size - left_elements; 00493 length = group_size; 00494 } 00495 00496 if (length > 0) 00497 { 00498 // shift remaininig elements to front 00499 memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type)); 00500 group_buffer_current_mins[group] = target; 00501 00502 // fill remaining space from group 00503 if (group < num_int_groups) 00504 int_mergers[group].multi_merge(target + left_elements, length); 00505 else 00506 ext_mergers[group - num_int_groups]->multi_merge( 00507 target + left_elements, 00508 target + left_elements + length); 00509 } 00510 00511 //STXXL_MSG(length + left_elements); 00512 //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n")); 00513 #if STXXL_CHECK_ORDER_IN_SORTS 00514 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp); 00515 if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp)) 00516 { 00517 STXXL_VERBOSE_PQ("refill_grp... length: " << length << " left_elements: " << left_elements); 00518 for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v) 00519 { 00520 if (inv_cmp(*v, *(v - 1))) 00521 { 00522 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1)); 00523 } 00524 } 00525 assert(false); 00526 } 00527 #endif 00528 00529 return length + left_elements; 00530 } 00531 00532 00533 template <class Config_> 00534 void priority_queue<Config_>::refill_delete_buffer() 00535 { 00536 STXXL_VERBOSE_PQ("refill_delete_buffer()"); 00537 00538 size_type total_group_size = 0; 00539 //num_active_groups is <= 4 00540 for (int i = num_active_groups - 1; i >= 0; i--) 00541 { 00542 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size) 00543 { 00544 unsigned_type length = refill_group_buffer(i); 00545 // max active level dry now? 00546 if (length == 0 && unsigned(i) == num_active_groups - 1) 00547 --num_active_groups; 00548 00549 total_group_size += length; 00550 } 00551 else 00552 total_group_size += delete_buffer_size; // actually only a sufficient lower bound 00553 } 00554 00555 unsigned_type length; 00556 if (total_group_size >= delete_buffer_size) // buffer can be filled completely 00557 { 00558 length = delete_buffer_size; // amount to be copied 00559 size_ -= size_type(delete_buffer_size); // amount left in group_buffers 00560 } 00561 else 00562 { 00563 length = total_group_size; 00564 assert(size_ == size_type(length)); // trees and group_buffers get empty 00565 size_ = 0; 00566 } 00567 00568 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp); 00569 00570 // now call simplified refill routines 00571 // which can make the assumption that 00572 // they find all they are asked in the buffers 00573 delete_buffer_current_min = delete_buffer_end - length; 00574 STXXL_VERBOSE_PQ("refill_del... Active groups = " << num_active_groups); 00575 switch (num_active_groups) 00576 { 00577 case 0: 00578 break; 00579 case 1: 00580 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min); 00581 group_buffer_current_mins[0] += length; 00582 break; 00583 case 2: 00584 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00585 { 00586 std::pair<value_type *, value_type *> seqs[2] = 00587 { 00588 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N), 00589 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N) 00590 }; 00591 parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately 00592 00593 group_buffer_current_mins[0] = seqs[0].first; 00594 group_buffer_current_mins[1] = seqs[1].first; 00595 } 00596 #else 00597 priority_queue_local::merge_iterator( 00598 group_buffer_current_mins[0], 00599 group_buffer_current_mins[1], delete_buffer_current_min, length, cmp); 00600 #endif 00601 break; 00602 case 3: 00603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00604 { 00605 std::pair<value_type *, value_type *> seqs[3] = 00606 { 00607 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N), 00608 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N), 00609 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N) 00610 }; 00611 parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately 00612 00613 group_buffer_current_mins[0] = seqs[0].first; 00614 group_buffer_current_mins[1] = seqs[1].first; 00615 group_buffer_current_mins[2] = seqs[2].first; 00616 } 00617 #else 00618 priority_queue_local::merge3_iterator( 00619 group_buffer_current_mins[0], 00620 group_buffer_current_mins[1], 00621 group_buffer_current_mins[2], delete_buffer_current_min, length, cmp); 00622 #endif 00623 break; 00624 case 4: 00625 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00626 { 00627 std::pair<value_type *, value_type *> seqs[4] = 00628 { 00629 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N), 00630 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N), 00631 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N), 00632 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N) 00633 }; 00634 parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately 00635 00636 group_buffer_current_mins[0] = seqs[0].first; 00637 group_buffer_current_mins[1] = seqs[1].first; 00638 group_buffer_current_mins[2] = seqs[2].first; 00639 group_buffer_current_mins[3] = seqs[3].first; 00640 } 00641 #else 00642 priority_queue_local::merge4_iterator( 00643 group_buffer_current_mins[0], 00644 group_buffer_current_mins[1], 00645 group_buffer_current_mins[2], 00646 group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free 00647 #endif 00648 break; 00649 default: 00650 STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()", 00651 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4"); 00652 } 00653 00654 #if STXXL_CHECK_ORDER_IN_SORTS 00655 if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp)) 00656 { 00657 for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v) 00658 { 00659 if (inv_cmp(*v, *(v - 1))) 00660 { 00661 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v); 00662 } 00663 } 00664 assert(false); 00665 } 00666 #endif 00667 //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n")); 00668 } 00669 00670 //-------------------------------------------------------------------- 00671 00672 // check if space is available on level k and 00673 // empty this level if necessary leading to a recursive call. 00674 // return the level where space was finally available 00675 template <class Config_> 00676 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level) 00677 { 00678 STXXL_VERBOSE_PQ("make_space_available(" << level << ")"); 00679 unsigned_type finalLevel; 00680 assert(level < total_num_groups); 00681 assert(level <= num_active_groups); 00682 00683 if (level == num_active_groups) 00684 ++num_active_groups; 00685 00686 const bool spaceIsAvailable_ = 00687 (level < num_int_groups) ? int_mergers[level].is_space_available() 00688 : (ext_mergers[level - num_int_groups]->is_space_available()); 00689 00690 if (spaceIsAvailable_) 00691 { 00692 finalLevel = level; 00693 } 00694 else if (level == total_num_groups - 1) 00695 { 00696 size_type capacity = N; 00697 for (int i = 0; i < num_int_groups; ++i) 00698 capacity *= IntKMAX; 00699 for (int i = 0; i < num_ext_groups; ++i) 00700 capacity *= ExtKMAX; 00701 STXXL_ERRMSG("priority_queue OVERFLOW - all groups full, size=" << size() << 00702 ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 << "))=" << capacity); 00703 dump_sizes(); 00704 00705 int extLevel = level - num_int_groups; 00706 const size_type segmentSize = ext_mergers[extLevel]->size(); 00707 STXXL_VERBOSE1("Inserting segment into last level external: " << level << " " << segmentSize); 00708 ext_merger_type * overflow_merger = new ext_merger_type; 00709 overflow_merger->set_pool(pool); 00710 overflow_merger->insert_segment(*ext_mergers[extLevel], segmentSize); 00711 std::swap(ext_mergers[extLevel], overflow_merger); 00712 delete overflow_merger; 00713 finalLevel = level; 00714 } 00715 else 00716 { 00717 finalLevel = make_space_available(level + 1); 00718 00719 if (level < num_int_groups - 1) // from internal to internal tree 00720 { 00721 unsigned_type segmentSize = int_mergers[level].size(); 00722 value_type * newSegment = new value_type[segmentSize + 1]; 00723 int_mergers[level].multi_merge(newSegment, segmentSize); // empty this level 00724 00725 newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel 00726 // for queues where size << #inserts 00727 // it might make sense to stay in this level if 00728 // segmentSize < alpha * KNN * k^level for some alpha < 1 00729 int_mergers[level + 1].insert_segment(newSegment, segmentSize); 00730 } 00731 else 00732 { 00733 if (level == num_int_groups - 1) // from internal to external tree 00734 { 00735 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size(); 00736 STXXL_VERBOSE_PQ("make_space... Inserting segment into first level external: " << level << " " << segmentSize); 00737 ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize); 00738 } 00739 else // from external to external tree 00740 { 00741 const size_type segmentSize = ext_mergers[level - num_int_groups]->size(); 00742 STXXL_VERBOSE_PQ("make_space... Inserting segment into second level external: " << level << " " << segmentSize); 00743 ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize); 00744 } 00745 } 00746 } 00747 return finalLevel; 00748 } 00749 00750 00751 // empty the insert heap into the main data structure 00752 template <class Config_> 00753 void priority_queue<Config_>::empty_insert_heap() 00754 { 00755 STXXL_VERBOSE_PQ("empty_insert_heap()"); 00756 assert(insert_heap.size() == (N + 1)); 00757 00758 const value_type sup = get_supremum(); 00759 00760 // build new segment 00761 value_type * newSegment = new value_type[N + 1]; 00762 value_type * newPos = newSegment; 00763 00764 // put the new data there for now 00765 //insert_heap.sortTo(newSegment); 00766 value_type * SortTo = newSegment; 00767 00768 insert_heap.sort_to(SortTo); 00769 00770 SortTo = newSegment + N; 00771 insert_heap.clear(); 00772 insert_heap.push(*SortTo); 00773 00774 assert(insert_heap.size() == 1); 00775 00776 newSegment[N] = sup; // sentinel 00777 00778 // copy the delete_buffer and group_buffers[0] to temporary storage 00779 // (the temporary can be eliminated using some dirty tricks) 00780 const unsigned_type tempSize = N + delete_buffer_size; 00781 value_type temp[tempSize + 1]; 00782 unsigned_type sz1 = current_delete_buffer_size(); 00783 unsigned_type sz2 = current_group_buffer_size(0); 00784 value_type * pos = temp + tempSize - sz1 - sz2; 00785 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos); 00786 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1); 00787 temp[tempSize] = sup; // sentinel 00788 00789 // refill delete_buffer 00790 // (using more complicated code it could be made somewhat fuller 00791 // in certain circumstances) 00792 priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp); 00793 00794 // refill group_buffers[0] 00795 // (as above we might want to take the opportunity 00796 // to make group_buffers[0] fuller) 00797 priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp); 00798 00799 // merge the rest to the new segment 00800 // note that merge exactly trips into the footsteps 00801 // of itself 00802 priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp); 00803 00804 // and insert it 00805 unsigned_type freeLevel = make_space_available(0); 00806 assert(freeLevel == 0 || int_mergers[0].size() == 0); 00807 int_mergers[0].insert_segment(newSegment, N); 00808 00809 // get rid of invalid level 2 buffers 00810 // by inserting them into tree 0 (which is almost empty in this case) 00811 if (freeLevel > 0) 00812 { 00813 for (int_type i = freeLevel; i >= 0; i--) 00814 { 00815 // reverse order not needed 00816 // but would allow immediate refill 00817 00818 newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel 00819 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment); 00820 int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i)); 00821 group_buffer_current_mins[i] = group_buffers[i] + N; // empty 00822 } 00823 } 00824 00825 // update size 00826 size_ += size_type(N); 00827 00828 // special case if the tree was empty before 00829 if (delete_buffer_current_min == delete_buffer_end) 00830 refill_delete_buffer(); 00831 } 00832 00833 template <class Config_> 00834 void priority_queue<Config_>::dump_sizes() const 00835 { 00836 unsigned_type capacity = N; 00837 STXXL_MSG("pq::size()\t= " << size()); 00838 STXXL_MSG(" insert_heap\t= " << insert_heap.size() - 1 << "/" << capacity); 00839 STXXL_MSG(" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) << "/" << delete_buffer_size); 00840 for (int i = 0; i < num_int_groups; ++i) { 00841 capacity *= IntKMAX; 00842 STXXL_MSG(" grp " << i << " int" << 00843 " grpbuf=" << current_group_buffer_size(i) << 00844 " size=" << int_mergers[i].size() << "/" << capacity << 00845 " space=" << int_mergers[i].is_space_available()); 00846 } 00847 for (int i = 0; i < num_ext_groups; ++i) { 00848 capacity *= ExtKMAX; 00849 STXXL_MSG(" grp " << i + num_int_groups << " ext" << 00850 " grpbuf=" << current_group_buffer_size(i + num_int_groups) << 00851 " size=" << ext_mergers[i]->size() << "/" << capacity << 00852 " space=" << ext_mergers[i]->is_space_available()); 00853 } 00854 dump_params(); 00855 } 00856 00857 template <class Config_> 00858 void priority_queue<Config_>::dump_params() const 00859 { 00860 STXXL_MSG("params: delete_buffer_size=" << delete_buffer_size << " N=" << N << " IntKMAX=" << IntKMAX << " num_int_groups=" << num_int_groups << " ExtKMAX=" << ExtKMAX << " num_ext_groups=" << num_ext_groups << " BlockSize=" << BlockSize); 00861 } 00862 00863 namespace priority_queue_local 00864 { 00865 struct Parameters_for_priority_queue_not_found_Increase_IntM 00866 { 00867 enum { fits = false }; 00868 typedef Parameters_for_priority_queue_not_found_Increase_IntM result; 00869 }; 00870 00871 struct dummy 00872 { 00873 enum { fits = false }; 00874 typedef dummy result; 00875 }; 00876 00877 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false> 00878 struct find_B_m 00879 { 00880 typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self; 00881 enum { 00882 k = IntM_ / B_, // number of blocks that fit into M 00883 element_size = E_, // element size 00884 IntM = IntM_, 00885 B = B_, // block size 00886 m = m_, // number of blocks fitting into buffers 00887 c = k - m_, 00888 // memory occ. by block must be at least 10 times larger than size of ext sequence 00889 // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2 00890 fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_ 00891 && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128), 00892 step = 1 00893 }; 00894 00895 typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1; 00896 typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2; 00897 typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result; 00898 }; 00899 00900 // specialization for the case when no valid parameters are found 00901 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop> 00902 struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop> 00903 { 00904 enum { fits = false }; 00905 typedef Parameters_for_priority_queue_not_found_Increase_IntM result; 00906 }; 00907 00908 // to speedup search 00909 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_> 00910 struct find_B_m<E_, IntM_, MaxS_, B_, m_, true> 00911 { 00912 enum { fits = false }; 00913 typedef dummy result; 00914 }; 00915 00916 // E_ size of element in bytes 00917 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_> 00918 struct find_settings 00919 { 00920 // start from block size (8*1024*1024) bytes 00921 typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result; 00922 }; 00923 00924 struct Parameters_not_found_Try_to_change_the_Tune_parameter 00925 { 00926 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result; 00927 }; 00928 00929 00930 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_> 00931 struct compute_N 00932 { 00933 typedef compute_N<AI_, X_, CriticalSize_> Self; 00934 enum 00935 { 00936 X = X_, 00937 AI = AI_, 00938 N = X / (AI * AI) // two stage internal 00939 }; 00940 typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result; 00941 }; 00942 00943 template <unsigned_type X_, unsigned_type CriticalSize_> 00944 struct compute_N<1, X_, CriticalSize_> 00945 { 00946 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result; 00947 }; 00948 } 00949 00950 //! \} 00951 00952 //! \addtogroup stlcont 00953 //! \{ 00954 00955 //! \brief Priority queue type generator 00956 00957 //! Implements a data structure from "Peter Sanders. Fast Priority Queues 00958 //! for Cached Memory. ALENEX'99" for external memory. 00959 //! <BR> 00960 //! \tparam type of the contained objects (POD with no references to internal memory) 00961 //! \tparam the comparison type used to determine 00962 //! whether one element is smaller than another element. 00963 //! If Cmp_(x,y) is true, then x is smaller than y. The element 00964 //! returned by Q.top() is the largest element in the priority 00965 //! queue. That is, it has the property that, for every other 00966 //! element \b x in the priority queue, Cmp_(Q.top(), x) is false. 00967 //! Cmp_ must also provide min_value method, that returns value of type Tp_ that is 00968 //! smaller than any element of the queue \b x , i.e. Cmp_(Cmp_.min_value(),x) is 00969 //! always \b true . <BR> 00970 //! <BR> 00971 //! Example: comparison object for priority queue 00972 //! where \b top() returns the \b smallest contained integer: 00973 //! \verbatim 00974 //! struct CmpIntGreater 00975 //! { 00976 //! bool operator () (const int & a, const int & b) const { return a>b; } 00977 //! int min_value() const { return std::numeric_limits<int>::max(); } 00978 //! }; 00979 //! \endverbatim 00980 //! Example: comparison object for priority queue 00981 //! where \b top() returns the \b largest contained integer: 00982 //! \verbatim 00983 //! struct CmpIntLess 00984 //! { 00985 //! bool operator () (const int & a, const int & b) const { return a<b; } 00986 //! int min_value() const { return std::numeric_limits<int>::min(); } 00987 //! }; 00988 //! \endverbatim 00989 //! Note that Cmp_ must define strict weak ordering. 00990 //! (<A HREF="http://www.sgi.com/tech/stl/StrictWeakOrdering.html">see what it is</A>) 00991 //! - \c IntM_ upper limit for internal memory consumption in bytes. 00992 //! - \c MaxS_ upper limit for number of elements contained in the priority queue (in 1024 units). 00993 //! Example: if you are sure that priority queue contains no more than 00994 //! one million elements in a time, then the right parameter is (1000000/1024)= 976 . 00995 //! - \c Tune_ tuning parameter. Try to play with it if the code does not compile 00996 //! (larger than default values might help). Code does not compile 00997 //! if no suitable internal parameters were found for given IntM_ and MaxS_. 00998 //! It might also happen that given IntM_ is too small for given MaxS_, try larger values. 00999 //! <BR> 01000 //! \c PRIORITY_QUEUE_GENERATOR is template meta program that searches 01001 //! for \b 7 configuration parameters of \b stxxl::priority_queue that both 01002 //! minimize internal memory consumption of the priority queue to 01003 //! match IntM_ and maximize performance of priority queue operations. 01004 //! Actual memory consumption might be larger (use 01005 //! \c stxxl::priority_queue::mem_cons() method to track it), since the search 01006 //! assumes rather optimistic schedule of push'es and pop'es for the 01007 //! estimation of the maximum memory consumption. To keep actual memory 01008 //! requirements low increase the value of MaxS_ parameter. 01009 //! <BR> 01010 //! For functioning a priority queue object requires two pools of blocks 01011 //! (See constructor of \c priority_queue ). To construct \c <stxxl> block 01012 //! pools you might need \b block \b type that will be used by priority queue. 01013 //! Note that block's size and hence it's type is generated by 01014 //! the \c PRIORITY_QUEUE_GENERATOR in compile type from IntM_, MaxS_ and sizeof(Tp_) and 01015 //! not given directly by user as a template parameter. Block type can be extracted as 01016 //! \c PRIORITY_QUEUE_GENERATOR<some_parameters>::result::block_type . 01017 //! For an example see p_queue.cpp . 01018 //! Configured priority queue type is available as \c PRIORITY_QUEUE_GENERATOR<>::result. <BR> <BR> 01019 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6> 01020 class PRIORITY_QUEUE_GENERATOR 01021 { 01022 public: 01023 // actual calculation of B, m, k and element_size 01024 typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings; 01025 enum { 01026 B = settings::B, 01027 m = settings::m, 01028 X = B * (settings::k - m) / settings::element_size, // interpretation of result 01029 Buffer1Size = 32 // fixed 01030 }; 01031 // derivation of N, AI, AE 01032 typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN; 01033 enum 01034 { 01035 N = ComputeN::N, 01036 AI = ComputeN::AI, 01037 AE = (m / 2 < 2) ? 2 : (m / 2) // at least 2 01038 }; 01039 enum { 01040 // Estimation of maximum internal memory consumption (in bytes) 01041 EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024 01042 }; 01043 /* 01044 unsigned BufferSize1_ = 32, // equalize procedure call overheads etc. 01045 unsigned N_ = 512, // bandwidth 01046 unsigned IntKMAX_ = 64, // maximal arity for internal mergers 01047 unsigned IntLevels_ = 4, 01048 unsigned BlockSize_ = (2*1024*1024), 01049 unsigned ExtKMAX_ = 64, // maximal arity for external mergers 01050 unsigned ExtLevels_ = 2, 01051 */ 01052 typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result; 01053 }; 01054 01055 //! \} 01056 01057 __STXXL_END_NAMESPACE 01058 01059 01060 namespace std 01061 { 01062 template <class Config_> 01063 void swap(stxxl::priority_queue<Config_> & a, 01064 stxxl::priority_queue<Config_> & b) 01065 { 01066 a.swap(b); 01067 } 01068 } 01069 01070 #endif // !STXXL_PRIORITY_QUEUE_HEADER 01071 // vim: et:ts=4:sw=4