Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/stream/sort_stream.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2006-2008 Johannes Singler <singler@ira.uka.de> 00008 * Copyright (C) 2008-2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #ifndef STXXL_SORT_STREAM_HEADER 00016 #define STXXL_SORT_STREAM_HEADER 00017 00018 #ifdef STXXL_BOOST_CONFIG 00019 #include <boost/config.hpp> 00020 #endif 00021 00022 #include <stxxl/bits/stream/stream.h> 00023 #include <stxxl/bits/mng/mng.h> 00024 #include <stxxl/bits/algo/sort_base.h> 00025 #include <stxxl/bits/algo/sort_helper.h> 00026 #include <stxxl/bits/algo/adaptor.h> 00027 #include <stxxl/bits/algo/run_cursor.h> 00028 #include <stxxl/bits/algo/losertree.h> 00029 #include <stxxl/bits/stream/sorted_runs.h> 00030 #include <stxxl/bits/counting_ptr.h> 00031 00032 __STXXL_BEGIN_NAMESPACE 00033 00034 namespace stream 00035 { 00036 //! \addtogroup streampack Stream package 00037 //! \{ 00038 00039 //////////////////////////////////////////////////////////////////////// 00040 // CREATE RUNS // 00041 //////////////////////////////////////////////////////////////////////// 00042 00043 //! \brief Forms sorted runs of data from a stream 00044 //! 00045 //! \tparam Input_ type of the input stream 00046 //! \tparam CompareType_ type of comparison object used for sorting the runs 00047 //! \tparam BlockSize_ size of blocks used to store the runs (in bytes) 00048 //! \tparam AllocStr_ functor that defines allocation strategy for the runs 00049 template < 00050 class Input_, 00051 class CompareType_, 00052 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 00053 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00054 class basic_runs_creator : private noncopyable 00055 { 00056 public: 00057 typedef Input_ input_type; 00058 typedef CompareType_ cmp_type; 00059 static const unsigned block_size = BlockSize_; 00060 typedef AllocStr_ allocation_strategy_type; 00061 00062 public: 00063 typedef typename Input_::value_type value_type; 00064 typedef typed_block<BlockSize_, value_type> block_type; 00065 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00066 typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type; 00067 typedef typename sorted_runs_data_type::run_type run_type; 00068 typedef counting_ptr<sorted_runs_data_type> sorted_runs_type; 00069 00070 protected: 00071 Input_ & m_input; /// reference to the input stream 00072 CompareType_ m_cmp; /// comparator used to sort block groups 00073 00074 private: 00075 00076 sorted_runs_type m_result; /// stores the result (sorted runs) as smart pointer 00077 unsigned_type m_memsize; /// memory for internal use in blocks 00078 bool m_result_computed; /// true iff result is already computed (used in 'result()' method) 00079 00080 /// Fetch data from input into blocks[first_idx,last_idx). 00081 unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx) 00082 { 00083 typename element_iterator_traits<block_type>::element_iterator output = 00084 make_element_iterator(blocks, first_idx); 00085 unsigned_type curr_idx = first_idx; 00086 while (!m_input.empty() && curr_idx != last_idx) { 00087 *output = *m_input; 00088 ++m_input; 00089 ++output; 00090 ++curr_idx; 00091 } 00092 return curr_idx; 00093 } 00094 00095 /// fill the rest of the block with max values 00096 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx) 00097 { 00098 unsigned_type last_idx = num_blocks * block_type::size; 00099 if (first_idx < last_idx) { 00100 typename element_iterator_traits<block_type>::element_iterator curr = 00101 make_element_iterator(blocks, first_idx); 00102 while (first_idx != last_idx) { 00103 *curr = m_cmp.max_value(); 00104 ++curr; 00105 ++first_idx; 00106 } 00107 } 00108 } 00109 00110 /// Sort a specific run, contained in a sequences of blocks. 00111 void sort_run(block_type * run, unsigned_type elements) 00112 { 00113 check_sort_settings(); 00114 potentially_parallel::sort(make_element_iterator(run, 0), 00115 make_element_iterator(run, elements), 00116 m_cmp); 00117 } 00118 00119 void compute_result(); 00120 00121 public: 00122 //! \brief Create the object 00123 //! \param i input stream 00124 //! \param c comparator object 00125 //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes 00126 basic_runs_creator(Input_ & input, CompareType_ cmp, unsigned_type memory_to_use) 00127 : m_input(input), 00128 m_cmp(cmp), 00129 m_result(new sorted_runs_data_type), 00130 m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()), 00131 m_result_computed(false) 00132 { 00133 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00134 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) { 00135 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 00136 } 00137 assert(m_memsize > 0); 00138 } 00139 00140 //! \brief Returns the sorted runs object 00141 //! \return Sorted runs object. The result is computed lazily, i.e. on the first call 00142 //! \remark Returned object is intended to be used by \c runs_merger object as input 00143 sorted_runs_type & result() 00144 { 00145 if (!m_result_computed) 00146 { 00147 compute_result(); 00148 m_result_computed = true; 00149 #ifdef STXXL_PRINT_STAT_AFTER_RF 00150 STXXL_MSG(*stats::get_instance()); 00151 #endif //STXXL_PRINT_STAT_AFTER_RF 00152 } 00153 return m_result; 00154 } 00155 }; 00156 00157 //! \brief Finish the results, i. e. create all runs. 00158 //! 00159 //! This is the main routine of this class. 00160 template <class Input_, class CompareType_, unsigned BlockSize_, class AllocStr_> 00161 void basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_>::compute_result() 00162 { 00163 unsigned_type i = 0; 00164 unsigned_type m2 = m_memsize / 2; 00165 const unsigned_type el_in_run = m2 * block_type::size; // # el in a run 00166 STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2); 00167 unsigned_type blocks1_length = 0, blocks2_length = 0; 00168 block_type * Blocks1 = NULL; 00169 00170 #ifndef STXXL_SMALL_INPUT_PSORT_OPT 00171 Blocks1 = new block_type[m2 * 2]; 00172 #else 00173 // push input element into small_run vector in result until it is full 00174 while (!input.empty() && blocks1_length != block_type::size) 00175 { 00176 m_result->small_run.push_back(*input); 00177 ++input; 00178 ++blocks1_length; 00179 } 00180 00181 if (blocks1_length == block_type::size && !input.empty()) 00182 { 00183 Blocks1 = new block_type[m2 * 2]; 00184 std::copy(m_result->small_run.begin(), m_result->small_run.end(), Blocks1[0].begin()); 00185 m_result->small_run.clear(); 00186 } 00187 else 00188 { 00189 STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length); 00190 m_result->elements = blocks1_length; 00191 check_sort_settings(); 00192 potentially_parallel::sort(m_result->small_run.begin(), m_result->small_run.end(), cmp); 00193 return; 00194 } 00195 #endif //STXXL_SMALL_INPUT_PSORT_OPT 00196 00197 // the first block may be there already, now fetch until memsize is filled. 00198 blocks1_length = fetch(Blocks1, blocks1_length, el_in_run); 00199 00200 // sort first run 00201 sort_run(Blocks1, blocks1_length); 00202 00203 if (blocks1_length <= block_type::size && m_input.empty()) 00204 { 00205 // small input, do not flush it on the disk(s) 00206 STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length); 00207 assert(m_result->small_run.empty()); 00208 m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length); 00209 m_result->elements = blocks1_length; 00210 delete[] Blocks1; 00211 return; 00212 } 00213 00214 block_type * Blocks2 = Blocks1 + m2; 00215 block_manager * bm = block_manager::get_instance(); 00216 request_ptr * write_reqs = new request_ptr[m2]; 00217 run_type run; 00218 00219 unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks 00220 run.resize(cur_run_size); 00221 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00222 00223 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00224 00225 // fill the rest of the last block with max values 00226 fill_with_max_value(Blocks1, cur_run_size, blocks1_length); 00227 00228 for (i = 0; i < cur_run_size; ++i) 00229 { 00230 run[i].value = Blocks1[i][0]; 00231 write_reqs[i] = Blocks1[i].write(run[i].bid); 00232 } 00233 m_result->runs.push_back(run); 00234 m_result->runs_sizes.push_back(blocks1_length); 00235 m_result->elements += blocks1_length; 00236 00237 if (m_input.empty()) 00238 { 00239 // return 00240 wait_all(write_reqs, write_reqs + cur_run_size); 00241 delete[] write_reqs; 00242 delete[] Blocks1; 00243 return; 00244 } 00245 00246 STXXL_VERBOSE1("Filling the second part of the allocated blocks"); 00247 blocks2_length = fetch(Blocks2, 0, el_in_run); 00248 00249 if (m_input.empty()) 00250 { 00251 // optimization if the whole set fits into both halves 00252 // (re)sort internally and return 00253 blocks2_length += el_in_run; 00254 sort_run(Blocks1, blocks2_length); // sort first an second run together 00255 wait_all(write_reqs, write_reqs + cur_run_size); 00256 bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00257 00258 cur_run_size = div_ceil(blocks2_length, block_type::size); 00259 run.resize(cur_run_size); 00260 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00261 00262 // fill the rest of the last block with max values 00263 fill_with_max_value(Blocks1, cur_run_size, blocks2_length); 00264 00265 assert(cur_run_size > m2); 00266 00267 for (i = 0; i < m2; ++i) 00268 { 00269 run[i].value = Blocks1[i][0]; 00270 write_reqs[i]->wait(); 00271 write_reqs[i] = Blocks1[i].write(run[i].bid); 00272 } 00273 00274 request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2]; 00275 00276 for ( ; i < cur_run_size; ++i) 00277 { 00278 run[i].value = Blocks1[i][0]; 00279 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid); 00280 } 00281 00282 m_result->runs[0] = run; 00283 m_result->runs_sizes[0] = blocks2_length; 00284 m_result->elements = blocks2_length; 00285 00286 wait_all(write_reqs, write_reqs + m2); 00287 delete[] write_reqs; 00288 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2); 00289 delete[] write_reqs1; 00290 00291 delete[] Blocks1; 00292 00293 return; 00294 } 00295 00296 // more than 2 runs can be filled, i. e. the general case 00297 00298 sort_run(Blocks2, blocks2_length); 00299 00300 cur_run_size = div_ceil(blocks2_length, block_type::size); // in blocks 00301 run.resize(cur_run_size); 00302 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00303 00304 for (i = 0; i < cur_run_size; ++i) 00305 { 00306 run[i].value = Blocks2[i][0]; 00307 write_reqs[i]->wait(); 00308 write_reqs[i] = Blocks2[i].write(run[i].bid); 00309 } 00310 assert((blocks2_length % el_in_run) == 0); 00311 00312 m_result->add_run(run, blocks2_length); 00313 00314 while (!m_input.empty()) 00315 { 00316 blocks1_length = fetch(Blocks1, 0, el_in_run); 00317 sort_run(Blocks1, blocks1_length); 00318 cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks 00319 run.resize(cur_run_size); 00320 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00321 00322 // fill the rest of the last block with max values (occurs only on the last run) 00323 fill_with_max_value(Blocks1, cur_run_size, blocks1_length); 00324 00325 for (i = 0; i < cur_run_size; ++i) 00326 { 00327 run[i].value = Blocks1[i][0]; 00328 write_reqs[i]->wait(); 00329 write_reqs[i] = Blocks1[i].write(run[i].bid); 00330 } 00331 m_result->add_run(run, blocks1_length); 00332 00333 std::swap(Blocks1, Blocks2); 00334 std::swap(blocks1_length, blocks2_length); 00335 } 00336 00337 wait_all(write_reqs, write_reqs + m2); 00338 delete[] write_reqs; 00339 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2); 00340 } 00341 00342 //! \brief Forms sorted runs of data from a stream 00343 //! 00344 //! \tparam Input_ type of the input stream 00345 //! \tparam CompareType_ type of omparison object used for sorting the runs 00346 //! \tparam BlockSize_ size of blocks used to store the runs 00347 //! \tparam AllocStr_ functor that defines allocation strategy for the runs 00348 template < 00349 class Input_, 00350 class CompareType_, 00351 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 00352 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00353 class runs_creator : public basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> 00354 { 00355 private: 00356 typedef basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> base; 00357 00358 public: 00359 typedef typename base::cmp_type cmp_type; 00360 typedef typename base::value_type value_type; 00361 typedef typename base::block_type block_type; 00362 typedef typename base::sorted_runs_data_type sorted_runs_data_type; 00363 typedef typename base::sorted_runs_type sorted_runs_type; 00364 00365 public: 00366 //! \brief Creates the object 00367 //! \param i input stream 00368 //! \param c comparator object 00369 //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes 00370 runs_creator(Input_ & input, CompareType_ cmp, unsigned_type memory_to_use) 00371 : base(input, cmp, memory_to_use) 00372 { } 00373 }; 00374 00375 00376 //! \brief Input strategy for \c runs_creator class 00377 //! 00378 //! This strategy together with \c runs_creator class 00379 //! allows to create sorted runs 00380 //! data structure usable for \c runs_merger 00381 //! pushing elements into the sorter 00382 //! (using runs_creator::push()) 00383 template <class ValueType_> 00384 struct use_push 00385 { 00386 typedef ValueType_ value_type; 00387 }; 00388 00389 //! \brief Forms sorted runs of elements passed in push() method 00390 //! 00391 //! A specialization of \c runs_creator that 00392 //! allows to create sorted runs 00393 //! data structure usable for \c runs_merger from 00394 //! elements passed in sorted push() method. <BR> 00395 //! \tparam ValueType_ type of values (parameter for \c use_push strategy) 00396 //! \tparam CompareType_ type of comparison object used for sorting the runs 00397 //! \tparam BlockSize_ size of blocks used to store the runs 00398 //! \tparam AllocStr_ functor that defines allocation strategy for the runs 00399 template < 00400 class ValueType_, 00401 class CompareType_, 00402 unsigned BlockSize_, 00403 class AllocStr_> 00404 class runs_creator< 00405 use_push<ValueType_>, 00406 CompareType_, 00407 BlockSize_, 00408 AllocStr_> 00409 : private noncopyable 00410 { 00411 public: 00412 typedef CompareType_ cmp_type; 00413 typedef ValueType_ value_type; 00414 typedef typed_block<BlockSize_, value_type> block_type; 00415 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00416 typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type; 00417 typedef counting_ptr<sorted_runs_data_type> sorted_runs_type; 00418 typedef sorted_runs_type result_type; 00419 00420 private: 00421 /// comparator object to sort runs 00422 CompareType_ m_cmp; 00423 00424 typedef typename sorted_runs_data_type::run_type run_type; 00425 00426 /// stores the result (sorted runs) in a reference counted object 00427 sorted_runs_type m_result; 00428 00429 /// memory size in bytes to use 00430 const unsigned_type m_memory_to_use; 00431 00432 /// memory size in numberr of blocks for internal use 00433 const unsigned_type m_memsize; 00434 00435 /// m_memsize / 2 00436 const unsigned_type m_m2; 00437 00438 /// true after the result() method was called for the first time 00439 bool m_result_computed; 00440 00441 /// total number of elements in a run 00442 const unsigned_type m_el_in_run; 00443 00444 /// current number of elements in the run m_blocks1 00445 unsigned_type m_cur_el; 00446 00447 /// accumulation buffer of size m_m2 blocks, half the available memory size 00448 block_type * m_blocks1; 00449 00450 /// accumulation buffer that is currently being written to disk 00451 block_type * m_blocks2; 00452 00453 /// reference to write requests transporting the last accumulation buffer to disk 00454 request_ptr * m_write_reqs; 00455 00456 /// run object containing block ids of the run being written to disk 00457 run_type run; 00458 00459 protected: 00460 /// fill the rest of the block with max values 00461 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx) 00462 { 00463 unsigned_type last_idx = num_blocks * block_type::size; 00464 if (first_idx < last_idx) { 00465 typename element_iterator_traits<block_type>::element_iterator curr = 00466 make_element_iterator(blocks, first_idx); 00467 while (first_idx != last_idx) { 00468 *curr = m_cmp.max_value(); 00469 ++curr; 00470 ++first_idx; 00471 } 00472 } 00473 } 00474 00475 /// Sort a specific run, contained in a sequences of blocks. 00476 void sort_run(block_type * run, unsigned_type elements) 00477 { 00478 check_sort_settings(); 00479 potentially_parallel::sort(make_element_iterator(run, 0), 00480 make_element_iterator(run, elements), 00481 m_cmp); 00482 } 00483 00484 void compute_result() 00485 { 00486 if (m_cur_el == 0) 00487 return; 00488 00489 sort_run(m_blocks1, m_cur_el); 00490 00491 if (m_cur_el <= block_type::size && m_result->elements == 0) 00492 { 00493 // small input, do not flush it on the disk(s) 00494 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << m_cur_el); 00495 m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el); 00496 m_result->elements = m_cur_el; 00497 return; 00498 } 00499 00500 const unsigned_type cur_run_size = div_ceil(m_cur_el, block_type::size); // in blocks 00501 run.resize(cur_run_size); 00502 block_manager * bm = block_manager::get_instance(); 00503 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00504 00505 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00506 00507 // fill the rest of the last block with max values 00508 fill_with_max_value(m_blocks1, cur_run_size, m_cur_el); 00509 00510 unsigned_type i = 0; 00511 for ( ; i < cur_run_size; ++i) 00512 { 00513 run[i].value = m_blocks1[i][0]; 00514 if (m_write_reqs[i].get()) 00515 m_write_reqs[i]->wait(); 00516 00517 m_write_reqs[i] = m_blocks1[i].write(run[i].bid); 00518 } 00519 m_result->add_run(run, m_cur_el); 00520 00521 for (i = 0; i < m_m2; ++i) 00522 { 00523 if (m_write_reqs[i].get()) 00524 m_write_reqs[i]->wait(); 00525 } 00526 } 00527 00528 public: 00529 //! \brief Creates the object 00530 //! \param c comparator object 00531 //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes 00532 runs_creator(CompareType_ cmp, unsigned_type memory_to_use) : 00533 m_cmp(cmp), 00534 m_memory_to_use(memory_to_use), 00535 m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()), 00536 m_m2(m_memsize / 2), 00537 m_el_in_run(m_m2 * block_type::size), 00538 m_blocks1(NULL), m_blocks2(NULL), 00539 m_write_reqs(NULL) 00540 { 00541 sort_helper::verify_sentinel_strict_weak_ordering(m_cmp); 00542 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= m_memory_to_use)) { 00543 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 00544 } 00545 assert(m_m2 > 0); 00546 00547 allocate(); 00548 } 00549 00550 ~runs_creator() 00551 { 00552 m_result_computed = 1; 00553 deallocate(); 00554 } 00555 00556 //! \brief Clear current state and remove all items 00557 void clear() 00558 { 00559 if (!m_result) 00560 m_result = new sorted_runs_data_type; 00561 else 00562 m_result->clear(); 00563 00564 m_result_computed = false; 00565 m_cur_el = 0; 00566 00567 for (unsigned_type i = 0; i < m_m2; ++i) 00568 { 00569 if (m_write_reqs[i].get()) 00570 m_write_reqs[i]->cancel(); 00571 } 00572 } 00573 00574 //! \brief Allocates input buffers and clears result. 00575 void allocate() 00576 { 00577 if (!m_blocks1) 00578 { 00579 m_blocks1 = new block_type[m_m2 * 2]; 00580 m_blocks2 = m_blocks1 + m_m2; 00581 00582 m_write_reqs = new request_ptr[m_m2]; 00583 } 00584 00585 clear(); 00586 } 00587 00588 //! \brief Deallocates input buffers but not the current result. 00589 void deallocate() 00590 { 00591 result(); // finishes result 00592 00593 if (m_blocks1) 00594 { 00595 delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2); 00596 m_blocks1 = m_blocks2 = NULL; 00597 00598 delete[] m_write_reqs; 00599 m_write_reqs = NULL; 00600 } 00601 } 00602 00603 //! \brief Adds new element to the sorter 00604 //! \param val value to be added 00605 void push(const value_type & val) 00606 { 00607 assert(m_result_computed == false); 00608 if (LIKELY(m_cur_el < m_el_in_run)) 00609 { 00610 m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val; 00611 ++m_cur_el; 00612 return; 00613 } 00614 00615 assert(m_el_in_run == m_cur_el); 00616 m_cur_el = 0; 00617 00618 // sort and store m_blocks1 00619 sort_run(m_blocks1, m_el_in_run); 00620 00621 const unsigned_type cur_run_blocks = div_ceil(m_el_in_run, block_type::size); // in blocks 00622 run.resize(cur_run_blocks); 00623 block_manager * bm = block_manager::get_instance(); 00624 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end())); 00625 00626 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00627 00628 for (unsigned_type i = 0; i < cur_run_blocks; ++i) 00629 { 00630 run[i].value = m_blocks1[i][0]; 00631 if (m_write_reqs[i].get()) 00632 m_write_reqs[i]->wait(); 00633 00634 m_write_reqs[i] = m_blocks1[i].write(run[i].bid); 00635 } 00636 00637 m_result->add_run(run, m_el_in_run); 00638 00639 std::swap(m_blocks1, m_blocks2); 00640 00641 push(val); 00642 } 00643 00644 //! \brief Returns the sorted runs object 00645 //! \return Sorted runs object. 00646 //! \remark Returned object is intended to be used by \c runs_merger object as input 00647 sorted_runs_type & result() 00648 { 00649 if (!m_result_computed) 00650 { 00651 compute_result(); 00652 m_result_computed = true; 00653 #ifdef STXXL_PRINT_STAT_AFTER_RF 00654 STXXL_MSG(*stats::get_instance()); 00655 #endif //STXXL_PRINT_STAT_AFTER_RF 00656 } 00657 return m_result; 00658 } 00659 00660 //! \brief number of items currently inserted. 00661 unsigned_type size() const 00662 { 00663 return m_result->elements + m_cur_el; 00664 } 00665 00666 //! \brief return comparator object 00667 const cmp_type& cmp() const 00668 { 00669 return m_cmp; 00670 } 00671 00672 //! \brief return memory size used (in bytes) 00673 unsigned_type memory_used() const 00674 { 00675 return m_memory_to_use; 00676 } 00677 }; 00678 00679 00680 //! \brief Input strategy for \c runs_creator class 00681 //! 00682 //! This strategy together with \c runs_creator class 00683 //! allows to create sorted runs 00684 //! data structure usable for \c runs_merger from 00685 //! sequences of elements in sorted order 00686 template <class ValueType_> 00687 struct from_sorted_sequences 00688 { 00689 typedef ValueType_ value_type; 00690 }; 00691 00692 //! \brief Forms sorted runs of data taking elements in sorted order (element by element) 00693 //! 00694 //! A specialization of \c runs_creator that 00695 //! allows to create sorted runs 00696 //! data structure usable for \c runs_merger from 00697 //! sequences of elements in sorted order. <BR> 00698 //! \tparam ValueType_ type of values (parameter for \c from_sorted_sequences strategy) 00699 //! \tparam CompareType_ type of comparison object used for sorting the runs 00700 //! \tparam BlockSize_ size of blocks used to store the runs 00701 //! \tparam AllocStr_ functor that defines allocation strategy for the runs 00702 template < 00703 class ValueType_, 00704 class CompareType_, 00705 unsigned BlockSize_, 00706 class AllocStr_> 00707 class runs_creator< 00708 from_sorted_sequences<ValueType_>, 00709 CompareType_, 00710 BlockSize_, 00711 AllocStr_> 00712 : private noncopyable 00713 { 00714 public: 00715 typedef ValueType_ value_type; 00716 typedef typed_block<BlockSize_, value_type> block_type; 00717 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00718 typedef AllocStr_ alloc_strategy_type; 00719 00720 public: 00721 typedef CompareType_ cmp_type; 00722 typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type; 00723 typedef counting_ptr<sorted_runs_data_type> sorted_runs_type; 00724 typedef sorted_runs_type result_type; 00725 00726 private: 00727 typedef typename sorted_runs_data_type::run_type run_type; 00728 00729 CompareType_ cmp; 00730 00731 sorted_runs_type result_; // stores the result (sorted runs) 00732 unsigned_type m_; // memory for internal use in blocks 00733 buffered_writer<block_type> writer; 00734 block_type * cur_block; 00735 unsigned_type offset; 00736 unsigned_type iblock; 00737 unsigned_type irun; 00738 alloc_strategy_type alloc_strategy; // needs to be reset after each run 00739 00740 public: 00741 //! \brief Creates the object 00742 //! \param c comparator object 00743 //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes. 00744 //! Recommended value: 2 * block_size * D 00745 runs_creator(CompareType_ c, unsigned_type memory_to_use) : 00746 cmp(c), 00747 result_(new sorted_runs_data_type), 00748 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), 00749 writer(m_, m_ / 2), 00750 cur_block(writer.get_free_block()), 00751 offset(0), 00752 iblock(0), 00753 irun(0) 00754 { 00755 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00756 assert(m_ > 0); 00757 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) { 00758 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 00759 } 00760 } 00761 00762 //! \brief Adds new element to the current run 00763 //! \param val value to be added to the current run 00764 void push(const value_type & val) 00765 { 00766 assert(offset < block_type::size); 00767 00768 (*cur_block)[offset] = val; 00769 ++offset; 00770 00771 if (offset == block_type::size) 00772 { 00773 // write current block 00774 00775 block_manager * bm = block_manager::get_instance(); 00776 // allocate space for the block 00777 result_->runs.resize(irun + 1); 00778 result_->runs[irun].resize(iblock + 1); 00779 bm->new_blocks( 00780 alloc_strategy, 00781 make_bid_iterator(result_->runs[irun].begin() + iblock), 00782 make_bid_iterator(result_->runs[irun].end()), 00783 iblock 00784 ); 00785 00786 result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger 00787 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid); 00788 ++iblock; 00789 00790 offset = 0; 00791 } 00792 00793 ++result_->elements; 00794 } 00795 00796 //! \brief Finishes current run and begins new one 00797 void finish() 00798 { 00799 if (offset == 0 && iblock == 0) // current run is empty 00800 return; 00801 00802 00803 result_->runs_sizes.resize(irun + 1); 00804 result_->runs_sizes.back() = iblock * block_type::size + offset; 00805 00806 if (offset) // if current block is partially filled 00807 { 00808 while (offset != block_type::size) 00809 { 00810 (*cur_block)[offset] = cmp.max_value(); 00811 ++offset; 00812 } 00813 offset = 0; 00814 00815 block_manager * bm = block_manager::get_instance(); 00816 // allocate space for the block 00817 result_->runs.resize(irun + 1); 00818 result_->runs[irun].resize(iblock + 1); 00819 bm->new_blocks( 00820 alloc_strategy, 00821 make_bid_iterator(result_->runs[irun].begin() + iblock), 00822 make_bid_iterator(result_->runs[irun].end()), 00823 iblock 00824 ); 00825 00826 result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger 00827 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid); 00828 } 00829 else 00830 { } 00831 00832 alloc_strategy = alloc_strategy_type(); // reinitialize block allocator for the next run 00833 iblock = 0; 00834 ++irun; 00835 } 00836 00837 //! \brief Returns the sorted runs object 00838 //! \return Sorted runs object 00839 //! \remark Returned object is intended to be used by \c runs_merger object as input 00840 sorted_runs_type & result() 00841 { 00842 finish(); 00843 writer.flush(); 00844 00845 return result_; 00846 } 00847 }; 00848 00849 00850 //! \brief Checker for the sorted runs object created by the \c runs_creator . 00851 //! \param sruns sorted runs object 00852 //! \param cmp comparison object used for checking the order of elements in runs 00853 //! \return \c true if runs are sorted, \c false otherwise 00854 template <class RunsType_, class CompareType_> 00855 bool check_sorted_runs(const RunsType_ & sruns, CompareType_ cmp) 00856 { 00857 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00858 typedef typename RunsType_::element_type::block_type block_type; 00859 typedef typename block_type::value_type value_type; 00860 STXXL_VERBOSE2("Elements: " << sruns->elements); 00861 unsigned_type nruns = sruns->runs.size(); 00862 STXXL_VERBOSE2("Runs: " << nruns); 00863 unsigned_type irun = 0; 00864 for (irun = 0; irun < nruns; ++irun) 00865 { 00866 const unsigned_type nblocks = sruns->runs[irun].size(); 00867 block_type * blocks = new block_type[nblocks]; 00868 request_ptr * reqs = new request_ptr[nblocks]; 00869 for (unsigned_type j = 0; j < nblocks; ++j) 00870 { 00871 reqs[j] = blocks[j].read(sruns->runs[irun][j].bid); 00872 } 00873 wait_all(reqs, reqs + nblocks); 00874 for (unsigned_type j = 0; j < nblocks; ++j) 00875 { 00876 if (cmp(blocks[j][0], sruns->runs[irun][j].value) || 00877 cmp(sruns->runs[irun][j].value, blocks[j][0])) //!= 00878 { 00879 STXXL_ERRMSG("check_sorted_runs wrong trigger in the run"); 00880 return false; 00881 } 00882 } 00883 if (!stxxl::is_sorted(make_element_iterator(blocks, 0), 00884 make_element_iterator(blocks, sruns->runs_sizes[irun]), 00885 cmp)) 00886 { 00887 STXXL_ERRMSG("check_sorted_runs wrong order in the run"); 00888 return false; 00889 } 00890 00891 delete[] reqs; 00892 delete[] blocks; 00893 } 00894 00895 STXXL_MSG("Checking runs finished successfully"); 00896 00897 return true; 00898 } 00899 00900 00901 //////////////////////////////////////////////////////////////////////// 00902 // MERGE RUNS // 00903 //////////////////////////////////////////////////////////////////////// 00904 00905 //! \brief Merges sorted runs 00906 //! 00907 //! \tparam RunsType_ type of the sorted runs, available as \c runs_creator::sorted_runs_type , 00908 //! \tparam CompareType_ type of comparison object used for merging 00909 //! \tparam AllocStr_ allocation strategy used to allocate the blocks for 00910 //! storing intermediate results if several merge passes are required 00911 template <class RunsType_, 00912 class CompareType_, 00913 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 00914 class basic_runs_merger : private noncopyable 00915 { 00916 public: 00917 typedef RunsType_ sorted_runs_type; 00918 typedef CompareType_ value_cmp; 00919 typedef AllocStr_ alloc_strategy; 00920 00921 typedef typename sorted_runs_type::element_type sorted_runs_data_type; 00922 typedef typename sorted_runs_data_type::size_type size_type; 00923 typedef typename sorted_runs_data_type::run_type run_type; 00924 typedef typename sorted_runs_data_type::block_type block_type; 00925 typedef block_type out_block_type; 00926 typedef typename run_type::value_type trigger_entry_type; 00927 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00928 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00929 typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type; 00930 typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type; 00931 typedef stxxl::int64 diff_type; 00932 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence; 00933 typedef typename std::vector<sequence>::size_type seqs_size_type; 00934 00935 public: 00936 //! \brief Standard stream typedef 00937 typedef typename sorted_runs_data_type::value_type value_type; 00938 00939 private: 00940 /// comparator object to sort runs 00941 value_cmp m_cmp; 00942 00943 /// memory size in bytes to use 00944 unsigned_type m_memory_to_use; 00945 00946 /// smart pointer to sorted_runs object 00947 sorted_runs_type m_sruns; 00948 00949 /// items remaining in input 00950 size_type m_elements_remaining; 00951 00952 /// memory buffer for merging from external streams 00953 out_block_type* m_buffer_block; 00954 00955 /// pointer into current memory buffer: this is either m_buffer_block or the small_runs vector 00956 const value_type* m_current_ptr; 00957 00958 /// pointer into current memory buffer: end after range of current values 00959 const value_type* m_current_end; 00960 00961 /// sequence of block needed for merging 00962 run_type m_consume_seq; 00963 00964 /// precalculated order of blocks in which they are prefetched 00965 int_type* m_prefetch_seq; 00966 00967 /// prefetcher object 00968 prefetcher_type * m_prefetcher; 00969 00970 /// loser tree used for native merging 00971 loser_tree_type * m_losers; 00972 00973 #if STXXL_PARALLEL_MULTIWAY_MERGE 00974 std::vector<sequence> * seqs; 00975 std::vector<block_type *> * buffers; 00976 diff_type num_currently_mergeable; 00977 #endif 00978 00979 #if STXXL_CHECK_ORDER_IN_SORTS 00980 /// previous element to ensure the current output ordering 00981 value_type m_last_element; 00982 #endif //STXXL_CHECK_ORDER_IN_SORTS 00983 00984 //////////////////////////////////////////////////////////////////// 00985 00986 void merge_recursively(); 00987 00988 void deallocate_prefetcher() 00989 { 00990 if (m_prefetcher) 00991 { 00992 delete m_losers; 00993 #if STXXL_PARALLEL_MULTIWAY_MERGE 00994 delete seqs; 00995 delete buffers; 00996 #endif 00997 delete m_prefetcher; 00998 delete[] m_prefetch_seq; 00999 m_prefetcher = NULL; 01000 } 01001 } 01002 01003 void fill_buffer_block() 01004 { 01005 STXXL_VERBOSE1("fill_buffer_block"); 01006 if (do_parallel_merge()) 01007 { 01008 #if STXXL_PARALLEL_MULTIWAY_MERGE 01009 // begin of STL-style merging 01010 diff_type rest = out_block_type::size; // elements still to merge for this output block 01011 01012 do // while rest > 0 and still elements available 01013 { 01014 if (num_currently_mergeable < rest) 01015 { 01016 if (!m_prefetcher || m_prefetcher->empty()) 01017 { 01018 // anything remaining is already in memory 01019 num_currently_mergeable = m_elements_remaining; 01020 } 01021 else 01022 { 01023 num_currently_mergeable = sort_helper::count_elements_less_equal( 01024 *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp); 01025 } 01026 } 01027 01028 diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements 01029 01030 STXXL_VERBOSE1("before merge " << output_size); 01031 01032 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size); 01033 // sequence iterators are progressed appropriately 01034 01035 rest -= output_size; 01036 num_currently_mergeable -= output_size; 01037 01038 STXXL_VERBOSE1("after merge"); 01039 01040 sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *m_prefetcher); 01041 } while (rest > 0 && (*seqs).size() > 0); 01042 01043 #if STXXL_CHECK_ORDER_IN_SORTS 01044 if (!stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp)) 01045 { 01046 for (value_type * i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i) 01047 if (cmp(*i, *(i - 1))) 01048 { 01049 STXXL_VERBOSE1("Error at position " << (i - m_buffer_block->begin())); 01050 } 01051 assert(false); 01052 } 01053 #endif //STXXL_CHECK_ORDER_IN_SORTS 01054 01055 // end of STL-style merging 01056 #else 01057 STXXL_THROW_UNREACHABLE(); 01058 #endif //STXXL_PARALLEL_MULTIWAY_MERGE 01059 } 01060 else 01061 { 01062 // begin of native merging procedure 01063 m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining)); 01064 // end of native merging procedure 01065 } 01066 STXXL_VERBOSE1("current block filled"); 01067 01068 m_current_ptr = m_buffer_block->elem; 01069 m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining); 01070 01071 if (m_elements_remaining <= out_block_type::size) 01072 deallocate_prefetcher(); 01073 } 01074 01075 public: 01076 //! \brief Creates a runs merger object 01077 //! \param c comparison object 01078 //! \param memory_to_use amount of memory available for the merger in bytes 01079 basic_runs_merger(value_cmp c, unsigned_type memory_to_use) 01080 : m_cmp(c), 01081 m_memory_to_use(memory_to_use), 01082 m_buffer_block(new out_block_type), 01083 m_prefetch_seq(NULL), 01084 m_prefetcher(NULL), 01085 m_losers(NULL) 01086 #if STXXL_PARALLEL_MULTIWAY_MERGE 01087 , seqs(NULL), 01088 buffers(NULL), 01089 num_currently_mergeable(0) 01090 #endif 01091 #if STXXL_CHECK_ORDER_IN_SORTS 01092 , m_last_element(m_cmp.min_value()) 01093 #endif //STXXL_CHECK_ORDER_IN_SORTS 01094 { 01095 sort_helper::verify_sentinel_strict_weak_ordering(m_cmp); 01096 } 01097 01098 //! \brief Set memory amount to use for the merger in bytes 01099 void set_memory_to_use(unsigned_type memory_to_use) 01100 { 01101 m_memory_to_use = memory_to_use; 01102 } 01103 01104 //! \brief Initialize the runs merger object with a new round of sorted_runs 01105 void initialize(const sorted_runs_type & sruns) 01106 { 01107 m_sruns = sruns; 01108 m_elements_remaining = m_sruns->elements; 01109 01110 if (empty()) 01111 return; 01112 01113 if (!m_sruns->small_run.empty()) 01114 { 01115 // we have a small input <= B, that is kept in the main memory 01116 STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << m_elements_remaining); 01117 assert(m_elements_remaining == size_type(m_sruns->small_run.size())); 01118 01119 m_current_ptr = &m_sruns->small_run[0]; 01120 m_current_end = m_current_ptr + m_sruns->small_run.size(); 01121 01122 return; 01123 } 01124 01125 #if STXXL_CHECK_ORDER_IN_SORTS 01126 assert(check_sorted_runs(m_sruns, m_cmp)); 01127 #endif //STXXL_CHECK_ORDER_IN_SORTS 01128 01129 // *** test whether recursive merging is necessary 01130 01131 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 01132 01133 int_type disks_number = config::get_instance()->disks_number(); 01134 unsigned_type min_prefetch_buffers = 2 * disks_number; 01135 unsigned_type input_buffers = (m_memory_to_use > sizeof(out_block_type) ? m_memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size; 01136 unsigned_type nruns = m_sruns->runs.size(); 01137 01138 if (input_buffers < nruns + min_prefetch_buffers) 01139 { 01140 // can not merge runs in one pass. merge recursively: 01141 STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better"); 01142 STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)"); 01143 STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger."); 01144 STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers); 01145 STXXL_WARNMSG_RECURSIVE_SORT("memory_to_use=" << m_memory_to_use << " bytes block_type::raw_size=" << block_type::raw_size << " bytes"); 01146 01147 // check whether we have enough memory to merge recursively 01148 unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size; 01149 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) { 01150 // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering 01151 // as well as 1 current output block and at least 2 input blocks 01152 STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but " 01153 << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and"); 01154 STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting."); 01155 throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'"); 01156 } 01157 01158 merge_recursively(); 01159 01160 nruns = m_sruns->runs.size(); 01161 } 01162 01163 assert(nruns + min_prefetch_buffers <= input_buffers); 01164 01165 // *** Allocate prefetcher and merge data structure 01166 01167 deallocate_prefetcher(); 01168 01169 unsigned_type prefetch_seq_size = 0; 01170 for (unsigned_type i = 0; i < nruns; ++i) 01171 { 01172 prefetch_seq_size += m_sruns->runs[i].size(); 01173 } 01174 01175 m_consume_seq.resize(prefetch_seq_size); 01176 m_prefetch_seq = new int_type[prefetch_seq_size]; 01177 01178 typename run_type::iterator copy_start = m_consume_seq.begin(); 01179 for (unsigned_type i = 0; i < nruns; ++i) 01180 { 01181 copy_start = std::copy(m_sruns->runs[i].begin(), 01182 m_sruns->runs[i].end(), 01183 copy_start); 01184 } 01185 01186 std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(), 01187 sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(m_cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL); 01188 01189 const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns); 01190 01191 #if STXXL_SORT_OPTIMAL_PREFETCHING 01192 // heuristic 01193 const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10; 01194 01195 compute_prefetch_schedule( 01196 m_consume_seq, 01197 m_prefetch_seq, 01198 n_opt_prefetch_buffers, 01199 disks_number); 01200 #else 01201 for (unsigned_type i = 0; i < prefetch_seq_size; ++i) 01202 m_prefetch_seq[i] = i; 01203 #endif //STXXL_SORT_OPTIMAL_PREFETCHING 01204 01205 m_prefetcher = new prefetcher_type( 01206 m_consume_seq.begin(), 01207 m_consume_seq.end(), 01208 m_prefetch_seq, 01209 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size)); 01210 01211 if (do_parallel_merge()) 01212 { 01213 #if STXXL_PARALLEL_MULTIWAY_MERGE 01214 // begin of STL-style merging 01215 seqs = new std::vector<sequence>(nruns); 01216 buffers = new std::vector<block_type *>(nruns); 01217 01218 for (unsigned_type i = 0; i < nruns; ++i) //initialize sequences 01219 { 01220 (*buffers)[i] = m_prefetcher->pull_block(); //get first block of each run 01221 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged 01222 } 01223 // end of STL-style merging 01224 #else 01225 STXXL_THROW_UNREACHABLE(); 01226 #endif //STXXL_PARALLEL_MULTIWAY_MERGE 01227 } 01228 else 01229 { 01230 // begin of native merging procedure 01231 m_losers = new loser_tree_type(m_prefetcher, nruns, run_cursor2_cmp_type(m_cmp)); 01232 // end of native merging procedure 01233 } 01234 01235 fill_buffer_block(); 01236 } 01237 01238 //! \brief Deallocate temporary structures freeing memory prior to next initialize() 01239 void deallocate() 01240 { 01241 deallocate_prefetcher(); 01242 m_sruns = NULL; // release reference on result object 01243 } 01244 01245 public: 01246 //! \brief Standard stream method 01247 bool empty() const 01248 { 01249 return (m_elements_remaining == 0); 01250 } 01251 01252 //! \brief Standard size method 01253 size_type size() const 01254 { 01255 return m_elements_remaining; 01256 } 01257 01258 //! \brief Standard stream method 01259 const value_type & operator * () const 01260 { 01261 assert(!empty()); 01262 return *m_current_ptr; 01263 } 01264 01265 //! \brief Standard stream method 01266 const value_type * operator -> () const 01267 { 01268 return &(operator * ()); 01269 } 01270 01271 //! \brief Standard stream method 01272 basic_runs_merger & operator ++ () // preincrement operator 01273 { 01274 assert(!empty()); 01275 assert(m_current_ptr != m_current_end); 01276 01277 --m_elements_remaining; 01278 ++m_current_ptr; 01279 01280 if (LIKELY(m_current_ptr == m_current_end && !empty())) 01281 { 01282 fill_buffer_block(); 01283 01284 #if STXXL_CHECK_ORDER_IN_SORTS 01285 assert(stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp)); 01286 #endif //STXXL_CHECK_ORDER_IN_SORTS 01287 } 01288 01289 #if STXXL_CHECK_ORDER_IN_SORTS 01290 if (!empty()) 01291 { 01292 assert(!m_cmp(operator*(), m_last_element)); 01293 m_last_element = operator*(); 01294 } 01295 #endif //STXXL_CHECK_ORDER_IN_SORTS 01296 01297 return *this; 01298 } 01299 01300 //! \brief Destructor 01301 //! \remark Deallocates blocks of the input sorted runs object 01302 virtual ~basic_runs_merger() 01303 { 01304 deallocate_prefetcher(); 01305 01306 delete m_buffer_block; 01307 } 01308 }; 01309 01310 01311 template <class RunsType_, class CompareType_, class AllocStr_> 01312 void basic_runs_merger<RunsType_, CompareType_, AllocStr_>::merge_recursively() 01313 { 01314 block_manager * bm = block_manager::get_instance(); 01315 unsigned_type ndisks = config::get_instance()->disks_number(); 01316 unsigned_type nwrite_buffers = 2 * ndisks; 01317 unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type); 01318 01319 // memory consumption of the recursive merger (uses block_type as out_block_type) 01320 unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type); 01321 unsigned_type recursive_merger_memory_out_block = sizeof(block_type); 01322 unsigned_type memory_for_buffers = memory_for_write_buffers 01323 + recursive_merger_memory_prefetch_buffers 01324 + recursive_merger_memory_out_block; 01325 // maximum arity in the recursive merger 01326 unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size; 01327 01328 unsigned_type nruns = m_sruns->runs.size(); 01329 const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity); 01330 assert(merge_factor > 1); 01331 assert(merge_factor <= max_arity); 01332 01333 while (nruns > max_arity) 01334 { 01335 unsigned_type new_nruns = div_ceil(nruns, merge_factor); 01336 STXXL_MSG("Starting new merge phase: nruns: " << nruns << 01337 " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns); 01338 01339 // construct new sorted_runs data object which will be swapped into m_sruns 01340 01341 sorted_runs_data_type new_runs; 01342 new_runs.runs.resize(new_nruns); 01343 new_runs.runs_sizes.resize(new_nruns); 01344 new_runs.elements = m_sruns->elements; 01345 01346 // merge all runs from m_runs into news_runs 01347 01348 unsigned_type runs_left = nruns; 01349 unsigned_type cur_out_run = 0; 01350 size_type elements_left = m_sruns->elements; 01351 01352 while (runs_left > 0) 01353 { 01354 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor); 01355 STXXL_MSG("Merging " << runs2merge << " runs"); 01356 01357 if (runs2merge > 1) // non-trivial merge 01358 { 01359 // count the number of elements in the run 01360 unsigned_type elements_in_new_run = 0; 01361 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i) 01362 { 01363 elements_in_new_run += m_sruns->runs_sizes[i]; 01364 } 01365 new_runs.runs_sizes[cur_out_run] = elements_in_new_run; 01366 01367 // calculate blocks in run 01368 const unsigned_type blocks_in_new_run = div_ceil(elements_in_new_run, block_type::size); 01369 01370 // allocate blocks for the new runs 01371 new_runs.runs[cur_out_run].resize(blocks_in_new_run); 01372 bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[cur_out_run].begin()), make_bid_iterator(new_runs.runs[cur_out_run].end())); 01373 01374 // Construct temporary sorted_runs object as input into recursive merger. 01375 // This sorted_runs is copied a subset of the over-large set of runs, which 01376 // will be deallocated from external memory once the runs are merged. 01377 sorted_runs_data_type cur_runs; 01378 cur_runs.runs.resize(runs2merge); 01379 cur_runs.runs_sizes.resize(runs2merge); 01380 cur_runs.inc_ref(); // hold dangling reference 01381 01382 std::copy(m_sruns->runs.begin() + nruns - runs_left, 01383 m_sruns->runs.begin() + nruns - runs_left + runs2merge, 01384 cur_runs.runs.begin()); 01385 std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left, 01386 m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge, 01387 cur_runs.runs_sizes.begin()); 01388 01389 cur_runs.elements = elements_in_new_run; 01390 elements_left -= elements_in_new_run; 01391 01392 // construct recursive merger 01393 01394 basic_runs_merger<RunsType_, CompareType_, AllocStr_> merger(m_cmp, m_memory_to_use - memory_for_write_buffers); 01395 merger.initialize(&cur_runs); 01396 01397 { // make sure everything is being destroyed in right time 01398 buf_ostream<block_type, typename run_type::iterator> out( 01399 new_runs.runs[cur_out_run].begin(), 01400 nwrite_buffers); 01401 01402 size_type cnt = 0; 01403 const size_type cnt_max = cur_runs.elements; 01404 01405 while (cnt != cnt_max) 01406 { 01407 *out = *merger; 01408 if ((cnt % block_type::size) == 0) // have to write the trigger value 01409 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger; 01410 01411 ++cnt, ++out, ++merger; 01412 } 01413 assert(merger.empty()); 01414 01415 while (cnt % block_type::size) 01416 { 01417 *out = m_cmp.max_value(); 01418 ++out, ++cnt; 01419 } 01420 } 01421 01422 // deallocate merged runs by destroying cur_runs 01423 } 01424 else // runs2merge = 1 -> no merging needed 01425 { 01426 assert( cur_out_run+1 == new_runs.runs.size() ); 01427 01428 elements_left -= m_sruns->runs_sizes.back(); 01429 01430 // copy block identifiers into new sorted_runs object 01431 new_runs.runs.back() = m_sruns->runs.back(); 01432 new_runs.runs_sizes.back() = m_sruns->runs_sizes.back(); 01433 } 01434 01435 runs_left -= runs2merge; 01436 ++cur_out_run; 01437 } 01438 01439 assert(elements_left == 0); 01440 01441 m_sruns->runs.clear(); // clear bid vector of m_sruns to skip deallocation of blocks in destructor 01442 01443 std::swap(nruns, new_nruns); 01444 m_sruns->swap(new_runs); // replaces data in referenced counted object m_sruns 01445 01446 } // end while (nruns > max_arity) 01447 } 01448 01449 01450 //! \brief Merges sorted runs 01451 //! 01452 //! \tparam RunsType_ type of the sorted runs, available as \c runs_creator::sorted_runs_type , 01453 //! \tparam CompareType_ type of comparison object used for merging 01454 //! \tparam AllocStr_ allocation strategy used to allocate the blocks for 01455 //! storing intermediate results if several merge passes are required 01456 template <class RunsType_, 01457 class CompareType_ = typename RunsType_::element_type::cmp_type, 01458 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY> 01459 class runs_merger : public basic_runs_merger<RunsType_, CompareType_, AllocStr_> 01460 { 01461 protected: 01462 typedef basic_runs_merger<RunsType_, CompareType_, AllocStr_> base; 01463 01464 public: 01465 typedef RunsType_ sorted_runs_type; 01466 typedef typename base::value_cmp value_cmp; 01467 typedef typename base::value_cmp cmp_type; 01468 typedef typename base::block_type block_type; 01469 01470 public: 01471 //! \brief Creates a runs merger object 01472 //! \param r input sorted runs object 01473 //! \param c comparison object 01474 //! \param memory_to_use amount of memory available for the merger in bytes 01475 runs_merger(sorted_runs_type & sruns, value_cmp cmp, unsigned_type memory_to_use) 01476 : base(cmp, memory_to_use) 01477 { 01478 initialize(sruns); 01479 } 01480 01481 //! \brief Creates a runs merger object without initializing a round of sorted_runs 01482 //! \param c comparison object 01483 //! \param memory_to_use amount of memory available for the merger in bytes 01484 runs_merger(value_cmp cmp, unsigned_type memory_to_use) 01485 : base(cmp, memory_to_use) 01486 { 01487 } 01488 }; 01489 01490 01491 //////////////////////////////////////////////////////////////////////// 01492 // SORT // 01493 //////////////////////////////////////////////////////////////////////// 01494 01495 //! \brief Produces sorted stream from input stream 01496 //! 01497 //! \tparam Input_ type of the input stream 01498 //! \tparam CompareType_ type of comparison object used for sorting the runs 01499 //! \tparam BlockSize_ size of blocks used to store the runs 01500 //! \tparam AllocStr_ functor that defines allocation strategy for the runs 01501 //! \remark Implemented as the composition of \c runs_creator and \c runs_merger . 01502 template <class Input_, 01503 class CompareType_, 01504 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type), 01505 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY, 01506 class runs_creator_type = runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> > 01507 class sort : public noncopyable 01508 { 01509 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type; 01510 typedef runs_merger<sorted_runs_type, CompareType_, AllocStr_> runs_merger_type; 01511 01512 runs_creator_type creator; 01513 runs_merger_type merger; 01514 01515 public: 01516 //! \brief Standard stream typedef 01517 typedef typename Input_::value_type value_type; 01518 01519 //! \brief Creates the object 01520 //! \param in input stream 01521 //! \param c comparator object 01522 //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes 01523 sort(Input_ & in, CompareType_ c, unsigned_type memory_to_use) : 01524 creator(in, c, memory_to_use), 01525 merger(creator.result(), c, memory_to_use) 01526 { 01527 sort_helper::verify_sentinel_strict_weak_ordering(c); 01528 } 01529 01530 //! \brief Creates the object 01531 //! \param in input stream 01532 //! \param c comparator object 01533 //! \param m_memory_to_userc memory amount that is allowed to used by the runs creator in bytes 01534 //! \param m_memory_to_use memory amount that is allowed to used by the merger in bytes 01535 sort(Input_ & in, CompareType_ c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use) : 01536 creator(in, c, m_memory_to_userc), 01537 merger(creator.result(), c, m_memory_to_use) 01538 { 01539 sort_helper::verify_sentinel_strict_weak_ordering(c); 01540 } 01541 01542 01543 //! \brief Standard stream method 01544 bool empty() const 01545 { 01546 return merger.empty(); 01547 } 01548 01549 //! \brief Standard stream method 01550 const value_type & operator * () const 01551 { 01552 assert(!empty()); 01553 return *merger; 01554 } 01555 01556 const value_type * operator -> () const 01557 { 01558 assert(!empty()); 01559 return merger.operator -> (); 01560 } 01561 01562 //! \brief Standard stream method 01563 sort & operator ++ () 01564 { 01565 ++merger; 01566 return *this; 01567 } 01568 }; 01569 01570 //! \brief Computes sorted runs type from value type and block size 01571 //! 01572 //! \tparam ValueType_ type of values ins sorted runs 01573 //! \tparam BlockSize_ size of blocks where sorted runs stored 01574 template < 01575 class ValueType_, 01576 unsigned BlockSize_> 01577 class compute_sorted_runs_type 01578 { 01579 typedef ValueType_ value_type; 01580 typedef BID<BlockSize_> bid_type; 01581 typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type; 01582 01583 public: 01584 typedef sorted_runs<trigger_entry_type,std::less<value_type> > result; 01585 }; 01586 01587 //! \} 01588 } 01589 01590 //! \addtogroup stlalgo 01591 //! \{ 01592 01593 //! \brief Sorts range of any random access iterators externally 01594 01595 //! \param begin iterator pointing to the first element of the range 01596 //! \param end iterator pointing to the last+1 element of the range 01597 //! \param cmp comparison object 01598 //! \param MemSize memory to use for sorting (in bytes) 01599 //! \param AS allocation strategy 01600 //! 01601 //! The \c BlockSize template parameter defines the block size to use (in bytes) 01602 //! \warning Slower than External Iterator Sort 01603 template <unsigned BlockSize, 01604 class RandomAccessIterator, 01605 class CmpType, 01606 class AllocStr> 01607 void sort(RandomAccessIterator begin, 01608 RandomAccessIterator end, 01609 CmpType cmp, 01610 unsigned_type MemSize, 01611 AllocStr AS) 01612 { 01613 STXXL_UNUSED(AS); 01614 #ifdef BOOST_MSVC 01615 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType; 01616 #else 01617 typedef __typeof__(stream::streamify(begin, end)) InputType; 01618 #endif //BOOST_MSVC 01619 InputType Input(begin, end); 01620 typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type; 01621 sorter_type Sort(Input, cmp, MemSize); 01622 stream::materialize(Sort, begin); 01623 } 01624 01625 //! \} 01626 01627 __STXXL_END_NAMESPACE 01628 01629 #endif // !STXXL_SORT_STREAM_HEADER 01630 // vim: et:ts=4:sw=4