Stxxl  1.4.0
include/stxxl/bits/stream/sort_stream.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/stream/sort_stream.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2006-2008 Johannes Singler <singler@ira.uka.de>
00008  *  Copyright (C) 2008-2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_SORT_STREAM_HEADER
00016 #define STXXL_SORT_STREAM_HEADER
00017 
00018 #ifdef STXXL_BOOST_CONFIG
00019  #include <boost/config.hpp>
00020 #endif
00021 
00022 #include <stxxl/bits/stream/stream.h>
00023 #include <stxxl/bits/mng/mng.h>
00024 #include <stxxl/bits/algo/sort_base.h>
00025 #include <stxxl/bits/algo/sort_helper.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/run_cursor.h>
00028 #include <stxxl/bits/algo/losertree.h>
00029 #include <stxxl/bits/stream/sorted_runs.h>
00030 #include <stxxl/bits/counting_ptr.h>
00031 
00032 __STXXL_BEGIN_NAMESPACE
00033 
00034 namespace stream
00035 {
00036     //! \addtogroup streampack Stream package
00037     //! \{
00038 
00039     ////////////////////////////////////////////////////////////////////////
00040     //     CREATE RUNS                                                    //
00041     ////////////////////////////////////////////////////////////////////////
00042 
00043     //! \brief Forms sorted runs of data from a stream
00044     //!
00045     //! \tparam Input_ type of the input stream
00046     //! \tparam CompareType_ type of comparison object used for sorting the runs
00047     //! \tparam BlockSize_ size of blocks used to store the runs (in bytes)
00048     //! \tparam AllocStr_ functor that defines allocation strategy for the runs
00049     template <
00050         class Input_,
00051         class CompareType_,
00052         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00053         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00054     class basic_runs_creator : private noncopyable
00055     {
00056     public:
00057         typedef Input_ input_type;
00058         typedef CompareType_ cmp_type;
00059         static const unsigned block_size = BlockSize_;
00060         typedef AllocStr_ allocation_strategy_type;
00061 
00062     public:
00063         typedef typename Input_::value_type value_type;
00064         typedef typed_block<BlockSize_, value_type> block_type;
00065         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00066         typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type;
00067         typedef typename sorted_runs_data_type::run_type run_type;
00068         typedef counting_ptr<sorted_runs_data_type> sorted_runs_type;
00069 
00070     protected:
00071         Input_ & m_input;               /// reference to the input stream
00072         CompareType_ m_cmp;              /// comparator used to sort block groups
00073 
00074     private:
00075 
00076         sorted_runs_type m_result;      /// stores the result (sorted runs) as smart pointer
00077         unsigned_type m_memsize;        /// memory for internal use in blocks
00078         bool m_result_computed;         /// true iff result is already computed (used in 'result()' method)
00079 
00080         /// Fetch data from input into blocks[first_idx,last_idx).
00081         unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
00082         {
00083             typename element_iterator_traits<block_type>::element_iterator output =
00084                 make_element_iterator(blocks, first_idx);
00085             unsigned_type curr_idx = first_idx;
00086             while (!m_input.empty() && curr_idx != last_idx) {
00087                 *output = *m_input;
00088                 ++m_input;
00089                 ++output;
00090                 ++curr_idx;
00091             }
00092             return curr_idx;
00093         }
00094 
00095         ///  fill the rest of the block with max values
00096         void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00097         {
00098             unsigned_type last_idx = num_blocks * block_type::size;
00099             if (first_idx < last_idx) {
00100                 typename element_iterator_traits<block_type>::element_iterator curr =
00101                     make_element_iterator(blocks, first_idx);
00102                 while (first_idx != last_idx) {
00103                     *curr = m_cmp.max_value();
00104                     ++curr;
00105                     ++first_idx;
00106                 }
00107             }
00108         }
00109 
00110         /// Sort a specific run, contained in a sequences of blocks.
00111         void sort_run(block_type * run, unsigned_type elements)
00112         {
00113             check_sort_settings();
00114             potentially_parallel::sort(make_element_iterator(run, 0),
00115                                        make_element_iterator(run, elements),
00116                                        m_cmp);
00117         }
00118 
00119         void compute_result();
00120 
00121     public:
00122         //! \brief Create the object
00123         //! \param i input stream
00124         //! \param c comparator object
00125         //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
00126         basic_runs_creator(Input_ & input, CompareType_ cmp, unsigned_type memory_to_use)
00127             : m_input(input),
00128               m_cmp(cmp),
00129               m_result(new sorted_runs_data_type),
00130               m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00131               m_result_computed(false)
00132         {
00133             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00134             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00135                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00136             }
00137             assert(m_memsize > 0);
00138         }
00139 
00140         //! \brief Returns the sorted runs object
00141         //! \return Sorted runs object. The result is computed lazily, i.e. on the first call
00142         //! \remark Returned object is intended to be used by \c runs_merger object as input
00143         sorted_runs_type & result()
00144         {
00145             if (!m_result_computed)
00146             {
00147                 compute_result();
00148                 m_result_computed = true;
00149 #ifdef STXXL_PRINT_STAT_AFTER_RF
00150                 STXXL_MSG(*stats::get_instance());
00151 #endif //STXXL_PRINT_STAT_AFTER_RF
00152             }
00153             return m_result;
00154         }
00155     };
00156 
00157     //! \brief Finish the results, i. e. create all runs.
00158     //!
00159     //! This is the main routine of this class.
00160     template <class Input_, class CompareType_, unsigned BlockSize_, class AllocStr_>
00161     void basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_>::compute_result()
00162     {
00163         unsigned_type i = 0;
00164         unsigned_type m2 = m_memsize / 2;
00165         const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
00166         STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
00167         unsigned_type blocks1_length = 0, blocks2_length = 0;
00168         block_type * Blocks1 = NULL;
00169 
00170 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
00171         Blocks1 = new block_type[m2 * 2];
00172 #else
00173         // push input element into small_run vector in result until it is full
00174         while (!input.empty() && blocks1_length != block_type::size)
00175         {
00176             m_result->small_run.push_back(*input);
00177             ++input;
00178             ++blocks1_length;
00179         }
00180 
00181         if (blocks1_length == block_type::size && !input.empty())
00182         {
00183             Blocks1 = new block_type[m2 * 2];
00184             std::copy(m_result->small_run.begin(), m_result->small_run.end(), Blocks1[0].begin());
00185             m_result->small_run.clear();
00186         }
00187         else
00188         {
00189             STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00190             m_result->elements = blocks1_length;
00191             check_sort_settings();
00192             potentially_parallel::sort(m_result->small_run.begin(), m_result->small_run.end(), cmp);
00193             return;
00194         }
00195 #endif //STXXL_SMALL_INPUT_PSORT_OPT
00196 
00197         // the first block may be there already, now fetch until memsize is filled.
00198         blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
00199 
00200         // sort first run
00201         sort_run(Blocks1, blocks1_length);
00202 
00203         if (blocks1_length <= block_type::size && m_input.empty())
00204         {
00205             // small input, do not flush it on the disk(s)
00206             STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00207             assert(m_result->small_run.empty());
00208             m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
00209             m_result->elements = blocks1_length;
00210             delete[] Blocks1;
00211             return;
00212         }
00213 
00214         block_type * Blocks2 = Blocks1 + m2;
00215         block_manager * bm = block_manager::get_instance();
00216         request_ptr * write_reqs = new request_ptr[m2];
00217         run_type run;
00218 
00219         unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
00220         run.resize(cur_run_size);
00221         bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00222 
00223         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00224 
00225         // fill the rest of the last block with max values
00226         fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00227 
00228         for (i = 0; i < cur_run_size; ++i)
00229         {
00230             run[i].value = Blocks1[i][0];
00231             write_reqs[i] = Blocks1[i].write(run[i].bid);
00232         }
00233         m_result->runs.push_back(run);
00234         m_result->runs_sizes.push_back(blocks1_length);
00235         m_result->elements += blocks1_length;
00236 
00237         if (m_input.empty())
00238         {
00239             // return
00240             wait_all(write_reqs, write_reqs + cur_run_size);
00241             delete[] write_reqs;
00242             delete[] Blocks1;
00243             return;
00244         }
00245 
00246         STXXL_VERBOSE1("Filling the second part of the allocated blocks");
00247         blocks2_length = fetch(Blocks2, 0, el_in_run);
00248 
00249         if (m_input.empty())
00250         {
00251             // optimization if the whole set fits into both halves
00252             // (re)sort internally and return
00253             blocks2_length += el_in_run;
00254             sort_run(Blocks1, blocks2_length);  // sort first an second run together
00255             wait_all(write_reqs, write_reqs + cur_run_size);
00256             bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00257 
00258             cur_run_size = div_ceil(blocks2_length, block_type::size);
00259             run.resize(cur_run_size);
00260             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00261 
00262             // fill the rest of the last block with max values
00263             fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
00264 
00265             assert(cur_run_size > m2);
00266 
00267             for (i = 0; i < m2; ++i)
00268             {
00269                 run[i].value = Blocks1[i][0];
00270                 write_reqs[i]->wait();
00271                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00272             }
00273 
00274             request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
00275 
00276             for ( ; i < cur_run_size; ++i)
00277             {
00278                 run[i].value = Blocks1[i][0];
00279                 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
00280             }
00281 
00282             m_result->runs[0] = run;
00283             m_result->runs_sizes[0] = blocks2_length;
00284             m_result->elements = blocks2_length;
00285 
00286             wait_all(write_reqs, write_reqs + m2);
00287             delete[] write_reqs;
00288             wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
00289             delete[] write_reqs1;
00290 
00291             delete[] Blocks1;
00292 
00293             return;
00294         }
00295 
00296         // more than 2 runs can be filled, i. e. the general case
00297 
00298         sort_run(Blocks2, blocks2_length);
00299 
00300         cur_run_size = div_ceil(blocks2_length, block_type::size);  // in blocks
00301         run.resize(cur_run_size);
00302         bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00303 
00304         for (i = 0; i < cur_run_size; ++i)
00305         {
00306             run[i].value = Blocks2[i][0];
00307             write_reqs[i]->wait();
00308             write_reqs[i] = Blocks2[i].write(run[i].bid);
00309         }
00310         assert((blocks2_length % el_in_run) == 0);
00311 
00312         m_result->add_run(run, blocks2_length);
00313 
00314         while (!m_input.empty())
00315         {
00316             blocks1_length = fetch(Blocks1, 0, el_in_run);
00317             sort_run(Blocks1, blocks1_length);
00318             cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
00319             run.resize(cur_run_size);
00320             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00321 
00322             // fill the rest of the last block with max values (occurs only on the last run)
00323             fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00324 
00325             for (i = 0; i < cur_run_size; ++i)
00326             {
00327                 run[i].value = Blocks1[i][0];
00328                 write_reqs[i]->wait();
00329                 write_reqs[i] = Blocks1[i].write(run[i].bid);
00330             }
00331             m_result->add_run(run, blocks1_length);
00332 
00333             std::swap(Blocks1, Blocks2);
00334             std::swap(blocks1_length, blocks2_length);
00335         }
00336 
00337         wait_all(write_reqs, write_reqs + m2);
00338         delete[] write_reqs;
00339         delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00340     }
00341 
00342     //! \brief Forms sorted runs of data from a stream
00343     //!
00344     //! \tparam Input_ type of the input stream
00345     //! \tparam CompareType_ type of omparison object used for sorting the runs
00346     //! \tparam BlockSize_ size of blocks used to store the runs
00347     //! \tparam AllocStr_ functor that defines allocation strategy for the runs
00348     template <
00349         class Input_,
00350         class CompareType_,
00351         unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00352         class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00353     class runs_creator : public basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_>
00354     {
00355     private:
00356         typedef basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> base;
00357 
00358     public:
00359         typedef typename base::cmp_type cmp_type;
00360         typedef typename base::value_type value_type;
00361         typedef typename base::block_type block_type;
00362         typedef typename base::sorted_runs_data_type sorted_runs_data_type;
00363         typedef typename base::sorted_runs_type sorted_runs_type;
00364         
00365     public:
00366         //! \brief Creates the object
00367         //! \param i input stream
00368         //! \param c comparator object
00369         //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
00370         runs_creator(Input_ & input, CompareType_ cmp, unsigned_type memory_to_use)
00371             : base(input, cmp, memory_to_use)
00372         { }
00373     };
00374 
00375 
//! \brief Input strategy for \c runs_creator class
//!
//! Used as the input type of \c runs_creator, this tag selects the
//! specialization that builds the sorted runs data structure (usable for
//! \c runs_merger) from elements pushed into the sorter one at a time
//! with \c runs_creator::push().
//!
//! \tparam ValueType_ type of the elements that will be pushed
template <typename ValueType_>
struct use_push
{
    //! element type, identical to the template argument
    typedef ValueType_ value_type;
};
00388 
00389     //! \brief Forms sorted runs of elements passed in push() method
00390     //!
00391     //! A specialization of \c runs_creator that
00392     //! allows to create sorted runs
00393     //! data structure usable for \c runs_merger from
00394     //! elements passed in sorted push() method. <BR>
00395     //! \tparam ValueType_ type of values (parameter for \c use_push strategy)
00396     //! \tparam CompareType_ type of comparison object used for sorting the runs
00397     //! \tparam BlockSize_ size of blocks used to store the runs
00398     //! \tparam AllocStr_ functor that defines allocation strategy for the runs
00399     template <
00400         class ValueType_,
00401         class CompareType_,
00402         unsigned BlockSize_,
00403         class AllocStr_>
00404     class runs_creator<
00405         use_push<ValueType_>,
00406         CompareType_,
00407         BlockSize_,
00408         AllocStr_>
00409         : private noncopyable
00410     {
00411     public:
00412         typedef CompareType_ cmp_type;
00413         typedef ValueType_ value_type;
00414         typedef typed_block<BlockSize_, value_type> block_type;
00415         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00416         typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type;
00417         typedef counting_ptr<sorted_runs_data_type> sorted_runs_type;
00418         typedef sorted_runs_type result_type;
00419 
00420     private:
00421         /// comparator object to sort runs
00422         CompareType_ m_cmp;
00423 
00424         typedef typename sorted_runs_data_type::run_type run_type;
00425 
00426         /// stores the result (sorted runs) in a reference counted object
00427         sorted_runs_type m_result;
00428 
00429         /// memory size in bytes to use
00430         const unsigned_type m_memory_to_use;
00431         
00432         /// memory size in numberr of blocks for internal use
00433         const unsigned_type m_memsize;
00434         
00435         /// m_memsize / 2
00436         const unsigned_type m_m2;
00437 
00438         /// true after the result() method was called for the first time
00439         bool m_result_computed;              
00440 
00441         /// total number of elements in a run
00442         const unsigned_type m_el_in_run;
00443 
00444         /// current number of elements in the run m_blocks1
00445         unsigned_type m_cur_el;
00446 
00447         /// accumulation buffer of size m_m2 blocks, half the available memory size
00448         block_type * m_blocks1;
00449 
00450         /// accumulation buffer that is currently being written to disk
00451         block_type * m_blocks2;
00452 
00453         /// reference to write requests transporting the last accumulation buffer to disk
00454         request_ptr * m_write_reqs;
00455 
00456         /// run object containing block ids of the run being written to disk
00457         run_type run;
00458 
00459     protected:
00460         ///  fill the rest of the block with max values
00461         void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00462         {
00463             unsigned_type last_idx = num_blocks * block_type::size;
00464             if (first_idx < last_idx) {
00465                 typename element_iterator_traits<block_type>::element_iterator curr =
00466                     make_element_iterator(blocks, first_idx);
00467                 while (first_idx != last_idx) {
00468                     *curr = m_cmp.max_value();
00469                     ++curr;
00470                     ++first_idx;
00471                 }
00472             }
00473         }
00474 
00475         /// Sort a specific run, contained in a sequences of blocks.
00476         void sort_run(block_type * run, unsigned_type elements)
00477         {
00478             check_sort_settings();
00479             potentially_parallel::sort(make_element_iterator(run, 0),
00480                                        make_element_iterator(run, elements),
00481                                        m_cmp);
00482         }
00483 
00484         void compute_result()
00485         {
00486             if (m_cur_el == 0)
00487                 return;
00488 
00489             sort_run(m_blocks1, m_cur_el);
00490 
00491             if (m_cur_el <= block_type::size && m_result->elements == 0)
00492             {
00493                 // small input, do not flush it on the disk(s)
00494                 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
00495                 m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
00496                 m_result->elements = m_cur_el;
00497                 return;
00498             }
00499 
00500             const unsigned_type cur_run_size = div_ceil(m_cur_el, block_type::size);     // in blocks
00501             run.resize(cur_run_size);
00502             block_manager * bm = block_manager::get_instance();
00503             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00504 
00505             disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00506 
00507             // fill the rest of the last block with max values
00508             fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);
00509 
00510             unsigned_type i = 0;
00511             for ( ; i < cur_run_size; ++i)
00512             {
00513                 run[i].value = m_blocks1[i][0];
00514                 if (m_write_reqs[i].get())
00515                     m_write_reqs[i]->wait();
00516 
00517                 m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
00518             }
00519             m_result->add_run(run, m_cur_el);
00520 
00521             for (i = 0; i < m_m2; ++i)
00522             {
00523                 if (m_write_reqs[i].get())
00524                     m_write_reqs[i]->wait();
00525             }
00526         }
00527 
00528     public:
00529         //! \brief Creates the object
00530         //! \param c comparator object
00531         //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
00532         runs_creator(CompareType_ cmp, unsigned_type memory_to_use) :
00533             m_cmp(cmp),
00534             m_memory_to_use(memory_to_use),
00535             m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00536             m_m2(m_memsize / 2),
00537             m_el_in_run(m_m2 * block_type::size),
00538             m_blocks1(NULL), m_blocks2(NULL),
00539             m_write_reqs(NULL)
00540         {
00541             sort_helper::verify_sentinel_strict_weak_ordering(m_cmp);
00542             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= m_memory_to_use)) {
00543                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00544             }
00545             assert(m_m2 > 0);
00546 
00547             allocate();
00548         }
00549 
00550         ~runs_creator()
00551         {
00552             m_result_computed = 1;
00553             deallocate();
00554         }
00555 
00556         //! \brief Clear current state and remove all items
00557         void clear()
00558         {
00559             if (!m_result)
00560                 m_result = new sorted_runs_data_type;
00561             else
00562                 m_result->clear();
00563 
00564             m_result_computed = false;
00565             m_cur_el = 0;
00566 
00567             for (unsigned_type i = 0; i < m_m2; ++i)
00568             {
00569                 if (m_write_reqs[i].get())
00570                     m_write_reqs[i]->cancel();
00571             }
00572         }
00573 
00574         //! \brief Allocates input buffers and clears result.
00575         void allocate()
00576         {
00577             if (!m_blocks1)
00578             {
00579                 m_blocks1 = new block_type[m_m2 * 2];
00580                 m_blocks2 = m_blocks1 + m_m2;
00581 
00582                 m_write_reqs = new request_ptr[m_m2];
00583             }
00584 
00585             clear();
00586         }
00587 
00588         //! \brief Deallocates input buffers but not the current result.
00589         void deallocate()
00590         {
00591             result();   // finishes result
00592 
00593             if (m_blocks1)
00594             {
00595                 delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
00596                 m_blocks1 = m_blocks2 = NULL;
00597 
00598                 delete[] m_write_reqs;
00599                 m_write_reqs = NULL;
00600             }
00601         }
00602 
00603         //! \brief Adds new element to the sorter
00604         //! \param val value to be added
00605         void push(const value_type & val)
00606         {
00607             assert(m_result_computed == false);
00608             if (LIKELY(m_cur_el < m_el_in_run))
00609             {
00610                 m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
00611                 ++m_cur_el;
00612                 return;
00613             }
00614 
00615             assert(m_el_in_run == m_cur_el);
00616             m_cur_el = 0;
00617 
00618             // sort and store m_blocks1
00619             sort_run(m_blocks1, m_el_in_run);
00620 
00621             const unsigned_type cur_run_blocks = div_ceil(m_el_in_run, block_type::size);    // in blocks
00622             run.resize(cur_run_blocks);
00623             block_manager * bm = block_manager::get_instance();
00624             bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00625 
00626             disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00627 
00628             for (unsigned_type i = 0; i < cur_run_blocks; ++i)
00629             {
00630                 run[i].value = m_blocks1[i][0];
00631                 if (m_write_reqs[i].get())
00632                     m_write_reqs[i]->wait();
00633 
00634                 m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
00635             }
00636 
00637             m_result->add_run(run, m_el_in_run);
00638 
00639             std::swap(m_blocks1, m_blocks2);
00640 
00641             push(val);
00642         }
00643 
00644         //! \brief Returns the sorted runs object
00645         //! \return Sorted runs object.
00646         //! \remark Returned object is intended to be used by \c runs_merger object as input
00647         sorted_runs_type & result()
00648         {
00649             if (!m_result_computed)
00650             {
00651                 compute_result();
00652                 m_result_computed = true;
00653 #ifdef STXXL_PRINT_STAT_AFTER_RF
00654                 STXXL_MSG(*stats::get_instance());
00655 #endif //STXXL_PRINT_STAT_AFTER_RF
00656             }
00657             return m_result;
00658         }
00659 
00660         //! \brief number of items currently inserted.
00661         unsigned_type size() const
00662         {
00663             return m_result->elements + m_cur_el;
00664         }
00665 
00666         //! \brief return comparator object
00667         const cmp_type& cmp() const
00668         {
00669             return m_cmp;
00670         }
00671 
00672         //! \brief return memory size used (in bytes)
00673         unsigned_type memory_used() const
00674         {
00675             return m_memory_to_use;
00676         }
00677     };
00678 
00679 
//! \brief Input strategy for \c runs_creator class
//!
//! Used as the input type of \c runs_creator, this tag selects the
//! specialization that builds the sorted runs data structure (usable for
//! \c runs_merger) from sequences of elements that are supplied in
//! sorted order.
//!
//! \tparam ValueType_ type of the elements of the sequences
template <typename ValueType_>
struct from_sorted_sequences
{
    //! element type, identical to the template argument
    typedef ValueType_ value_type;
};
00691 
00692     //! \brief Forms sorted runs of data taking elements in sorted order (element by element)
00693     //!
00694     //! A specialization of \c runs_creator that
00695     //! allows to create sorted runs
00696     //! data structure usable for \c runs_merger from
00697     //! sequences of elements in sorted order. <BR>
00698     //! \tparam ValueType_ type of values (parameter for \c from_sorted_sequences strategy)
00699     //! \tparam CompareType_ type of comparison object used for sorting the runs
00700     //! \tparam BlockSize_ size of blocks used to store the runs
00701     //! \tparam AllocStr_ functor that defines allocation strategy for the runs
00702     template <
00703         class ValueType_,
00704         class CompareType_,
00705         unsigned BlockSize_,
00706         class AllocStr_>
00707     class runs_creator<
00708         from_sorted_sequences<ValueType_>,
00709         CompareType_,
00710         BlockSize_,
00711         AllocStr_>
00712         : private noncopyable
00713     {
00714     public:
00715         typedef ValueType_ value_type;
00716         typedef typed_block<BlockSize_, value_type> block_type;
00717         typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00718         typedef AllocStr_ alloc_strategy_type;
00719 
00720     public:
00721         typedef CompareType_ cmp_type;
00722         typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type;
00723         typedef counting_ptr<sorted_runs_data_type> sorted_runs_type;
00724         typedef sorted_runs_type result_type;
00725 
00726     private:
00727         typedef typename sorted_runs_data_type::run_type run_type;
00728 
00729         CompareType_ cmp;
00730 
00731         sorted_runs_type result_; // stores the result (sorted runs)
00732         unsigned_type m_;         // memory for internal use in blocks
00733         buffered_writer<block_type> writer;
00734         block_type * cur_block;
00735         unsigned_type offset;
00736         unsigned_type iblock;
00737         unsigned_type irun;
00738         alloc_strategy_type alloc_strategy;  // needs to be reset after each run
00739 
00740     public:
00741         //! \brief Creates the object
00742         //! \param c comparator object
00743         //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes.
00744         //! Recommended value: 2 * block_size * D
00745         runs_creator(CompareType_ c, unsigned_type memory_to_use) :
00746             cmp(c),
00747             result_(new sorted_runs_data_type),
00748             m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00749             writer(m_, m_ / 2),
00750             cur_block(writer.get_free_block()),
00751             offset(0),
00752             iblock(0),
00753             irun(0)
00754         {
00755             sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00756             assert(m_ > 0);
00757             if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00758                 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00759             }
00760         }
00761 
00762         //! \brief Adds new element to the current run
00763         //! \param val value to be added to the current run
00764         void push(const value_type & val)
00765         {
00766             assert(offset < block_type::size);
00767 
00768             (*cur_block)[offset] = val;
00769             ++offset;
00770 
00771             if (offset == block_type::size)
00772             {
00773                 // write current block
00774 
00775                 block_manager * bm = block_manager::get_instance();
00776                 // allocate space for the block
00777                 result_->runs.resize(irun + 1);
00778                 result_->runs[irun].resize(iblock + 1);
00779                 bm->new_blocks(
00780                     alloc_strategy,
00781                     make_bid_iterator(result_->runs[irun].begin() + iblock),
00782                     make_bid_iterator(result_->runs[irun].end()),
00783                     iblock
00784                     );
00785 
00786                 result_->runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00787                 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
00788                 ++iblock;
00789 
00790                 offset = 0;
00791             }
00792 
00793             ++result_->elements;
00794         }
00795 
00796         //! \brief Finishes current run and begins new one
00797         void finish()
00798         {
00799             if (offset == 0 && iblock == 0) // current run is empty
00800                 return;
00801 
00802 
00803             result_->runs_sizes.resize(irun + 1);
00804             result_->runs_sizes.back() = iblock * block_type::size + offset;
00805 
00806             if (offset)    // if current block is partially filled
00807             {
00808                 while (offset != block_type::size)
00809                 {
00810                     (*cur_block)[offset] = cmp.max_value();
00811                     ++offset;
00812                 }
00813                 offset = 0;
00814 
00815                 block_manager * bm = block_manager::get_instance();
00816                 // allocate space for the block
00817                 result_->runs.resize(irun + 1);
00818                 result_->runs[irun].resize(iblock + 1);
00819                 bm->new_blocks(
00820                     alloc_strategy,
00821                     make_bid_iterator(result_->runs[irun].begin() + iblock),
00822                     make_bid_iterator(result_->runs[irun].end()),
00823                     iblock
00824                     );
00825 
00826                 result_->runs[irun][iblock].value = (*cur_block)[0];         // init trigger
00827                 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
00828             }
00829             else
00830             { }
00831 
00832             alloc_strategy = alloc_strategy_type();  // reinitialize block allocator for the next run
00833             iblock = 0;
00834             ++irun;
00835         }
00836 
00837         //! \brief Returns the sorted runs object
00838         //! \return Sorted runs object
00839         //! \remark Returned object is intended to be used by \c runs_merger object as input
00840         sorted_runs_type & result()
00841         {
00842             finish();
00843             writer.flush();
00844 
00845             return result_;
00846         }
00847     };
00848 
00849 
00850     //! \brief Checker for the sorted runs object created by the \c runs_creator .
00851     //! \param sruns sorted runs object
00852     //! \param cmp comparison object used for checking the order of elements in runs
00853     //! \return \c true if runs are sorted, \c false otherwise
00854     template <class RunsType_, class CompareType_>
00855     bool check_sorted_runs(const RunsType_ & sruns, CompareType_ cmp)
00856     {
00857         sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00858         typedef typename RunsType_::element_type::block_type block_type;
00859         typedef typename block_type::value_type value_type;
00860         STXXL_VERBOSE2("Elements: " << sruns->elements);
00861         unsigned_type nruns = sruns->runs.size();
00862         STXXL_VERBOSE2("Runs: " << nruns);
00863         unsigned_type irun = 0;
00864         for (irun = 0; irun < nruns; ++irun)
00865         {
00866             const unsigned_type nblocks = sruns->runs[irun].size();
00867             block_type * blocks = new block_type[nblocks];
00868             request_ptr * reqs = new request_ptr[nblocks];
00869             for (unsigned_type j = 0; j < nblocks; ++j)
00870             {
00871                 reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
00872             }
00873             wait_all(reqs, reqs + nblocks);
00874             for (unsigned_type j = 0; j < nblocks; ++j)
00875             {
00876                 if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
00877                     cmp(sruns->runs[irun][j].value, blocks[j][0])) //!=
00878                 {
00879                     STXXL_ERRMSG("check_sorted_runs  wrong trigger in the run");
00880                     return false;
00881                 }
00882             }
00883             if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00884                                   make_element_iterator(blocks, sruns->runs_sizes[irun]),
00885                                   cmp))
00886             {
00887                 STXXL_ERRMSG("check_sorted_runs  wrong order in the run");
00888                 return false;
00889             }
00890 
00891             delete[] reqs;
00892             delete[] blocks;
00893         }
00894 
00895         STXXL_MSG("Checking runs finished successfully");
00896 
00897         return true;
00898     }
00899 
00900 
00901     ////////////////////////////////////////////////////////////////////////
00902     //     MERGE RUNS                                                     //
00903     ////////////////////////////////////////////////////////////////////////
00904 
00905     //! \brief Merges sorted runs
00906     //!
00907     //! \tparam RunsType_ type of the sorted runs, available as \c runs_creator::sorted_runs_type ,
00908     //! \tparam CompareType_ type of comparison object used for merging
00909     //! \tparam AllocStr_ allocation strategy used to allocate the blocks for
00910     //! storing intermediate results if several merge passes are required
00911     template <class RunsType_,
00912               class CompareType_,
00913               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00914     class basic_runs_merger : private noncopyable
00915     {
00916     public:
00917         typedef RunsType_ sorted_runs_type;
00918         typedef CompareType_ value_cmp;
00919         typedef AllocStr_ alloc_strategy;
00920 
00921         typedef typename sorted_runs_type::element_type sorted_runs_data_type;
00922         typedef typename sorted_runs_data_type::size_type size_type;
00923         typedef typename sorted_runs_data_type::run_type run_type;
00924         typedef typename sorted_runs_data_type::block_type block_type;
00925         typedef block_type out_block_type;
00926         typedef typename run_type::value_type trigger_entry_type;
00927         typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00928         typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00929         typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00930         typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
00931         typedef stxxl::int64 diff_type;
00932         typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00933         typedef typename std::vector<sequence>::size_type seqs_size_type;
00934 
00935     public:
00936         //! \brief Standard stream typedef
00937         typedef typename sorted_runs_data_type::value_type value_type;
00938 
00939     private:
00940         /// comparator object to sort runs
00941         value_cmp m_cmp;
00942 
00943         /// memory size in bytes to use
00944         unsigned_type   m_memory_to_use;
00945         
00946         /// smart pointer to sorted_runs object
00947         sorted_runs_type m_sruns;
00948 
00949         /// items remaining in input
00950         size_type       m_elements_remaining;
00951 
00952         /// memory buffer for merging from external streams
00953         out_block_type*  m_buffer_block;
00954 
00955         /// pointer into current memory buffer: this is either m_buffer_block or the small_runs vector
00956         const value_type* m_current_ptr;
00957         
00958         /// pointer into current memory buffer: end after range of current values
00959         const value_type* m_current_end;
00960 
00961         /// sequence of block needed for merging
00962         run_type        m_consume_seq;
00963         
00964         /// precalculated order of blocks in which they are prefetched
00965         int_type*       m_prefetch_seq;
00966 
00967         /// prefetcher object
00968         prefetcher_type * m_prefetcher;
00969 
00970         /// loser tree used for native merging
00971         loser_tree_type * m_losers;
00972 
00973 #if STXXL_PARALLEL_MULTIWAY_MERGE
00974         std::vector<sequence> * seqs;
00975         std::vector<block_type *> * buffers;
00976         diff_type num_currently_mergeable;
00977 #endif
00978 
00979 #if STXXL_CHECK_ORDER_IN_SORTS
00980         /// previous element to ensure the current output ordering
00981         value_type      m_last_element;
00982 #endif //STXXL_CHECK_ORDER_IN_SORTS
00983         
00984         ////////////////////////////////////////////////////////////////////
00985 
00986         void merge_recursively();
00987 
00988         void deallocate_prefetcher()
00989         {
00990             if (m_prefetcher)
00991             {
00992                 delete m_losers;
00993 #if STXXL_PARALLEL_MULTIWAY_MERGE
00994                 delete seqs;
00995                 delete buffers;
00996 #endif
00997                 delete m_prefetcher;
00998                 delete[] m_prefetch_seq;
00999                 m_prefetcher = NULL;
01000             }
01001         }
01002 
01003         void fill_buffer_block()
01004         {
01005             STXXL_VERBOSE1("fill_buffer_block");
01006             if (do_parallel_merge())
01007             {
01008 #if STXXL_PARALLEL_MULTIWAY_MERGE
01009 // begin of STL-style merging
01010                 diff_type rest = out_block_type::size;          // elements still to merge for this output block
01011 
01012                 do                                              // while rest > 0 and still elements available
01013                 {
01014                     if (num_currently_mergeable < rest)
01015                     {
01016                         if (!m_prefetcher || m_prefetcher->empty())
01017                         {
01018                             // anything remaining is already in memory
01019                             num_currently_mergeable = m_elements_remaining;
01020                         }
01021                         else
01022                         {
01023                             num_currently_mergeable = sort_helper::count_elements_less_equal(
01024                                 *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
01025                         }
01026                     }
01027 
01028                     diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);     // at most rest elements
01029 
01030                     STXXL_VERBOSE1("before merge " << output_size);
01031 
01032                     stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size);
01033                     // sequence iterators are progressed appropriately
01034 
01035                     rest -= output_size;
01036                     num_currently_mergeable -= output_size;
01037 
01038                     STXXL_VERBOSE1("after merge");
01039 
01040                     sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *m_prefetcher);
01041                 } while (rest > 0 && (*seqs).size() > 0);
01042 
01043 #if STXXL_CHECK_ORDER_IN_SORTS
01044                 if (!stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp))
01045                 {
01046                     for (value_type * i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
01047                         if (cmp(*i, *(i - 1)))
01048                         {
01049                             STXXL_VERBOSE1("Error at position " << (i - m_buffer_block->begin()));
01050                         }
01051                     assert(false);
01052                 }
01053 #endif //STXXL_CHECK_ORDER_IN_SORTS
01054 
01055 // end of STL-style merging
01056 #else
01057                 STXXL_THROW_UNREACHABLE();
01058 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
01059             }
01060             else
01061             {
01062 // begin of native merging procedure
01063                 m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
01064 // end of native merging procedure
01065             }
01066             STXXL_VERBOSE1("current block filled");
01067 
01068             m_current_ptr = m_buffer_block->elem;
01069             m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);
01070 
01071             if (m_elements_remaining <= out_block_type::size)
01072                 deallocate_prefetcher();
01073         }
01074 
01075     public:
01076         //! \brief Creates a runs merger object
01077         //! \param c comparison object
01078         //! \param memory_to_use amount of memory available for the merger in bytes
01079         basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
01080             : m_cmp(c),
01081               m_memory_to_use(memory_to_use),
01082               m_buffer_block(new out_block_type),
01083               m_prefetch_seq(NULL),
01084               m_prefetcher(NULL),
01085               m_losers(NULL)
01086 #if STXXL_PARALLEL_MULTIWAY_MERGE
01087               , seqs(NULL),
01088               buffers(NULL),
01089               num_currently_mergeable(0)
01090 #endif
01091 #if STXXL_CHECK_ORDER_IN_SORTS
01092               , m_last_element(m_cmp.min_value())
01093 #endif //STXXL_CHECK_ORDER_IN_SORTS
01094         {
01095             sort_helper::verify_sentinel_strict_weak_ordering(m_cmp);
01096         }
01097 
01098         //! \brief Set memory amount to use for the merger in bytes
01099         void set_memory_to_use(unsigned_type memory_to_use)
01100         {
01101             m_memory_to_use = memory_to_use;
01102         }
01103 
01104         //! \brief Initialize the runs merger object with a new round of sorted_runs
01105         void initialize(const sorted_runs_type & sruns)
01106         {
01107             m_sruns = sruns;
01108             m_elements_remaining = m_sruns->elements;
01109 
01110             if (empty())
01111                 return;
01112 
01113             if (!m_sruns->small_run.empty())
01114             {
01115                 // we have a small input <= B, that is kept in the main memory
01116                 STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
01117                 assert(m_elements_remaining == size_type(m_sruns->small_run.size()));
01118 
01119                 m_current_ptr = &m_sruns->small_run[0];
01120                 m_current_end = m_current_ptr + m_sruns->small_run.size();
01121 
01122                 return;
01123             }
01124 
01125 #if STXXL_CHECK_ORDER_IN_SORTS
01126             assert(check_sorted_runs(m_sruns, m_cmp));
01127 #endif //STXXL_CHECK_ORDER_IN_SORTS
01128 
01129             // *** test whether recursive merging is necessary
01130 
01131             disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
01132 
01133             int_type disks_number = config::get_instance()->disks_number();
01134             unsigned_type min_prefetch_buffers = 2 * disks_number;
01135             unsigned_type input_buffers = (m_memory_to_use > sizeof(out_block_type) ? m_memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
01136             unsigned_type nruns = m_sruns->runs.size();
01137 
01138             if (input_buffers < nruns + min_prefetch_buffers)
01139             {
01140                 // can not merge runs in one pass. merge recursively:
01141                 STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
01142                 STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
01143                 STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
01144                 STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
01145                 STXXL_WARNMSG_RECURSIVE_SORT("memory_to_use=" << m_memory_to_use << " bytes  block_type::raw_size=" << block_type::raw_size << " bytes");
01146 
01147                 // check whether we have enough memory to merge recursively
01148                 unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
01149                 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
01150                     // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering
01151                     // as well as 1 current output block and at least 2 input blocks
01152                     STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
01153                                  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
01154                     STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
01155                     throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
01156                 }
01157 
01158                 merge_recursively();
01159 
01160                 nruns = m_sruns->runs.size();
01161             }
01162 
01163             assert(nruns + min_prefetch_buffers <= input_buffers);
01164 
01165             // *** Allocate prefetcher and merge data structure
01166 
01167             deallocate_prefetcher();
01168 
01169             unsigned_type prefetch_seq_size = 0;
01170             for (unsigned_type i = 0; i < nruns; ++i)
01171             {
01172                 prefetch_seq_size += m_sruns->runs[i].size();
01173             }
01174 
01175             m_consume_seq.resize(prefetch_seq_size);
01176             m_prefetch_seq = new int_type[prefetch_seq_size];
01177 
01178             typename run_type::iterator copy_start = m_consume_seq.begin();
01179             for (unsigned_type i = 0; i < nruns; ++i)
01180             {
01181                 copy_start = std::copy(m_sruns->runs[i].begin(),
01182                                        m_sruns->runs[i].end(),
01183                                        copy_start);
01184             }
01185 
01186             std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
01187                              sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(m_cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
01188 
01189             const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
01190 
01191 #if STXXL_SORT_OPTIMAL_PREFETCHING
01192             // heuristic
01193             const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
01194 
01195             compute_prefetch_schedule(
01196                 m_consume_seq,
01197                 m_prefetch_seq,
01198                 n_opt_prefetch_buffers,
01199                 disks_number);
01200 #else
01201             for (unsigned_type i = 0; i < prefetch_seq_size; ++i)
01202                 m_prefetch_seq[i] = i;
01203 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
01204 
01205             m_prefetcher = new prefetcher_type(
01206                 m_consume_seq.begin(),
01207                 m_consume_seq.end(),
01208                 m_prefetch_seq,
01209                 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
01210 
01211             if (do_parallel_merge())
01212             {
01213 #if STXXL_PARALLEL_MULTIWAY_MERGE
01214 // begin of STL-style merging
01215                 seqs = new std::vector<sequence>(nruns);
01216                 buffers = new std::vector<block_type *>(nruns);
01217 
01218                 for (unsigned_type i = 0; i < nruns; ++i)                                       //initialize sequences
01219                 {
01220                     (*buffers)[i] = m_prefetcher->pull_block();                                   //get first block of each run
01221                     (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());  //this memory location stays the same, only the data is exchanged
01222                 }
01223 // end of STL-style merging
01224 #else
01225                 STXXL_THROW_UNREACHABLE();
01226 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
01227             }
01228             else
01229             {
01230 // begin of native merging procedure
01231                 m_losers = new loser_tree_type(m_prefetcher, nruns, run_cursor2_cmp_type(m_cmp));
01232 // end of native merging procedure
01233             }
01234 
01235             fill_buffer_block();
01236         }
01237 
01238         //! \brief Deallocate temporary structures freeing memory prior to next initialize()
01239         void deallocate()
01240         {
01241             deallocate_prefetcher();
01242             m_sruns = NULL;     // release reference on result object
01243         }
01244 
01245     public:
01246         //! \brief Standard stream method
01247         bool empty() const
01248         {
01249             return (m_elements_remaining == 0);
01250         }
01251 
01252         //! \brief Standard size method
01253         size_type size() const
01254         {
01255             return m_elements_remaining;
01256         }
01257 
01258         //! \brief Standard stream method
01259         const value_type & operator * () const
01260         {
01261             assert(!empty());
01262             return *m_current_ptr;
01263         }
01264 
01265         //! \brief Standard stream method
01266         const value_type * operator -> () const
01267         {
01268             return &(operator * ());
01269         }
01270 
01271         //! \brief Standard stream method
01272         basic_runs_merger & operator ++ ()  // preincrement operator
01273         {
01274             assert(!empty());
01275             assert(m_current_ptr != m_current_end);
01276 
01277             --m_elements_remaining;
01278             ++m_current_ptr;
01279 
01280             if (LIKELY(m_current_ptr == m_current_end && !empty()))
01281             {
01282                 fill_buffer_block();
01283 
01284 #if STXXL_CHECK_ORDER_IN_SORTS
01285                 assert(stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
01286 #endif //STXXL_CHECK_ORDER_IN_SORTS
01287             }
01288 
01289 #if STXXL_CHECK_ORDER_IN_SORTS
01290             if (!empty())
01291             {
01292                 assert(!m_cmp(operator*(), m_last_element));
01293                 m_last_element = operator*();
01294             }
01295 #endif //STXXL_CHECK_ORDER_IN_SORTS
01296 
01297             return *this;
01298         }
01299 
01300         //! \brief Destructor
01301         //! \remark Deallocates blocks of the input sorted runs object
01302         virtual ~basic_runs_merger()
01303         {
01304             deallocate_prefetcher();
01305 
01306             delete m_buffer_block;
01307         }
01308     };
01309 
01310 
01311     template <class RunsType_, class CompareType_, class AllocStr_>
01312     void basic_runs_merger<RunsType_, CompareType_, AllocStr_>::merge_recursively()
01313     {
01314         block_manager * bm = block_manager::get_instance();
01315         unsigned_type ndisks = config::get_instance()->disks_number();
01316         unsigned_type nwrite_buffers = 2 * ndisks;
01317         unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
01318 
01319         // memory consumption of the recursive merger (uses block_type as out_block_type)
01320         unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
01321         unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
01322         unsigned_type memory_for_buffers = memory_for_write_buffers
01323                                            + recursive_merger_memory_prefetch_buffers
01324                                            + recursive_merger_memory_out_block;
01325         // maximum arity in the recursive merger
01326         unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
01327 
01328         unsigned_type nruns = m_sruns->runs.size();
01329         const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
01330         assert(merge_factor > 1);
01331         assert(merge_factor <= max_arity);
01332 
01333         while (nruns > max_arity)
01334         {
01335             unsigned_type new_nruns = div_ceil(nruns, merge_factor);
01336             STXXL_MSG("Starting new merge phase: nruns: " << nruns <<
01337                       " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);
01338 
01339             // construct new sorted_runs data object which will be swapped into m_sruns
01340 
01341             sorted_runs_data_type new_runs;
01342             new_runs.runs.resize(new_nruns);
01343             new_runs.runs_sizes.resize(new_nruns);
01344             new_runs.elements = m_sruns->elements;
01345 
01346             // merge all runs from m_runs into news_runs
01347 
01348             unsigned_type runs_left = nruns;
01349             unsigned_type cur_out_run = 0;
01350             size_type elements_left = m_sruns->elements;
01351 
01352             while (runs_left > 0)
01353             {
01354                 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01355                 STXXL_MSG("Merging " << runs2merge << " runs");
01356 
01357                 if (runs2merge > 1) // non-trivial merge
01358                 {
01359                     // count the number of elements in the run
01360                     unsigned_type elements_in_new_run = 0;
01361                     for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01362                     {
01363                         elements_in_new_run += m_sruns->runs_sizes[i];
01364                     }
01365                     new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01366 
01367                     // calculate blocks in run
01368                     const unsigned_type blocks_in_new_run = div_ceil(elements_in_new_run, block_type::size);
01369 
01370                     // allocate blocks for the new runs
01371                     new_runs.runs[cur_out_run].resize(blocks_in_new_run);
01372                     bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[cur_out_run].begin()), make_bid_iterator(new_runs.runs[cur_out_run].end()));
01373 
01374                     // Construct temporary sorted_runs object as input into recursive merger.
01375                     // This sorted_runs is copied a subset of the over-large set of runs, which
01376                     // will be deallocated from external memory once the runs are merged.
01377                     sorted_runs_data_type cur_runs;
01378                     cur_runs.runs.resize(runs2merge);
01379                     cur_runs.runs_sizes.resize(runs2merge);
01380                     cur_runs.inc_ref();     // hold dangling reference
01381 
01382                     std::copy(m_sruns->runs.begin() + nruns - runs_left,
01383                               m_sruns->runs.begin() + nruns - runs_left + runs2merge,
01384                               cur_runs.runs.begin());
01385                     std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
01386                               m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
01387                               cur_runs.runs_sizes.begin());
01388 
01389                     cur_runs.elements = elements_in_new_run;
01390                     elements_left -= elements_in_new_run;
01391 
01392                     // construct recursive merger
01393 
01394                     basic_runs_merger<RunsType_, CompareType_, AllocStr_> merger(m_cmp, m_memory_to_use - memory_for_write_buffers);
01395                     merger.initialize(&cur_runs);
01396 
01397                     {   // make sure everything is being destroyed in right time
01398                         buf_ostream<block_type, typename run_type::iterator> out(
01399                             new_runs.runs[cur_out_run].begin(),
01400                             nwrite_buffers);
01401 
01402                         size_type cnt = 0;
01403                         const size_type cnt_max = cur_runs.elements;
01404 
01405                         while (cnt != cnt_max)
01406                         {
01407                             *out = *merger;
01408                             if ((cnt % block_type::size) == 0) // have to write the trigger value
01409                                 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01410 
01411                             ++cnt, ++out, ++merger;
01412                         }
01413                         assert(merger.empty());
01414 
01415                         while (cnt % block_type::size)
01416                         {
01417                             *out = m_cmp.max_value();
01418                             ++out, ++cnt;
01419                         }
01420                     }
01421 
01422                     // deallocate merged runs by destroying cur_runs
01423                 }
01424                 else // runs2merge = 1 -> no merging needed
01425                 {
01426                     assert( cur_out_run+1 == new_runs.runs.size() );
01427 
01428                     elements_left -= m_sruns->runs_sizes.back();
01429 
01430                     // copy block identifiers into new sorted_runs object
01431                     new_runs.runs.back() = m_sruns->runs.back();
01432                     new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
01433                 }
01434 
01435                 runs_left -= runs2merge;
01436                 ++cur_out_run;
01437             }
01438 
01439             assert(elements_left == 0);
01440 
01441             m_sruns->runs.clear();      // clear bid vector of m_sruns to skip deallocation of blocks in destructor
01442 
01443             std::swap(nruns, new_nruns);
01444             m_sruns->swap(new_runs);           // replaces data in referenced counted object m_sruns
01445 
01446         } // end while (nruns > max_arity)
01447     }
01448 
01449 
01450     //! \brief Merges sorted runs; thin front end that adds constructors on top of \c basic_runs_merger
01451     //!
01452     //! \tparam RunsType_ type of the sorted runs, available as \c runs_creator::sorted_runs_type (must provide \c element_type::cmp_type for the default comparator)
01453     //! \tparam CompareType_ type of comparison object used for merging
01454     //! \tparam AllocStr_ allocation strategy used to allocate the blocks for
01455     //! storing intermediate results if several merge passes are required
01456     template <class RunsType_,
01457               class CompareType_ = typename RunsType_::element_type::cmp_type,
01458               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01459     class runs_merger : public basic_runs_merger<RunsType_, CompareType_, AllocStr_>
01460     {
01461     protected:
01462         typedef basic_runs_merger<RunsType_, CompareType_, AllocStr_> base; //!< the base class doing the actual merging
01463 
01464     public:
01465         typedef RunsType_                       sorted_runs_type; //!< same as \c RunsType_
01466         typedef typename base::value_cmp        value_cmp;        //!< comparator type taken from the base class
01467         typedef typename base::value_cmp        cmp_type;         //!< second name for \c value_cmp
01468         typedef typename base::block_type       block_type;       //!< block type taken from the base class
01469 
01470     public:
01471         //! \brief Creates a runs merger object and immediately initializes it with \c sruns
01472         //! \param sruns input sorted runs object
01473         //! \param cmp comparison object
01474         //! \param memory_to_use amount of memory available for the merger in bytes
01475         runs_merger(sorted_runs_type & sruns, value_cmp cmp, unsigned_type memory_to_use)
01476             : base(cmp, memory_to_use)
01477         {
01478             this->initialize(sruns); // initialize() lives in the dependent base class, which unqualified lookup does not search; this-> makes the name dependent
01479         }
01480 
01481         //! \brief Creates a runs merger object without initializing a round of sorted_runs
01482         //! \param cmp comparison object
01483         //! \param memory_to_use amount of memory available for the merger in bytes
01484         runs_merger(value_cmp cmp, unsigned_type memory_to_use)
01485             : base(cmp, memory_to_use)
01486         {
01487         }
01488     };
01489 
01490 
01491     ////////////////////////////////////////////////////////////////////////
01492     //     SORT                                                           //
01493     ////////////////////////////////////////////////////////////////////////
01494 
01495     //! \brief Stream adaptor that delivers the elements of an input stream in sorted order
01496     //!
01497     //! \tparam Input_ type of the input stream
01498     //! \tparam CompareType_ type of comparison object used for sorting the runs
01499     //! \tparam BlockSize_ size of blocks used to store the runs
01500     //! \tparam AllocStr_ functor that defines allocation strategy for the runs
01501     //! \remark Implemented as the composition of \c runs_creator and \c runs_merger .
01502     template <class Input_,
01503               class CompareType_,
01504               unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01505               class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
01506               class runs_creator_type = runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> >
01507     class sort : public noncopyable
01508     {
01509         typedef typename runs_creator_type::sorted_runs_type runs_type;
01510         typedef runs_merger<runs_type, CompareType_, AllocStr_> merger_type;
01511 
01512         runs_creator_type m_creator; // declared first: its result() feeds the merger's constructor
01513         merger_type m_merger;
01514 
01515     public:
01516         //! \brief Standard stream typedef
01517         typedef typename Input_::value_type value_type;
01518 
01519         //! \brief Builds the sorter with one memory budget
01520         //! \param in input stream
01521         //! \param c comparator object
01522         //! \param memory_to_use memory amount in bytes; the same value is passed to the runs creator and to the merger
01523         sort(Input_ & in, CompareType_ c, unsigned_type memory_to_use) :
01524             m_creator(in, c, memory_to_use),
01525             m_merger(m_creator.result(), c, memory_to_use)
01526         {
01527             sort_helper::verify_sentinel_strict_weak_ordering(c); // executed after both members have been constructed
01528         }
01529 
01530         //! \brief Builds the sorter with separate memory budgets
01531         //! \param in input stream
01532         //! \param c comparator object
01533         //! \param m_memory_to_userc memory amount that is allowed to be used by the runs creator in bytes
01534         //! \param m_memory_to_use memory amount that is allowed to be used by the merger in bytes
01535         sort(Input_ & in, CompareType_ c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use) :
01536             m_creator(in, c, m_memory_to_userc),
01537             m_merger(m_creator.result(), c, m_memory_to_use)
01538         {
01539             sort_helper::verify_sentinel_strict_weak_ordering(c); // executed after both members have been constructed
01540         }
01541 
01542 
01543         //! \brief Standard stream method: true when the merger reports empty
01544         bool empty() const
01545         {
01546             return m_merger.empty();
01547         }
01548 
01549         //! \brief Standard stream method: dereferences the merger (stream must not be empty)
01550         const value_type & operator * () const
01551         {
01552             assert(!m_merger.empty());
01553             return *m_merger;
01554         }
01555 
01556         const value_type * operator -> () const // forwards to the merger's operator-> (stream must not be empty)
01557         {
01558             assert(!m_merger.empty());
01559             return m_merger.operator -> ();
01560         }
01561 
01562         //! \brief Standard stream method: advances the merger
01563         sort & operator ++ ()
01564         {
01565             ++m_merger;
01566             return *this;
01567         }
01568     };
01569 
01570     //! \brief Metafunction yielding the sorted runs type for a given value type and block size
01571     //!
01572     //! \tparam ValueType_ type of values in sorted runs
01573     //! \tparam BlockSize_ size of blocks where sorted runs are stored
01574     template <
01575         class ValueType_,
01576         unsigned BlockSize_>
01577     class compute_sorted_runs_type
01578     {
01579         typedef BID<BlockSize_> block_id_type;
01580         typedef sort_helper::trigger_entry<block_id_type, ValueType_> trigger_type;
01581         typedef std::less<ValueType_> less_cmp_type;
01582 
01583     public:
01584         typedef sorted_runs<trigger_type, less_cmp_type> result; //!< the computed sorted runs type
01585     };
01586 
01587 //! \}
01588 }
01589 
01590 //! \addtogroup stlalgo
01591 //! \{
01592 
01593 //! \brief Sorts range of any random access iterators externally; the sorted output is written back starting at \c begin
01594 //!
01595 //! \param begin iterator pointing to the first element of the range
01596 //! \param end iterator pointing to the last+1 element of the range
01597 //! \param cmp comparison object
01598 //! \param MemSize memory to use for sorting (in bytes)
01599 //! \param AS allocation strategy (only its type \c AllocStr is used, the object itself is ignored)
01600 //!
01601 //! The \c BlockSize template parameter defines the block size to use (in bytes)
01602 //! \warning Slower than External Iterator Sort
01603 template <unsigned BlockSize,
01604           class RandomAccessIterator,
01605           class CmpType,
01606           class AllocStr>
01607 void sort(RandomAccessIterator begin,
01608           RandomAccessIterator end,
01609           CmpType cmp,
01610           unsigned_type MemSize,
01611           AllocStr AS)
01612 {
01613     STXXL_UNUSED(AS);
01614 #ifdef BOOST_MSVC
01615     typedef typename streamify_traits<RandomAccessIterator>::stream_type range_stream_type; // NOTE(review): unqualified, unlike stream::streamify below; confirm the name is visible from this namespace
01616 #else
01617     typedef __typeof__(stream::streamify(begin, end)) range_stream_type;
01618 #endif //BOOST_MSVC
01619     range_stream_type range_stream(begin, end); // wrap the iterator range as a stream
01620     typedef stream::sort<range_stream_type, CmpType, BlockSize, AllocStr> sort_stream_type;
01621     sort_stream_type sorted_stream(range_stream, cmp, MemSize);
01622     stream::materialize(sorted_stream, begin); // copy the sorted stream back over the input range
01623 }
01624 
01625 //! \}
01626 
01627 __STXXL_END_NAMESPACE
01628 
01629 #endif // !STXXL_SORT_STREAM_HEADER
01630 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines