Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/algo/sort.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2006 Johannes Singler <singler@ira.uka.de> 00008 * Copyright (C) 2008-2011 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #ifndef STXXL_SORT_HEADER 00016 #define STXXL_SORT_HEADER 00017 00018 #include <functional> 00019 00020 #include <stxxl/bits/mng/mng.h> 00021 #include <stxxl/bits/common/rand.h> 00022 #include <stxxl/bits/mng/adaptor.h> 00023 #include <stxxl/bits/common/simple_vector.h> 00024 #include <stxxl/bits/common/settings.h> 00025 #include <stxxl/bits/mng/block_alloc_interleaved.h> 00026 #include <stxxl/bits/io/request_operations.h> 00027 #include <stxxl/bits/algo/sort_base.h> 00028 #include <stxxl/bits/algo/sort_helper.h> 00029 #include <stxxl/bits/algo/intksort.h> 00030 #include <stxxl/bits/algo/adaptor.h> 00031 #include <stxxl/bits/algo/async_schedule.h> 00032 #include <stxxl/bits/mng/block_prefetcher.h> 00033 #include <stxxl/bits/mng/buf_writer.h> 00034 #include <stxxl/bits/algo/run_cursor.h> 00035 #include <stxxl/bits/algo/losertree.h> 00036 #include <stxxl/bits/algo/inmemsort.h> 00037 #include <stxxl/bits/parallel.h> 00038 #include <stxxl/bits/common/is_sorted.h> 00039 00040 00041 __STXXL_BEGIN_NAMESPACE 00042 00043 //! \addtogroup stlalgo 00044 //! \{ 00045 00046 00047 /*! \internal 00048 */ 00049 namespace sort_local 00050 { 00051 template <typename block_type, typename bid_type> 00052 struct read_next_after_write_completed 00053 { 00054 block_type * block; 00055 bid_type bid; 00056 request_ptr * req; 00057 void operator () (request * /*completed_req*/) 00058 { 00059 *req = block->read(bid); 00060 } 00061 }; 00062 00063 00064 template < 00065 typename block_type, 00066 typename run_type, 00067 typename input_bid_iterator, 00068 typename value_cmp> 00069 void 00070 create_runs( 00071 input_bid_iterator it, 00072 run_type ** runs, 00073 int_type nruns, 00074 int_type _m, 00075 value_cmp cmp) 00076 { 00077 typedef typename block_type::value_type type; 00078 typedef typename block_type::bid_type bid_type; 00079 STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m); 00080 00081 int_type m2 = _m / 2; 00082 block_manager * bm = block_manager::get_instance(); 00083 block_type * Blocks1 = new block_type[m2]; 00084 block_type * Blocks2 = new block_type[m2]; 00085 bid_type * bids1 = new bid_type[m2]; 00086 bid_type * bids2 = new bid_type[m2]; 00087 request_ptr * read_reqs1 = new request_ptr[m2]; 00088 request_ptr * read_reqs2 = new request_ptr[m2]; 00089 request_ptr * write_reqs = new request_ptr[m2]; 00090 read_next_after_write_completed<block_type, bid_type> * next_run_reads = 00091 new read_next_after_write_completed<block_type, bid_type>[m2]; 00092 00093 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00094 00095 int_type i; 00096 int_type run_size = 0; 00097 00098 assert(nruns >= 2); 00099 00100 run_size = runs[0]->size(); 00101 assert(run_size == m2); 00102 00103 for (i = 0; i < run_size; ++i) 00104 { 00105 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem)); 00106 bids1[i] = *(it++); 00107 read_reqs1[i] = Blocks1[i].read(bids1[i]); 00108 } 00109 00110 run_size = runs[1]->size(); 00111 00112 for (i = 0; i < run_size; ++i) 00113 { 00114 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem)); 00115 bids2[i] = *(it++); 00116 read_reqs2[i] = Blocks2[i].read(bids2[i]); 00117 } 00118 00119 for (int_type k = 0; k < nruns - 1; ++k) 00120 { 00121 run_type * run = runs[k]; 00122 run_size = run->size(); 00123 assert(run_size == m2); 00124 #ifndef NDEBUG 00125 int_type next_run_size = runs[k + 1]->size(); 00126 #endif 00127 assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2)); 00128 00129 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1"); 00130 wait_all(read_reqs1, run_size); 00131 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1"); 00132 for (i = 0; i < run_size; ++i) 00133 bm->delete_block(bids1[i]); 00134 00135 check_sort_settings(); 00136 potentially_parallel:: 00137 sort(make_element_iterator(Blocks1, 0), 00138 make_element_iterator(Blocks1, run_size * block_type::size), 00139 cmp); 00140 00141 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00142 if (k > 0) 00143 wait_all(write_reqs, m2); 00144 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00145 00146 int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0; 00147 for (i = 0; i < m2; ++i) 00148 { 00149 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem)); 00150 (*run)[i].value = Blocks1[i][0]; 00151 if (i >= runplus2size) { 00152 write_reqs[i] = Blocks1[i].write((*run)[i].bid); 00153 } 00154 else 00155 { 00156 next_run_reads[i].block = Blocks1 + i; 00157 next_run_reads[i].req = read_reqs1 + i; 00158 bids1[i] = next_run_reads[i].bid = *(it++); 00159 write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]); 00160 } 00161 } 00162 std::swap(Blocks1, Blocks2); 00163 std::swap(bids1, bids2); 00164 std::swap(read_reqs1, read_reqs2); 00165 } 00166 00167 run_type * run = runs[nruns - 1]; 00168 run_size = run->size(); 00169 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1"); 00170 wait_all(read_reqs1, run_size); 00171 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1"); 00172 for (i = 0; i < run_size; ++i) 00173 bm->delete_block(bids1[i]); 00174 00175 check_sort_settings(); 00176 potentially_parallel:: 00177 sort(make_element_iterator(Blocks1, 0), 00178 make_element_iterator(Blocks1, run_size * block_type::size), 00179 cmp); 00180 00181 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00182 wait_all(write_reqs, m2); 00183 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00184 00185 for (i = 0; i < run_size; ++i) 00186 { 00187 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem)); 00188 (*run)[i].value = Blocks1[i][0]; 00189 write_reqs[i] = Blocks1[i].write((*run)[i].bid); 00190 } 00191 00192 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00193 wait_all(write_reqs, run_size); 00194 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00195 00196 delete[] Blocks1; 00197 delete[] Blocks2; 00198 delete[] bids1; 00199 delete[] bids2; 00200 delete[] read_reqs1; 00201 delete[] read_reqs2; 00202 delete[] write_reqs; 00203 delete[] next_run_reads; 00204 } 00205 00206 00207 template <typename block_type, typename run_type, typename value_cmp> 00208 bool check_sorted_runs(run_type ** runs, 00209 unsigned_type nruns, 00210 unsigned_type m, 00211 value_cmp cmp) 00212 { 00213 typedef typename block_type::value_type value_type; 00214 00215 STXXL_MSG("check_sorted_runs Runs: " << nruns); 00216 unsigned_type irun = 0; 00217 for (irun = 0; irun < nruns; ++irun) 00218 { 00219 const unsigned_type nblocks_per_run = runs[irun]->size(); 00220 unsigned_type blocks_left = nblocks_per_run; 00221 block_type * blocks = new block_type[m]; 00222 request_ptr * reqs = new request_ptr[m]; 00223 value_type last = cmp.min_value(); 00224 00225 for (unsigned_type off = 0; off < nblocks_per_run; off += m) 00226 { 00227 const unsigned_type nblocks = STXXL_MIN(blocks_left, m); 00228 const unsigned_type nelements = nblocks * block_type::size; 00229 blocks_left -= nblocks; 00230 00231 for (unsigned_type j = 0; j < nblocks; ++j) 00232 { 00233 reqs[j] = blocks[j].read((*runs[irun])[j + off].bid); 00234 } 00235 wait_all(reqs, reqs + nblocks); 00236 00237 if (off && cmp(blocks[0][0], last)) 00238 { 00239 STXXL_MSG("check_sorted_runs wrong first value in the run " << irun); 00240 STXXL_MSG(" first value: " << blocks[0][0]); 00241 STXXL_MSG(" last value: " << last); 00242 for (unsigned_type k = 0; k < block_type::size; ++k) 00243 STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]); 00244 00245 return false; 00246 } 00247 00248 for (unsigned_type j = 0; j < nblocks; ++j) 00249 { 00250 if (!(blocks[j][0] == (*runs[irun])[j + off].value)) 00251 { 00252 STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (j + off)); 00253 STXXL_MSG(" trigger value: " << (*runs[irun])[j + off].value); 00254 STXXL_MSG("Data in the block:"); 00255 for (unsigned_type k = 0; k < block_type::size; ++k) 00256 STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]); 00257 00258 STXXL_MSG("BIDS:"); 00259 for (unsigned_type k = 0; k < nblocks; ++k) 00260 { 00261 if (k == j) 00262 STXXL_MSG("Bad one comes next."); 00263 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00264 } 00265 00266 return false; 00267 } 00268 } 00269 if (!stxxl::is_sorted(make_element_iterator(blocks, 0), 00270 make_element_iterator(blocks, nelements), 00271 cmp)) 00272 { 00273 STXXL_MSG("check_sorted_runs wrong order in the run " << irun); 00274 STXXL_MSG("Data in blocks:"); 00275 for (unsigned_type j = 0; j < nblocks; ++j) 00276 { 00277 for (unsigned_type k = 0; k < block_type::size; ++k) 00278 STXXL_MSG(" Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]); 00279 } 00280 STXXL_MSG("BIDS:"); 00281 for (unsigned_type k = 0; k < nblocks; ++k) 00282 { 00283 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00284 } 00285 00286 return false; 00287 } 00288 00289 last = blocks[nblocks - 1][block_type::size - 1]; 00290 } 00291 00292 assert(blocks_left == 0); 00293 delete[] reqs; 00294 delete[] blocks; 00295 } 00296 00297 return true; 00298 } 00299 00300 00301 template <typename block_type, typename run_type, typename value_cmp> 00302 void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp) 00303 { 00304 typedef typename block_type::value_type value_type; 00305 typedef typename run_type::value_type trigger_entry_type; 00306 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00307 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00308 typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type; 00309 00310 run_type consume_seq(out_run->size()); 00311 00312 int_type * prefetch_seq = new int_type[out_run->size()]; 00313 00314 typename run_type::iterator copy_start = consume_seq.begin(); 00315 for (int_type i = 0; i < nruns; i++) 00316 { 00317 // TODO: try to avoid copy 00318 copy_start = std::copy( 00319 in_runs[i]->begin(), 00320 in_runs[i]->end(), 00321 copy_start); 00322 } 00323 00324 std::stable_sort(consume_seq.begin(), consume_seq.end(), 00325 sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL); 00326 00327 int_type disks_number = config::get_instance()->disks_number(); 00328 00329 #ifdef PLAY_WITH_OPT_PREF 00330 const int_type n_write_buffers = 4 * disks_number; 00331 #else 00332 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4)); 00333 const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers); 00334 #if STXXL_SORT_OPTIMAL_PREFETCHING 00335 // heuristic 00336 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10; 00337 #endif 00338 #endif 00339 00340 #if STXXL_SORT_OPTIMAL_PREFETCHING 00341 compute_prefetch_schedule( 00342 consume_seq, 00343 prefetch_seq, 00344 n_opt_prefetch_buffers, 00345 disks_number); 00346 #else 00347 for (unsigned_type i = 0; i < out_run->size(); i++) 00348 prefetch_seq[i] = i; 00349 00350 #endif 00351 00352 prefetcher_type prefetcher(consume_seq.begin(), 00353 consume_seq.end(), 00354 prefetch_seq, 00355 nruns + n_prefetch_buffers); 00356 00357 buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2); 00358 00359 int_type out_run_size = out_run->size(); 00360 00361 block_type * out_buffer = writer.get_free_block(); 00362 00363 //If parallelism is activated, one can still fall back to the 00364 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway. 00365 00366 if (do_parallel_merge()) 00367 { 00368 #if STXXL_PARALLEL_MULTIWAY_MERGE 00369 00370 // begin of STL-style merging 00371 00372 typedef stxxl::int64 diff_type; 00373 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence; 00374 typedef typename std::vector<sequence>::size_type seqs_size_type; 00375 std::vector<sequence> seqs(nruns); 00376 std::vector<block_type *> buffers(nruns); 00377 00378 for (int_type i = 0; i < nruns; i++) // initialize sequences 00379 { 00380 buffers[i] = prefetcher.pull_block(); // get first block of each run 00381 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end()); 00382 // this memory location stays the same, only the data is exchanged 00383 } 00384 00385 #if STXXL_CHECK_ORDER_IN_SORTS 00386 value_type last_elem = cmp.min_value(); 00387 #endif 00388 diff_type num_currently_mergeable = 0; 00389 00390 for (int_type j = 0; j < out_run_size; ++j) // for the whole output run, out_run_size is in blocks 00391 { 00392 diff_type rest = block_type::size; // elements still to merge for this output block 00393 00394 STXXL_VERBOSE1("output block " << j); 00395 do { 00396 if (num_currently_mergeable < rest) 00397 { 00398 if (prefetcher.empty()) 00399 { 00400 // anything remaining is already in memory 00401 num_currently_mergeable = (out_run_size - j) * block_type::size 00402 - (block_type::size - rest); 00403 } 00404 else 00405 { 00406 num_currently_mergeable = sort_helper::count_elements_less_equal( 00407 seqs, consume_seq[prefetcher.pos()].value, cmp); 00408 } 00409 } 00410 00411 diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements 00412 00413 STXXL_VERBOSE1("before merge " << output_size); 00414 00415 stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size); 00416 // sequence iterators are progressed appropriately 00417 00418 rest -= output_size; 00419 num_currently_mergeable -= output_size; 00420 00421 STXXL_VERBOSE1("after merge"); 00422 00423 sort_helper::refill_or_remove_empty_sequences(seqs, buffers, prefetcher); 00424 } while (rest > 0 && seqs.size() > 0); 00425 00426 #if STXXL_CHECK_ORDER_IN_SORTS 00427 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp)) 00428 { 00429 for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++) 00430 if (cmp(*i, *(i - 1))) 00431 { 00432 STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin())); 00433 } 00434 assert(false); 00435 } 00436 00437 if (j > 0) // do not check in first iteration 00438 assert(cmp((*out_buffer)[0], last_elem) == false); 00439 00440 last_elem = (*out_buffer)[block_type::size - 1]; 00441 #endif 00442 00443 (*out_run)[j].value = (*out_buffer)[0]; // save smallest value 00444 00445 out_buffer = writer.write(out_buffer, (*out_run)[j].bid); 00446 } 00447 00448 // end of STL-style merging 00449 00450 #else 00451 STXXL_THROW_UNREACHABLE(); 00452 #endif 00453 } 00454 else 00455 { 00456 // begin of native merging procedure 00457 00458 loser_tree<run_cursor_type, run_cursor2_cmp_type> 00459 losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp)); 00460 00461 #if STXXL_CHECK_ORDER_IN_SORTS 00462 value_type last_elem = cmp.min_value(); 00463 #endif 00464 00465 for (int_type i = 0; i < out_run_size; ++i) 00466 { 00467 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size); 00468 (*out_run)[i].value = *(out_buffer->elem); 00469 00470 #if STXXL_CHECK_ORDER_IN_SORTS 00471 assert(stxxl::is_sorted( 00472 out_buffer->begin(), 00473 out_buffer->end(), 00474 cmp)); 00475 00476 if (i) 00477 assert(cmp(*(out_buffer->elem), last_elem) == false); 00478 00479 last_elem = (*out_buffer).elem[block_type::size - 1]; 00480 #endif 00481 00482 out_buffer = writer.write(out_buffer, (*out_run)[i].bid); 00483 } 00484 00485 // end of native merging procedure 00486 } 00487 00488 delete[] prefetch_seq; 00489 00490 block_manager * bm = block_manager::get_instance(); 00491 for (int_type i = 0; i < nruns; ++i) 00492 { 00493 unsigned_type sz = in_runs[i]->size(); 00494 for (unsigned_type j = 0; j < sz; ++j) 00495 bm->delete_block((*in_runs[i])[j].bid); 00496 00497 00498 delete in_runs[i]; 00499 } 00500 } 00501 00502 00503 template <typename block_type, 00504 typename alloc_strategy, 00505 typename input_bid_iterator, 00506 typename value_cmp> 00507 simple_vector<sort_helper::trigger_entry<block_type> > * 00508 sort_blocks(input_bid_iterator input_bids, 00509 unsigned_type _n, 00510 unsigned_type _m, 00511 value_cmp cmp 00512 ) 00513 { 00514 typedef typename block_type::value_type type; 00515 typedef typename block_type::bid_type bid_type; 00516 typedef sort_helper::trigger_entry<block_type> trigger_entry_type; 00517 typedef simple_vector<trigger_entry_type> run_type; 00518 typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy; 00519 00520 unsigned_type m2 = _m / 2; 00521 unsigned_type full_runs = _n / m2; 00522 unsigned_type partial_runs = ((_n % m2) ? 1 : 0); 00523 unsigned_type nruns = full_runs + partial_runs; 00524 unsigned_type i; 00525 00526 block_manager * mng = block_manager::get_instance(); 00527 00528 //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs); 00529 00530 double begin = timestamp(), after_runs_creation, end; 00531 00532 run_type ** runs = new run_type *[nruns]; 00533 00534 for (i = 0; i < full_runs; i++) 00535 runs[i] = new run_type(m2); 00536 00537 00538 if (partial_runs) 00539 runs[i] = new run_type(_n - full_runs * m2); 00540 00541 for (i = 0; i < nruns; ++i) 00542 mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end())); 00543 00544 sort_local::create_runs<block_type, 00545 run_type, 00546 input_bid_iterator, 00547 value_cmp>(input_bids, runs, nruns, _m, cmp); 00548 00549 after_runs_creation = timestamp(); 00550 00551 double io_wait_after_rf = stats::get_instance()->get_io_wait_time(); 00552 00553 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00554 00555 const int_type merge_factor = optimal_merge_factor(nruns, _m); 00556 run_type ** new_runs; 00557 00558 while (nruns > 1) 00559 { 00560 int_type new_nruns = div_ceil(nruns, merge_factor); 00561 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns << 00562 " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns); 00563 00564 new_runs = new run_type *[new_nruns]; 00565 00566 int_type runs_left = nruns; 00567 int_type cur_out_run = 0; 00568 int_type blocks_in_new_run = 0; 00569 00570 while (runs_left > 0) 00571 { 00572 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00573 blocks_in_new_run = 0; 00574 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++) 00575 blocks_in_new_run += runs[i]->size(); 00576 00577 // allocate run 00578 new_runs[cur_out_run++] = new run_type(blocks_in_new_run); 00579 runs_left -= runs2merge; 00580 } 00581 // allocate blocks for the new runs 00582 if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed()) 00583 { 00584 // if we sort a file we can reuse the input bids for the output 00585 input_bid_iterator cur = input_bids; 00586 for (int_type i = 0; cur != (input_bids + _n); ++cur) 00587 { 00588 (*new_runs[0])[i++].bid = *cur; 00589 } 00590 00591 bid_type & firstBID = (*new_runs[0])[0].bid; 00592 if (firstBID.is_managed()) 00593 { 00594 // the first block does not belong to the file 00595 // need to reallocate it 00596 mng->new_block(FR(), firstBID); 00597 } 00598 bid_type & lastBID = (*new_runs[0])[_n - 1].bid; 00599 if (lastBID.is_managed()) 00600 { 00601 // the first block does not belong to the file 00602 // need to reallocate it 00603 mng->new_block(FR(), lastBID); 00604 } 00605 } 00606 else 00607 { 00608 mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()), 00609 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run), 00610 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run)); 00611 } 00612 // merge all 00613 runs_left = nruns; 00614 cur_out_run = 0; 00615 while (runs_left > 0) 00616 { 00617 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00618 #if STXXL_CHECK_ORDER_IN_SORTS 00619 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp))); 00620 #endif 00621 STXXL_VERBOSE("Merging " << runs2merge << " runs"); 00622 merge_runs<block_type, run_type>(runs + nruns - runs_left, 00623 runs2merge, *(new_runs + (cur_out_run++)), _m, cmp 00624 ); 00625 runs_left -= runs2merge; 00626 } 00627 00628 nruns = new_nruns; 00629 delete[] runs; 00630 runs = new_runs; 00631 } 00632 00633 run_type * result = *runs; 00634 delete[] runs; 00635 00636 end = timestamp(); 00637 00638 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " << 00639 after_runs_creation - begin << " s"); 00640 STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s"); 00641 STXXL_VERBOSE(*stats::get_instance()); 00642 STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf); 00643 00644 return result; 00645 } 00646 } 00647 00648 00649 /*! \page comparison Comparison concept 00650 00651 Model of \b Comparison concept must: 00652 - provide \b operator(a,b) that returns comparison result of two user types, 00653 must define strict weak ordering 00654 - provide \b max_value method that returns a value that is \b strictly \b greater than all 00655 other objects of user type, 00656 - provide \b min_value method that returns a value that is \b strictly \b less than all 00657 other objects of user type, 00658 - \b Note: when using unsigned integral types as key in user types, the value 0 00659 cannot be used as a key value of the data to be sorted because it would 00660 conflict with the sentinel value returned by \b min_value 00661 00662 Example: comparator class \b my_less_int 00663 \verbatim 00664 struct my_less_int 00665 { 00666 bool operator() (int a, int b) const 00667 { 00668 return a < b; 00669 } 00670 int min_value() const { return std::numeric_limits<int>::min(); }; 00671 int max_value() const { return std::numeric_limits<int>::max(); }; 00672 }; 00673 \endverbatim 00674 00675 Example: comparator class \b my_less, could be instantiated as e.g. 00676 \b my_less<int> , \b my_less<unsigned long> , ... 00677 \verbatim 00678 template <typename Tp> 00679 struct my_less 00680 { 00681 typedef Tp value_type; 00682 bool operator() (const value_type & a, const value_type & b) const 00683 { 00684 return a < b; 00685 } 00686 value_type min_value() const { return std::numeric_limits<value_type>::min(); }; 00687 value_type max_value() const { return std::numeric_limits<value_type>::max(); }; 00688 }; 00689 \endverbatim 00690 00691 */ 00692 00693 00694 //! \brief Sort records comparison-based 00695 //! \param first object of model of \c ext_random_access_iterator concept 00696 //! \param last object of model of \c ext_random_access_iterator concept 00697 //! \param cmp comparison object 00698 //! \param M amount of memory for internal use (in bytes) 00699 //! \remark Implements external merge sort described in [Dementiev & Sanders'03] 00700 //! \remark non-stable 00701 template <typename ExtIterator_, typename StrictWeakOrdering_> 00702 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M) 00703 { 00704 sort_helper::verify_sentinel_strict_weak_ordering(cmp); 00705 00706 typedef simple_vector<sort_helper::trigger_entry<typename ExtIterator_::block_type> > run_type; 00707 00708 typedef typename ExtIterator_::vector_type::value_type value_type; 00709 typedef typename ExtIterator_::block_type block_type; 00710 00711 unsigned_type n = 0; 00712 00713 block_manager * mng = block_manager::get_instance(); 00714 00715 first.flush(); 00716 00717 if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M) 00718 { 00719 stl_in_memory_sort(first, last, cmp); 00720 } 00721 else 00722 { 00723 if (!(2 * block_type::raw_size * sort_memory_usage_factor() <= M)) { 00724 throw bad_parameter("stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'"); 00725 } 00726 00727 if (first.block_offset()) 00728 { 00729 if (last.block_offset()) // first and last element are 00730 // not the first elements of their block 00731 { 00732 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00733 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00734 typename ExtIterator_::bid_type first_bid, last_bid; 00735 request_ptr req; 00736 00737 req = first_block->read(*first.bid()); 00738 mng->new_block(FR(), first_bid); // try to overlap 00739 mng->new_block(FR(), last_bid); 00740 req->wait(); 00741 00742 00743 req = last_block->read(*last.bid()); 00744 00745 unsigned_type i = 0; 00746 for ( ; i < first.block_offset(); ++i) 00747 { 00748 first_block->elem[i] = cmp.min_value(); 00749 } 00750 00751 req->wait(); 00752 00753 00754 req = first_block->write(first_bid); 00755 for (i = last.block_offset(); i < block_type::size; ++i) 00756 { 00757 last_block->elem[i] = cmp.max_value(); 00758 } 00759 00760 req->wait(); 00761 00762 00763 req = last_block->write(last_bid); 00764 00765 n = last.bid() - first.bid() + 1; 00766 00767 std::swap(first_bid, *first.bid()); 00768 std::swap(last_bid, *last.bid()); 00769 00770 req->wait(); 00771 00772 00773 delete first_block; 00774 delete last_block; 00775 00776 run_type * out = 00777 sort_local::sort_blocks< 00778 typename ExtIterator_::block_type, 00779 typename ExtIterator_::vector_type::alloc_strategy_type, 00780 typename ExtIterator_::bids_container_iterator> 00781 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00782 00783 00784 first_block = new typename ExtIterator_::block_type; 00785 last_block = new typename ExtIterator_::block_type; 00786 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00787 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 00788 request_ptr * reqs = new request_ptr[2]; 00789 00790 reqs[0] = first_block->read(first_bid); 00791 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00792 00793 reqs[0]->wait(); 00794 reqs[1]->wait(); 00795 00796 reqs[0] = last_block->read(last_bid); 00797 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 00798 00799 for (i = first.block_offset(); i < block_type::size; i++) 00800 { 00801 first_block->elem[i] = sorted_first_block->elem[i]; 00802 } 00803 00804 reqs[0]->wait(); 00805 reqs[1]->wait(); 00806 00807 req = first_block->write(first_bid); 00808 00809 for (i = 0; i < last.block_offset(); ++i) 00810 { 00811 last_block->elem[i] = sorted_last_block->elem[i]; 00812 } 00813 00814 req->wait(); 00815 00816 req = last_block->write(last_bid); 00817 00818 mng->delete_block(out->begin()->bid); 00819 mng->delete_block((*out)[out->size() - 1].bid); 00820 00821 *first.bid() = first_bid; 00822 *last.bid() = last_bid; 00823 00824 typename run_type::iterator it = out->begin(); 00825 ++it; 00826 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00827 ++cur_bid; 00828 00829 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00830 { 00831 *cur_bid = (*it).bid; 00832 } 00833 00834 delete first_block; 00835 delete sorted_first_block; 00836 delete sorted_last_block; 00837 delete[] reqs; 00838 delete out; 00839 00840 req->wait(); 00841 00842 00843 delete last_block; 00844 } 00845 else 00846 { 00847 // first element is 00848 // not the first element of its block 00849 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00850 typename ExtIterator_::bid_type first_bid; 00851 request_ptr req; 00852 00853 req = first_block->read(*first.bid()); 00854 mng->new_block(FR(), first_bid); // try to overlap 00855 req->wait(); 00856 00857 00858 unsigned_type i = 0; 00859 for ( ; i < first.block_offset(); ++i) 00860 { 00861 first_block->elem[i] = cmp.min_value(); 00862 } 00863 00864 req = first_block->write(first_bid); 00865 00866 n = last.bid() - first.bid(); 00867 00868 std::swap(first_bid, *first.bid()); 00869 00870 req->wait(); 00871 00872 00873 delete first_block; 00874 00875 run_type * out = 00876 sort_local::sort_blocks< 00877 typename ExtIterator_::block_type, 00878 typename ExtIterator_::vector_type::alloc_strategy_type, 00879 typename ExtIterator_::bids_container_iterator> 00880 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00881 00882 00883 first_block = new typename ExtIterator_::block_type; 00884 00885 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00886 00887 request_ptr * reqs = new request_ptr[2]; 00888 00889 reqs[0] = first_block->read(first_bid); 00890 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00891 00892 reqs[0]->wait(); 00893 reqs[1]->wait(); 00894 00895 for (i = first.block_offset(); i < block_type::size; ++i) 00896 { 00897 first_block->elem[i] = sorted_first_block->elem[i]; 00898 } 00899 00900 req = first_block->write(first_bid); 00901 00902 mng->delete_block(out->begin()->bid); 00903 00904 *first.bid() = first_bid; 00905 00906 typename run_type::iterator it = out->begin(); 00907 ++it; 00908 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00909 ++cur_bid; 00910 00911 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00912 { 00913 *cur_bid = (*it).bid; 00914 } 00915 00916 *cur_bid = (*it).bid; 00917 00918 delete sorted_first_block; 00919 delete[] reqs; 00920 delete out; 00921 00922 req->wait(); 00923 00924 delete first_block; 00925 } 00926 } 00927 else 00928 { 00929 if (last.block_offset()) // last is 00930 // not the first element of its block 00931 { 00932 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00933 typename ExtIterator_::bid_type last_bid; 00934 request_ptr req; 00935 unsigned_type i; 00936 00937 req = last_block->read(*last.bid()); 00938 mng->new_block(FR(), last_bid); 00939 req->wait(); 00940 00941 00942 for (i = last.block_offset(); i < block_type::size; ++i) 00943 { 00944 last_block->elem[i] = cmp.max_value(); 00945 } 00946 00947 req = last_block->write(last_bid); 00948 00949 n = last.bid() - first.bid() + 1; 00950 00951 std::swap(last_bid, *last.bid()); 00952 00953 req->wait(); 00954 00955 00956 delete last_block; 00957 00958 run_type * out = 00959 sort_local::sort_blocks< 00960 typename ExtIterator_::block_type, 00961 typename ExtIterator_::vector_type::alloc_strategy_type, 00962 typename ExtIterator_::bids_container_iterator> 00963 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00964 00965 00966 last_block = new typename ExtIterator_::block_type; 00967 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 00968 request_ptr * reqs = new request_ptr[2]; 00969 00970 reqs[0] = last_block->read(last_bid); 00971 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 00972 00973 reqs[0]->wait(); 00974 reqs[1]->wait(); 00975 00976 for (i = 0; i < last.block_offset(); ++i) 00977 { 00978 last_block->elem[i] = sorted_last_block->elem[i]; 00979 } 00980 00981 req = last_block->write(last_bid); 00982 00983 mng->delete_block((*out)[out->size() - 1].bid); 00984 00985 *last.bid() = last_bid; 00986 00987 typename run_type::iterator it = out->begin(); 00988 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00989 00990 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00991 { 00992 *cur_bid = (*it).bid; 00993 } 00994 00995 delete sorted_last_block; 00996 delete[] reqs; 00997 delete out; 00998 00999 req->wait(); 01000 01001 delete last_block; 01002 } 01003 else 01004 { 01005 // first and last element are first elements of their of blocks 01006 n = last.bid() - first.bid(); 01007 01008 run_type * out = 01009 sort_local::sort_blocks<typename ExtIterator_::block_type, 01010 typename ExtIterator_::vector_type::alloc_strategy_type, 01011 typename ExtIterator_::bids_container_iterator> 01012 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 01013 01014 typename run_type::iterator it = out->begin(); 01015 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 01016 01017 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 01018 { 01019 *cur_bid = (*it).bid; 01020 } 01021 01022 delete out; 01023 } 01024 } 01025 } 01026 01027 #if STXXL_CHECK_ORDER_IN_SORTS 01028 typedef typename ExtIterator_::const_iterator const_iterator; 01029 assert(stxxl::is_sorted(const_iterator(first), const_iterator(last), cmp)); 01030 #endif 01031 } 01032 01033 //! \} 01034 01035 __STXXL_END_NAMESPACE 01036 01037 #endif // !STXXL_SORT_HEADER 01038 // vim: et:ts=4:sw=4