Stxxl  1.4.0
include/stxxl/bits/algo/sort.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/algo/sort.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
00008  *  Copyright (C) 2008-2011 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_HEADER
00016 #define STXXL_SORT_HEADER
00017 
00018 #include <functional>
00019 
00020 #include <stxxl/bits/mng/mng.h>
00021 #include <stxxl/bits/common/rand.h>
00022 #include <stxxl/bits/mng/adaptor.h>
00023 #include <stxxl/bits/common/simple_vector.h>
00024 #include <stxxl/bits/common/settings.h>
00025 #include <stxxl/bits/mng/block_alloc_interleaved.h>
00026 #include <stxxl/bits/io/request_operations.h>
00027 #include <stxxl/bits/algo/sort_base.h>
00028 #include <stxxl/bits/algo/sort_helper.h>
00029 #include <stxxl/bits/algo/intksort.h>
00030 #include <stxxl/bits/algo/adaptor.h>
00031 #include <stxxl/bits/algo/async_schedule.h>
00032 #include <stxxl/bits/mng/block_prefetcher.h>
00033 #include <stxxl/bits/mng/buf_writer.h>
00034 #include <stxxl/bits/algo/run_cursor.h>
00035 #include <stxxl/bits/algo/losertree.h>
00036 #include <stxxl/bits/algo/inmemsort.h>
00037 #include <stxxl/bits/parallel.h>
00038 #include <stxxl/bits/common/is_sorted.h>
00039 
00040 
00041 __STXXL_BEGIN_NAMESPACE
00042 
00043 //! \addtogroup stlalgo
00044 //! \{
00045 
00046 
00047 /*! \internal
00048  */
00049 namespace sort_local
00050 {
00051     template <typename block_type, typename bid_type>
00052     struct read_next_after_write_completed
00053     {
00054         block_type * block;
00055         bid_type bid;
00056         request_ptr * req;
00057         void operator () (request * /*completed_req*/)
00058         {
00059             *req = block->read(bid);
00060         }
00061     };
00062 
00063 
00064     template <
00065         typename block_type,
00066         typename run_type,
00067         typename input_bid_iterator,
00068         typename value_cmp>
00069     void
00070     create_runs(
00071         input_bid_iterator it,
00072         run_type ** runs,
00073         int_type nruns,
00074         int_type _m,
00075         value_cmp cmp)
00076     {
00077         typedef typename block_type::value_type type;
00078         typedef typename block_type::bid_type bid_type;
00079         STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
00080 
00081         int_type m2 = _m / 2;
00082         block_manager * bm = block_manager::get_instance();
00083         block_type * Blocks1 = new block_type[m2];
00084         block_type * Blocks2 = new block_type[m2];
00085         bid_type * bids1 = new bid_type[m2];
00086         bid_type * bids2 = new bid_type[m2];
00087         request_ptr * read_reqs1 = new request_ptr[m2];
00088         request_ptr * read_reqs2 = new request_ptr[m2];
00089         request_ptr * write_reqs = new request_ptr[m2];
00090         read_next_after_write_completed<block_type, bid_type> * next_run_reads =
00091             new read_next_after_write_completed<block_type, bid_type>[m2];
00092 
00093         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00094 
00095         int_type i;
00096         int_type run_size = 0;
00097 
00098         assert(nruns >= 2);
00099 
00100         run_size = runs[0]->size();
00101         assert(run_size == m2);
00102 
00103         for (i = 0; i < run_size; ++i)
00104         {
00105             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
00106             bids1[i] = *(it++);
00107             read_reqs1[i] = Blocks1[i].read(bids1[i]);
00108         }
00109 
00110         run_size = runs[1]->size();
00111 
00112         for (i = 0; i < run_size; ++i)
00113         {
00114             STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
00115             bids2[i] = *(it++);
00116             read_reqs2[i] = Blocks2[i].read(bids2[i]);
00117         }
00118 
00119         for (int_type k = 0; k < nruns - 1; ++k)
00120         {
00121             run_type * run = runs[k];
00122             run_size = run->size();
00123             assert(run_size == m2);
00124             #ifndef NDEBUG
00125             int_type next_run_size = runs[k + 1]->size();
00126             #endif
00127             assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
00128 
00129             STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00130             wait_all(read_reqs1, run_size);
00131             STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00132             for (i = 0; i < run_size; ++i)
00133                 bm->delete_block(bids1[i]);
00134 
00135             check_sort_settings();
00136             potentially_parallel::
00137             sort(make_element_iterator(Blocks1, 0),
00138                  make_element_iterator(Blocks1, run_size * block_type::size),
00139                  cmp);
00140 
00141             STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00142             if (k > 0)
00143                 wait_all(write_reqs, m2);
00144             STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00145 
00146             int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
00147             for (i = 0; i < m2; ++i)
00148             {
00149                 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00150                 (*run)[i].value = Blocks1[i][0];
00151                 if (i >= runplus2size) {
00152                     write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00153                 }
00154                 else
00155                 {
00156                     next_run_reads[i].block = Blocks1 + i;
00157                     next_run_reads[i].req = read_reqs1 + i;
00158                     bids1[i] = next_run_reads[i].bid = *(it++);
00159                     write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
00160                 }
00161             }
00162             std::swap(Blocks1, Blocks2);
00163             std::swap(bids1, bids2);
00164             std::swap(read_reqs1, read_reqs2);
00165         }
00166 
00167         run_type * run = runs[nruns - 1];
00168         run_size = run->size();
00169         STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
00170         wait_all(read_reqs1, run_size);
00171         STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
00172         for (i = 0; i < run_size; ++i)
00173             bm->delete_block(bids1[i]);
00174 
00175         check_sort_settings();
00176         potentially_parallel::
00177         sort(make_element_iterator(Blocks1, 0),
00178              make_element_iterator(Blocks1, run_size * block_type::size),
00179              cmp);
00180 
00181         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00182         wait_all(write_reqs, m2);
00183         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00184 
00185         for (i = 0; i < run_size; ++i)
00186         {
00187             STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
00188             (*run)[i].value = Blocks1[i][0];
00189             write_reqs[i] = Blocks1[i].write((*run)[i].bid);
00190         }
00191 
00192         STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
00193         wait_all(write_reqs, run_size);
00194         STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
00195 
00196         delete[] Blocks1;
00197         delete[] Blocks2;
00198         delete[] bids1;
00199         delete[] bids2;
00200         delete[] read_reqs1;
00201         delete[] read_reqs2;
00202         delete[] write_reqs;
00203         delete[] next_run_reads;
00204     }
00205 
00206 
00207     template <typename block_type, typename run_type, typename value_cmp>
00208     bool check_sorted_runs(run_type ** runs,
00209                            unsigned_type nruns,
00210                            unsigned_type m,
00211                            value_cmp cmp)
00212     {
00213         typedef typename block_type::value_type value_type;
00214 
00215         STXXL_MSG("check_sorted_runs  Runs: " << nruns);
00216         unsigned_type irun = 0;
00217         for (irun = 0; irun < nruns; ++irun)
00218         {
00219             const unsigned_type nblocks_per_run = runs[irun]->size();
00220             unsigned_type blocks_left = nblocks_per_run;
00221             block_type * blocks = new block_type[m];
00222             request_ptr * reqs = new request_ptr[m];
00223             value_type last = cmp.min_value();
00224 
00225             for (unsigned_type off = 0; off < nblocks_per_run; off += m)
00226             {
00227                 const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
00228                 const unsigned_type nelements = nblocks * block_type::size;
00229                 blocks_left -= nblocks;
00230 
00231                 for (unsigned_type j = 0; j < nblocks; ++j)
00232                 {
00233                     reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
00234                 }
00235                 wait_all(reqs, reqs + nblocks);
00236 
00237                 if (off && cmp(blocks[0][0], last))
00238                 {
00239                     STXXL_MSG("check_sorted_runs  wrong first value in the run " << irun);
00240                     STXXL_MSG(" first value: " << blocks[0][0]);
00241                     STXXL_MSG(" last  value: " << last);
00242                     for (unsigned_type k = 0; k < block_type::size; ++k)
00243                         STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
00244 
00245                     return false;
00246                 }
00247 
00248                 for (unsigned_type j = 0; j < nblocks; ++j)
00249                 {
00250                     if (!(blocks[j][0] == (*runs[irun])[j + off].value))
00251                     {
00252                         STXXL_MSG("check_sorted_runs  wrong trigger in the run " << irun << " block " << (j + off));
00253                         STXXL_MSG("                   trigger value: " << (*runs[irun])[j + off].value);
00254                         STXXL_MSG("Data in the block:");
00255                         for (unsigned_type k = 0; k < block_type::size; ++k)
00256                             STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
00257 
00258                         STXXL_MSG("BIDS:");
00259                         for (unsigned_type k = 0; k < nblocks; ++k)
00260                         {
00261                             if (k == j)
00262                                 STXXL_MSG("Bad one comes next.");
00263                             STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00264                         }
00265 
00266                         return false;
00267                     }
00268                 }
00269                 if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00270                                       make_element_iterator(blocks, nelements),
00271                                       cmp))
00272                 {
00273                     STXXL_MSG("check_sorted_runs  wrong order in the run " << irun);
00274                     STXXL_MSG("Data in blocks:");
00275                     for (unsigned_type j = 0; j < nblocks; ++j)
00276                     {
00277                         for (unsigned_type k = 0; k < block_type::size; ++k)
00278                             STXXL_MSG("     Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
00279                     }
00280                     STXXL_MSG("BIDS:");
00281                     for (unsigned_type k = 0; k < nblocks; ++k)
00282                     {
00283                         STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
00284                     }
00285 
00286                     return false;
00287                 }
00288 
00289                 last = blocks[nblocks - 1][block_type::size - 1];
00290             }
00291 
00292             assert(blocks_left == 0);
00293             delete[] reqs;
00294             delete[] blocks;
00295         }
00296 
00297         return true;
00298     }
00299 
00300 
00301     template <typename block_type, typename run_type, typename value_cmp>
00302     void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp)
00303     {
00304         typedef typename block_type::value_type value_type;
00305         typedef typename run_type::value_type trigger_entry_type;
00306         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00307         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00308         typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00309 
00310         run_type consume_seq(out_run->size());
00311 
00312         int_type * prefetch_seq = new int_type[out_run->size()];
00313 
00314         typename run_type::iterator copy_start = consume_seq.begin();
00315         for (int_type i = 0; i < nruns; i++)
00316         {
00317             // TODO: try to avoid copy
00318             copy_start = std::copy(
00319                 in_runs[i]->begin(),
00320                 in_runs[i]->end(),
00321                 copy_start);
00322         }
00323 
00324         std::stable_sort(consume_seq.begin(), consume_seq.end(),
00325                          sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
00326 
00327         int_type disks_number = config::get_instance()->disks_number();
00328 
00329 #ifdef PLAY_WITH_OPT_PREF
00330         const int_type n_write_buffers = 4 * disks_number;
00331 #else
00332         const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
00333         const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
00334  #if STXXL_SORT_OPTIMAL_PREFETCHING
00335         // heuristic
00336         const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
00337  #endif
00338 #endif
00339 
00340 #if STXXL_SORT_OPTIMAL_PREFETCHING
00341         compute_prefetch_schedule(
00342             consume_seq,
00343             prefetch_seq,
00344             n_opt_prefetch_buffers,
00345             disks_number);
00346 #else
00347         for (unsigned_type i = 0; i < out_run->size(); i++)
00348             prefetch_seq[i] = i;
00349 
00350 #endif
00351 
00352         prefetcher_type prefetcher(consume_seq.begin(),
00353                                    consume_seq.end(),
00354                                    prefetch_seq,
00355                                    nruns + n_prefetch_buffers);
00356 
00357         buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
00358 
00359         int_type out_run_size = out_run->size();
00360 
00361         block_type * out_buffer = writer.get_free_block();
00362 
00363 //If parallelism is activated, one can still fall back to the
00364 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway.
00365 
00366         if (do_parallel_merge())
00367         {
00368 #if STXXL_PARALLEL_MULTIWAY_MERGE
00369 
00370 // begin of STL-style merging
00371 
00372             typedef stxxl::int64 diff_type;
00373             typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00374             typedef typename std::vector<sequence>::size_type seqs_size_type;
00375             std::vector<sequence> seqs(nruns);
00376             std::vector<block_type *> buffers(nruns);
00377 
00378             for (int_type i = 0; i < nruns; i++)                // initialize sequences
00379             {
00380                 buffers[i] = prefetcher.pull_block();           // get first block of each run
00381                 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
00382                 // this memory location stays the same, only the data is exchanged
00383             }
00384 
00385  #if STXXL_CHECK_ORDER_IN_SORTS
00386             value_type last_elem = cmp.min_value();
00387  #endif
00388             diff_type num_currently_mergeable = 0;
00389 
00390             for (int_type j = 0; j < out_run_size; ++j)                 // for the whole output run, out_run_size is in blocks
00391             {
00392                 diff_type rest = block_type::size;                      // elements still to merge for this output block
00393 
00394                 STXXL_VERBOSE1("output block " << j);
00395                 do {
00396                     if (num_currently_mergeable < rest)
00397                     {
00398                         if (prefetcher.empty())
00399                         {
00400                             // anything remaining is already in memory
00401                             num_currently_mergeable = (out_run_size - j) * block_type::size
00402                                                       - (block_type::size - rest);
00403                         }
00404                         else
00405                         {
00406                             num_currently_mergeable = sort_helper::count_elements_less_equal(
00407                                 seqs, consume_seq[prefetcher.pos()].value, cmp);
00408                         }
00409                     }
00410 
00411                     diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);   // at most rest elements
00412 
00413                     STXXL_VERBOSE1("before merge " << output_size);
00414 
00415                     stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
00416                     // sequence iterators are progressed appropriately
00417 
00418                     rest -= output_size;
00419                     num_currently_mergeable -= output_size;
00420 
00421                     STXXL_VERBOSE1("after merge");
00422 
00423                     sort_helper::refill_or_remove_empty_sequences(seqs, buffers, prefetcher);
00424                 } while (rest > 0 && seqs.size() > 0);
00425 
00426  #if STXXL_CHECK_ORDER_IN_SORTS
00427                 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
00428                 {
00429                     for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
00430                         if (cmp(*i, *(i - 1)))
00431                         {
00432                             STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
00433                         }
00434                     assert(false);
00435                 }
00436 
00437                 if (j > 0) // do not check in first iteration
00438                     assert(cmp((*out_buffer)[0], last_elem) == false);
00439 
00440                 last_elem = (*out_buffer)[block_type::size - 1];
00441  #endif
00442 
00443                 (*out_run)[j].value = (*out_buffer)[0];                              // save smallest value
00444 
00445                 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
00446             }
00447 
00448 // end of STL-style merging
00449 
00450 #else
00451             STXXL_THROW_UNREACHABLE();
00452 #endif
00453         }
00454         else
00455         {
00456 // begin of native merging procedure
00457 
00458             loser_tree<run_cursor_type, run_cursor2_cmp_type>
00459             losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
00460 
00461 #if STXXL_CHECK_ORDER_IN_SORTS
00462             value_type last_elem = cmp.min_value();
00463 #endif
00464 
00465             for (int_type i = 0; i < out_run_size; ++i)
00466             {
00467                 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
00468                 (*out_run)[i].value = *(out_buffer->elem);
00469 
00470 #if STXXL_CHECK_ORDER_IN_SORTS
00471                 assert(stxxl::is_sorted(
00472                            out_buffer->begin(),
00473                            out_buffer->end(),
00474                            cmp));
00475 
00476                 if (i)
00477                     assert(cmp(*(out_buffer->elem), last_elem) == false);
00478 
00479                 last_elem = (*out_buffer).elem[block_type::size - 1];
00480 #endif
00481 
00482                 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
00483             }
00484 
00485 // end of native merging procedure
00486         }
00487 
00488         delete[] prefetch_seq;
00489 
00490         block_manager * bm = block_manager::get_instance();
00491         for (int_type i = 0; i < nruns; ++i)
00492         {
00493             unsigned_type sz = in_runs[i]->size();
00494             for (unsigned_type j = 0; j < sz; ++j)
00495                 bm->delete_block((*in_runs[i])[j].bid);
00496 
00497 
00498             delete in_runs[i];
00499         }
00500     }
00501 
00502 
00503     template <typename block_type,
00504               typename alloc_strategy,
00505               typename input_bid_iterator,
00506               typename value_cmp>
00507     simple_vector<sort_helper::trigger_entry<block_type> > *
00508     sort_blocks(input_bid_iterator input_bids,
00509                 unsigned_type _n,
00510                 unsigned_type _m,
00511                 value_cmp cmp
00512                 )
00513     {
00514         typedef typename block_type::value_type type;
00515         typedef typename block_type::bid_type bid_type;
00516         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00517         typedef simple_vector<trigger_entry_type> run_type;
00518         typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
00519 
00520         unsigned_type m2 = _m / 2;
00521         unsigned_type full_runs = _n / m2;
00522         unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
00523         unsigned_type nruns = full_runs + partial_runs;
00524         unsigned_type i;
00525 
00526         block_manager * mng = block_manager::get_instance();
00527 
00528         //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
00529 
00530         double begin = timestamp(), after_runs_creation, end;
00531 
00532         run_type ** runs = new run_type *[nruns];
00533 
00534         for (i = 0; i < full_runs; i++)
00535             runs[i] = new run_type(m2);
00536 
00537 
00538         if (partial_runs)
00539             runs[i] = new run_type(_n - full_runs * m2);
00540 
00541         for (i = 0; i < nruns; ++i)
00542             mng->new_blocks(alloc_strategy(), make_bid_iterator(runs[i]->begin()), make_bid_iterator(runs[i]->end()));
00543 
00544         sort_local::create_runs<block_type,
00545                                 run_type,
00546                                 input_bid_iterator,
00547                                 value_cmp>(input_bids, runs, nruns, _m, cmp);
00548 
00549         after_runs_creation = timestamp();
00550 
00551         double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
00552 
00553         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00554 
00555         const int_type merge_factor = optimal_merge_factor(nruns, _m);
00556         run_type ** new_runs;
00557 
00558         while (nruns > 1)
00559         {
00560             int_type new_nruns = div_ceil(nruns, merge_factor);
00561             STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
00562                           " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
00563 
00564             new_runs = new run_type *[new_nruns];
00565 
00566             int_type runs_left = nruns;
00567             int_type cur_out_run = 0;
00568             int_type blocks_in_new_run = 0;
00569 
00570             while (runs_left > 0)
00571             {
00572                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00573                 blocks_in_new_run = 0;
00574                 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
00575                     blocks_in_new_run += runs[i]->size();
00576 
00577                 // allocate run
00578                 new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
00579                 runs_left -= runs2merge;
00580             }
00581             // allocate blocks for the new runs
00582             if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && !input_bids->is_managed())
00583             {
00584                 // if we sort a file we can reuse the input bids for the output
00585                 input_bid_iterator cur = input_bids;
00586                 for (int_type i = 0; cur != (input_bids + _n); ++cur)
00587                 {
00588                     (*new_runs[0])[i++].bid = *cur;
00589                 }
00590 
00591                 bid_type & firstBID = (*new_runs[0])[0].bid;
00592                 if (firstBID.is_managed())
00593                 {
00594                     // the first block does not belong to the file
00595                     // need to reallocate it
00596                     mng->new_block(FR(), firstBID);
00597                 }
00598                 bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
00599                 if (lastBID.is_managed())
00600                 {
00601                     // the first block does not belong to the file
00602                     // need to reallocate it
00603                     mng->new_block(FR(), lastBID);
00604                 }
00605             }
00606             else
00607             {
00608                 mng->new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
00609                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
00610                                 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
00611             }
00612             // merge all
00613             runs_left = nruns;
00614             cur_out_run = 0;
00615             while (runs_left > 0)
00616             {
00617                 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
00618 #if STXXL_CHECK_ORDER_IN_SORTS
00619                 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
00620 #endif
00621                 STXXL_VERBOSE("Merging " << runs2merge << " runs");
00622                 merge_runs<block_type, run_type>(runs + nruns - runs_left,
00623                                                  runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
00624                                                  );
00625                 runs_left -= runs2merge;
00626             }
00627 
00628             nruns = new_nruns;
00629             delete[] runs;
00630             runs = new_runs;
00631         }
00632 
00633         run_type * result = *runs;
00634         delete[] runs;
00635 
00636         end = timestamp();
00637 
00638         STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Run creation time: " <<
00639                       after_runs_creation - begin << " s");
00640         STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
00641         STXXL_VERBOSE(*stats::get_instance());
00642         STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
00643 
00644         return result;
00645     }
00646 }
00647 
00648 
00649 /*! \page comparison Comparison concept
00650 
00651    Model of \b Comparison concept must:
00652    - provide \b operator(a,b) that returns comparison result of two user types,
00653      must define strict weak ordering
00654    - provide \b max_value method that returns a value that is \b strictly \b greater than all
00655    other objects of user type,
00656    - provide \b min_value method that returns a value that is \b strictly \b less than all
00657    other objects of user type,
00658    - \b Note: when using unsigned integral types as key in user types, the value 0
00659    cannot be used as a key value of the data to be sorted because it would
00660    conflict with the sentinel value returned by \b min_value
00661 
00662    Example: comparator class \b my_less_int
00663  \verbatim
00664    struct my_less_int
00665    {
00666     bool operator() (int a, int b) const
00667     {
00668         return a < b;
00669     }
00670     int min_value() const { return std::numeric_limits<int>::min(); };
00671     int max_value() const { return std::numeric_limits<int>::max(); };
00672    };
00673  \endverbatim
00674 
00675    Example: comparator class \b my_less, could be instantiated as e.g.
00676       \b my_less<int> , \b my_less<unsigned long> , ...
00677  \verbatim
00678    template <typename Tp>
00679    struct my_less
00680    {
00681     typedef Tp value_type;
00682     bool operator() (const value_type & a, const value_type & b) const
00683     {
00684         return a < b;
00685     }
00686     value_type min_value() const { return std::numeric_limits<value_type>::min(); };
00687     value_type max_value() const { return std::numeric_limits<value_type>::max(); };
00688    };
00689  \endverbatim
00690 
00691  */
00692 
00693 
00694 //! \brief Sort records comparison-based
00695 //! \param first object of model of \c ext_random_access_iterator concept
00696 //! \param last object of model of \c ext_random_access_iterator concept
00697 //! \param cmp comparison object
00698 //! \param M amount of memory for internal use (in bytes)
00699 //! \remark Implements external merge sort described in [Dementiev & Sanders'03]
00700 //! \remark non-stable
00701 template <typename ExtIterator_, typename StrictWeakOrdering_>
00702 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
00703 {
00704     sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00705 
00706     typedef simple_vector<sort_helper::trigger_entry<typename ExtIterator_::block_type> > run_type;
00707 
00708     typedef typename ExtIterator_::vector_type::value_type value_type;
00709     typedef typename ExtIterator_::block_type block_type;
00710 
00711     unsigned_type n = 0;
00712 
00713     block_manager * mng = block_manager::get_instance();
00714 
00715     first.flush();
00716 
00717     if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
00718     {
00719         stl_in_memory_sort(first, last, cmp);
00720     }
00721     else
00722     {
00723         if (!(2 * block_type::raw_size * sort_memory_usage_factor() <= M)) {
00724             throw bad_parameter("stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
00725         }
00726 
00727         if (first.block_offset())
00728         {
00729             if (last.block_offset())              // first and last element are
00730             // not the first elements of their block
00731             {
00732                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00733                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00734                 typename ExtIterator_::bid_type first_bid, last_bid;
00735                 request_ptr req;
00736 
00737                 req = first_block->read(*first.bid());
00738                 mng->new_block(FR(), first_bid);                // try to overlap
00739                 mng->new_block(FR(), last_bid);
00740                 req->wait();
00741 
00742 
00743                 req = last_block->read(*last.bid());
00744 
00745                 unsigned_type i = 0;
00746                 for ( ; i < first.block_offset(); ++i)
00747                 {
00748                     first_block->elem[i] = cmp.min_value();
00749                 }
00750 
00751                 req->wait();
00752 
00753 
00754                 req = first_block->write(first_bid);
00755                 for (i = last.block_offset(); i < block_type::size; ++i)
00756                 {
00757                     last_block->elem[i] = cmp.max_value();
00758                 }
00759 
00760                 req->wait();
00761 
00762 
00763                 req = last_block->write(last_bid);
00764 
00765                 n = last.bid() - first.bid() + 1;
00766 
00767                 std::swap(first_bid, *first.bid());
00768                 std::swap(last_bid, *last.bid());
00769 
00770                 req->wait();
00771 
00772 
00773                 delete first_block;
00774                 delete last_block;
00775 
00776                 run_type * out =
00777                     sort_local::sort_blocks<
00778                         typename ExtIterator_::block_type,
00779                         typename ExtIterator_::vector_type::alloc_strategy_type,
00780                         typename ExtIterator_::bids_container_iterator>
00781                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00782 
00783 
00784                 first_block = new typename ExtIterator_::block_type;
00785                 last_block = new typename ExtIterator_::block_type;
00786                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00787                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00788                 request_ptr * reqs = new request_ptr[2];
00789 
00790                 reqs[0] = first_block->read(first_bid);
00791                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00792 
00793                 reqs[0]->wait();
00794                 reqs[1]->wait();
00795 
00796                 reqs[0] = last_block->read(last_bid);
00797                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00798 
00799                 for (i = first.block_offset(); i < block_type::size; i++)
00800                 {
00801                     first_block->elem[i] = sorted_first_block->elem[i];
00802                 }
00803 
00804                 reqs[0]->wait();
00805                 reqs[1]->wait();
00806 
00807                 req = first_block->write(first_bid);
00808 
00809                 for (i = 0; i < last.block_offset(); ++i)
00810                 {
00811                     last_block->elem[i] = sorted_last_block->elem[i];
00812                 }
00813 
00814                 req->wait();
00815 
00816                 req = last_block->write(last_bid);
00817 
00818                 mng->delete_block(out->begin()->bid);
00819                 mng->delete_block((*out)[out->size() - 1].bid);
00820 
00821                 *first.bid() = first_bid;
00822                 *last.bid() = last_bid;
00823 
00824                 typename run_type::iterator it = out->begin();
00825                 ++it;
00826                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00827                 ++cur_bid;
00828 
00829                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00830                 {
00831                     *cur_bid = (*it).bid;
00832                 }
00833 
00834                 delete first_block;
00835                 delete sorted_first_block;
00836                 delete sorted_last_block;
00837                 delete[] reqs;
00838                 delete out;
00839 
00840                 req->wait();
00841 
00842 
00843                 delete last_block;
00844             }
00845             else
00846             {
00847                 // first element is
00848                 // not the first element of its block
00849                 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
00850                 typename ExtIterator_::bid_type first_bid;
00851                 request_ptr req;
00852 
00853                 req = first_block->read(*first.bid());
00854                 mng->new_block(FR(), first_bid);                // try to overlap
00855                 req->wait();
00856 
00857 
00858                 unsigned_type i = 0;
00859                 for ( ; i < first.block_offset(); ++i)
00860                 {
00861                     first_block->elem[i] = cmp.min_value();
00862                 }
00863 
00864                 req = first_block->write(first_bid);
00865 
00866                 n = last.bid() - first.bid();
00867 
00868                 std::swap(first_bid, *first.bid());
00869 
00870                 req->wait();
00871 
00872 
00873                 delete first_block;
00874 
00875                 run_type * out =
00876                     sort_local::sort_blocks<
00877                         typename ExtIterator_::block_type,
00878                         typename ExtIterator_::vector_type::alloc_strategy_type,
00879                         typename ExtIterator_::bids_container_iterator>
00880                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00881 
00882 
00883                 first_block = new typename ExtIterator_::block_type;
00884 
00885                 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
00886 
00887                 request_ptr * reqs = new request_ptr[2];
00888 
00889                 reqs[0] = first_block->read(first_bid);
00890                 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
00891 
00892                 reqs[0]->wait();
00893                 reqs[1]->wait();
00894 
00895                 for (i = first.block_offset(); i < block_type::size; ++i)
00896                 {
00897                     first_block->elem[i] = sorted_first_block->elem[i];
00898                 }
00899 
00900                 req = first_block->write(first_bid);
00901 
00902                 mng->delete_block(out->begin()->bid);
00903 
00904                 *first.bid() = first_bid;
00905 
00906                 typename run_type::iterator it = out->begin();
00907                 ++it;
00908                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00909                 ++cur_bid;
00910 
00911                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00912                 {
00913                     *cur_bid = (*it).bid;
00914                 }
00915 
00916                 *cur_bid = (*it).bid;
00917 
00918                 delete sorted_first_block;
00919                 delete[] reqs;
00920                 delete out;
00921 
00922                 req->wait();
00923 
00924                 delete first_block;
00925             }
00926         }
00927         else
00928         {
00929             if (last.block_offset())            // last is
00930             // not the first element of its block
00931             {
00932                 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
00933                 typename ExtIterator_::bid_type last_bid;
00934                 request_ptr req;
00935                 unsigned_type i;
00936 
00937                 req = last_block->read(*last.bid());
00938                 mng->new_block(FR(), last_bid);
00939                 req->wait();
00940 
00941 
00942                 for (i = last.block_offset(); i < block_type::size; ++i)
00943                 {
00944                     last_block->elem[i] = cmp.max_value();
00945                 }
00946 
00947                 req = last_block->write(last_bid);
00948 
00949                 n = last.bid() - first.bid() + 1;
00950 
00951                 std::swap(last_bid, *last.bid());
00952 
00953                 req->wait();
00954 
00955 
00956                 delete last_block;
00957 
00958                 run_type * out =
00959                     sort_local::sort_blocks<
00960                         typename ExtIterator_::block_type,
00961                         typename ExtIterator_::vector_type::alloc_strategy_type,
00962                         typename ExtIterator_::bids_container_iterator>
00963                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
00964 
00965 
00966                 last_block = new typename ExtIterator_::block_type;
00967                 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
00968                 request_ptr * reqs = new request_ptr[2];
00969 
00970                 reqs[0] = last_block->read(last_bid);
00971                 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
00972 
00973                 reqs[0]->wait();
00974                 reqs[1]->wait();
00975 
00976                 for (i = 0; i < last.block_offset(); ++i)
00977                 {
00978                     last_block->elem[i] = sorted_last_block->elem[i];
00979                 }
00980 
00981                 req = last_block->write(last_bid);
00982 
00983                 mng->delete_block((*out)[out->size() - 1].bid);
00984 
00985                 *last.bid() = last_bid;
00986 
00987                 typename run_type::iterator it = out->begin();
00988                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
00989 
00990                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
00991                 {
00992                     *cur_bid = (*it).bid;
00993                 }
00994 
00995                 delete sorted_last_block;
00996                 delete[] reqs;
00997                 delete out;
00998 
00999                 req->wait();
01000 
01001                 delete last_block;
01002             }
01003             else
01004             {
01005                 // first and last element are first elements of their of blocks
01006                 n = last.bid() - first.bid();
01007 
01008                 run_type * out =
01009                     sort_local::sort_blocks<typename ExtIterator_::block_type,
01010                                             typename ExtIterator_::vector_type::alloc_strategy_type,
01011                                             typename ExtIterator_::bids_container_iterator>
01012                         (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
01013 
01014                 typename run_type::iterator it = out->begin();
01015                 typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
01016 
01017                 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
01018                 {
01019                     *cur_bid = (*it).bid;
01020                 }
01021 
01022                 delete out;
01023             }
01024         }
01025     }
01026 
01027 #if STXXL_CHECK_ORDER_IN_SORTS
01028     typedef typename ExtIterator_::const_iterator const_iterator;
01029     assert(stxxl::is_sorted(const_iterator(first), const_iterator(last), cmp));
01030 #endif
01031 }
01032 
01033 //! \}
01034 
01035 __STXXL_END_NAMESPACE
01036 
01037 #endif // !STXXL_SORT_HEADER
01038 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines