Stxxl  1.4.0
containers/monotonic_pq.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  containers/monotonic_pq.cpp
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2007, 2009 Johannes Singler <singler@ira.uka.de>
00008  *  Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #include <queue>
00016 #include <limits>
00017 
// Build-time switches. The three STXXL_PARALLEL_PQ_* macros are defined before
// <stxxl/priority_queue> is included (presumably so the library headers pick
// them up -- confirm against the STXXL headers); main() echoes every switch
// that is enabled in its "Flags:" line.
00018 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
00019 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
00020 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
00021 
// Queue configuration selectors used in main(): TINY_PQ and MANUAL_PQ each pick
// a hand-written priority_queue_config; with both at 0 the configuration comes
// from PRIORITY_QUEUE_GENERATOR.
00022 #define TINY_PQ 0
00023 #define MANUAL_PQ 0
00024 
// When non-zero, main() mirrors every push/pop into a std::priority_queue and
// compares the popped elements against the STXXL queue.
00025 #define SIDE_PQ 1       // compare with second, in-memory PQ (needs a lot of memory)
00026 
00027 #include <stxxl/priority_queue>
00028 #include <stxxl/stats>
00029 #include <stxxl/timer>
00030 
// Number of bytes in one MiB; used for memory budgets and for converting the
// command-line size (given in MiB) into an element count.
00031 const stxxl::unsigned_type mega = 1024 * 1024;    //1 * 1024 does not work here
00032 
// NOTE(review): block_size is not referenced anywhere else in the visible file;
// the queue configurations in main() define their own BlockSize.
00033 //const int block_size = STXXL_DEFAULT_BLOCK_SIZE(my_type);
00034 const stxxl::unsigned_type block_size = 4 * mega;
00035 
// RECORD_SIZE: byte budget from which my_type sizes its padding array.
// LOAD: when non-zero, my_type carries an additional `load` field.
00036 #define RECORD_SIZE 16
00037 #define LOAD 0
00038 
// Key (priority) type of the records stored in the queues.
00039 typedef stxxl::uint64 my_key_type;
00040 
// Value stored in the `load` field of the sentinel records returned by my_cmp
// (only used when LOAD is non-zero).
00041 #define MAGIC 123
00042 
00043 struct my_type
00044 {
00045     typedef my_key_type key_type;
00046 
00047     key_type key;
00048 #if LOAD
00049     key_type load;
00050     char data[RECORD_SIZE - 2 * sizeof(key_type)];
00051 #else
00052     char data[RECORD_SIZE - sizeof(key_type)];
00053 #endif
00054 
00055     my_type() { }
00056     my_type(key_type __key) : key(__key) { }
00057 #if LOAD
00058     my_type(key_type __key, key_type __load) : key(__key), load(__load) { }
00059 #endif
00060 
00061     void operator = (const key_type & __key) { key = __key; }
00062 #if LOAD
00063     void operator = (const my_type & mt)
00064     {
00065         key = mt.key;
00066         load = mt.load;
00067     }
00068     bool operator == (const my_type & mt) { return (key == mt.key) && (load = mt.load); }
00069 #else
00070     void operator = (const my_type & mt) { key = mt.key; }
00071     bool operator == (const my_type & mt) { return key == mt.key; }
00072 #endif
00073 };
00074 
00075 std::ostream & operator << (std::ostream & o, const my_type & obj)
00076 {
00077     o << obj.key;
00078 #if LOAD
00079     o << "/" << obj.load;
00080 #endif
00081     return o;
00082 }
00083 
00084 //STXXL priority queue is a _maximum_ PQ. "Greater" comparator makes this a "minimum" PQ again.
00085 
00086 struct my_cmp /*: public std::binary_function<my_type, my_type, bool>*/ // greater
00087 {
00088     typedef my_type first_argument_type;
00089     typedef my_type second_argument_type;
00090     typedef bool result_type;
00091 
00092     bool operator () (const my_type & a, const my_type & b) const
00093     {
00094         return a.key > b.key;
00095     }
00096 
00097     my_type min_value() const
00098     {
00099 #if LOAD
00100         return my_type((std::numeric_limits<my_type::key_type>::max)(), MAGIC);
00101 #else
00102         return my_type((std::numeric_limits<my_type::key_type>::max)());
00103 #endif
00104     }
00105     my_type max_value() const
00106     {
00107 #if LOAD
00108         return my_type((std::numeric_limits<my_type::key_type>::min)(), MAGIC);
00109 #else
00110         return my_type((std::numeric_limits<my_type::key_type>::min)());
00111 #endif
00112     }
00113 };
00114 
00115 int main(int argc, char * argv[])
00116 {
00117     if (argc < 3)
00118     {
00119         std::cout << "Usage: " << argv[0] << " [n in MiB]"
00120 #if defined(__MCSTL__) || defined(STXXL_PARALLEL_MODE)
00121                   << " [p threads]"
00122 #endif
00123                   << std::endl;
00124         return -1;
00125     }
00126 
00127     STXXL_MSG("----------------------------------------");
00128 
00129     stxxl::config::get_instance();
00130     std::string Flags = std::string("")
00131 #if STXXL_CHECK_ORDER_IN_SORTS
00132                         + " STXXL_CHECK_ORDER_IN_SORTS"
00133 #endif
00134 #ifdef NDEBUG
00135                         + " NDEBUG"
00136 #endif
00137 #if TINY_PQ
00138                         + " TINY_PQ"
00139 #endif
00140 #if MANUAL_PQ
00141                         + " MANUAL_PQ"
00142 #endif
00143 #if SIDE_PQ
00144                         + " SIDE_PQ"
00145 #endif
00146 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00147                         + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL"
00148 #endif
00149 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00150                         + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL"
00151 #endif
00152 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00153                         + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER"
00154 #endif
00155     ;
00156     STXXL_MSG("Flags:" << Flags);
00157 
00158     unsigned long megabytes = atoi(argv[1]);
00159 #ifdef __MCSTL__
00160     int num_threads = atoi(argv[2]);
00161     STXXL_MSG("Threads: " << num_threads);
00162 
00163     mcstl::SETTINGS::num_threads = num_threads;
00164     mcstl::SETTINGS::force_sequential = false;
00165 
00166     mcstl::SETTINGS::sort_algorithm = mcstl::SETTINGS::QS_BALANCED;
00167     mcstl::SETTINGS::sort_splitting = mcstl::SETTINGS::SAMPLING;
00168     mcstl::SETTINGS::sort_minimal_n = 1000;
00169     mcstl::SETTINGS::sort_mwms_oversampling = 10;
00170 
00171     mcstl::SETTINGS::merge_splitting = mcstl::SETTINGS::SAMPLING;
00172     mcstl::SETTINGS::merge_minimal_n = 1000;
00173     mcstl::SETTINGS::merge_oversampling = 10;
00174 
00175     mcstl::SETTINGS::multiway_merge_algorithm = mcstl::SETTINGS::LOSER_TREE;
00176     mcstl::SETTINGS::multiway_merge_splitting = mcstl::SETTINGS::EXACT;
00177     mcstl::SETTINGS::multiway_merge_oversampling = 10;
00178     mcstl::SETTINGS::multiway_merge_minimal_n = 1000;
00179     mcstl::SETTINGS::multiway_merge_minimal_k = 2;
00180 #elif defined(STXXL_PARALLEL_MODE)
00181     int num_threads = atoi(argv[2]);
00182     STXXL_MSG("Threads: " << num_threads);
00183 
00184     omp_set_num_threads(num_threads);
00185     __gnu_parallel::_Settings parallel_settings(__gnu_parallel::_Settings::get());
00186     parallel_settings.sort_algorithm = __gnu_parallel::QS_BALANCED;
00187     parallel_settings.sort_splitting = __gnu_parallel::SAMPLING;
00188     parallel_settings.sort_minimal_n = 1000;
00189     parallel_settings.sort_mwms_oversampling = 10;
00190 
00191     parallel_settings.merge_splitting = __gnu_parallel::SAMPLING;
00192     parallel_settings.merge_minimal_n = 1000;
00193     parallel_settings.merge_oversampling = 10;
00194 
00195     parallel_settings.multiway_merge_algorithm = __gnu_parallel::LOSER_TREE;
00196     parallel_settings.multiway_merge_splitting = __gnu_parallel::EXACT;
00197     parallel_settings.multiway_merge_oversampling = 10;
00198     parallel_settings.multiway_merge_minimal_n = 1000;
00199     parallel_settings.multiway_merge_minimal_k = 2;
00200     __gnu_parallel::_Settings::set(parallel_settings);
00201 #endif
00202 
00203     const stxxl::unsigned_type mem_for_queue = 512 * mega;
00204     const stxxl::unsigned_type mem_for_pools = 512 * mega;
00205 
00206 #if TINY_PQ
00207     stxxl::STXXL_UNUSED(mem_for_queue);
00208     const unsigned BufferSize1 = 32;               // equalize procedure call overheads etc.
00209     const unsigned N = (1 << 9) / sizeof(my_type); // minimal sequence length
00210     const unsigned IntKMAX = 8;                    // maximal arity for internal mergersq
00211     const unsigned IntLevels = 2;                  // number of internal levels
00212     const unsigned BlockSize = (4 * mega);
00213     const unsigned ExtKMAX = 8;                    // maximal arity for external mergers
00214     const unsigned ExtLevels = 2;                  // number of external levels
00215     typedef stxxl::priority_queue<
00216         stxxl::priority_queue_config<
00217             my_type,
00218             my_cmp,
00219             BufferSize1,
00220             N,
00221             IntKMAX,
00222             IntLevels,
00223             BlockSize,
00224             ExtKMAX,
00225             ExtLevels
00226             >
00227         > pq_type;
00228 #elif MANUAL_PQ
00229     stxxl::STXXL_UNUSED(mem_for_queue);
00230     const unsigned BufferSize1 = 32;                    // equalize procedure call overheads etc.
00231     const unsigned N = (1 << 20) / sizeof(my_type);     // minimal sequence length
00232     const unsigned IntKMAX = 16;                        // maximal arity for internal mergersq
00233     const unsigned IntLevels = 2;                       // number of internal levels
00234     const unsigned BlockSize = (4 * mega);
00235     const unsigned ExtKMAX = 32;                        // maximal arity for external mergers
00236     const unsigned ExtLevels = 2;                       // number of external levels
00237     typedef stxxl::priority_queue<
00238         stxxl::priority_queue_config<
00239             my_type,
00240             my_cmp,
00241             BufferSize1,
00242             N,
00243             IntKMAX,
00244             IntLevels,
00245             BlockSize,
00246             ExtKMAX,
00247             ExtLevels
00248             >
00249         > pq_type;
00250 #else
00251     const stxxl::uint64 volume = stxxl::uint64(200000) * mega;     // in bytes
00252     typedef stxxl::PRIORITY_QUEUE_GENERATOR<my_type, my_cmp, mem_for_queue, volume / sizeof(my_type) / 1024 + 1> gen;
00253     typedef gen::result pq_type;
00254 //         BufferSize1 = Config::BufferSize1,
00255 //         N = Config::N,
00256 //         IntKMAX = Config::IntKMAX,
00257 //         IntLevels = Config::IntLevels,
00258 //         ExtLevels = Config::ExtLevels,
00259 //         Levels = Config::IntLevels + Config::ExtLevels,
00260 //         BlockSize = Config::BlockSize,
00261 //         ExtKMAX = Config::ExtKMAX
00262 
00263 
00264 /*  STXXL_MSG ( "Blocks fitting into internal memory m: "<<gen::m );
00265   STXXL_MSG ( "X : "<<gen::X );  //maximum number of internal elements //X = B * (settings::k - m) / settings::E,
00266   STXXL_MSG ( "Expected internal memory consumption: "<< (gen::EConsumption / 1048576) << " MiB");*/
00267 #endif
00268     typedef pq_type::block_type block_type;
00269 
00270     STXXL_MSG("Internal arity: " << pq_type::IntKMAX);
00271     STXXL_MSG("N : " << pq_type::N); //X / (AI * AI)
00272     STXXL_MSG("External arity: " << pq_type::ExtKMAX);
00273     STXXL_MSG("Block size B: " << pq_type::BlockSize);
00274     //EConsumption = X * settings::E + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
00275 
00276     STXXL_MSG("Data type size: " << sizeof(my_type));
00277     STXXL_MSG("");
00278 #ifdef __MCSTL__
00279     STXXL_MSG("multiway_merge_minimal_k: " << mcstl::SETTINGS::multiway_merge_minimal_k);
00280     STXXL_MSG("multiway_merge_minimal_n: " << mcstl::SETTINGS::multiway_merge_minimal_n);
00281 #endif
00282 
00283     stxxl::stats_data sd_start(*stxxl::stats::get_instance());
00284     stxxl::timer Timer;
00285     Timer.start();
00286 
00287     pq_type p(mem_for_pools / 2, mem_for_pools / 2);
00288     stxxl::int64 nelements = stxxl::int64(megabytes * mega / sizeof(my_type)), i;
00289 
00290 
00291     STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B");
00292     STXXL_MSG("Peak number of elements (n): " << nelements);
00293     STXXL_MSG("Max number of elements to contain: " << (stxxl::uint64(pq_type::N) * pq_type::IntKMAX * pq_type::IntKMAX * pq_type::ExtKMAX * pq_type::ExtKMAX));
00294     srand(5);
00295     my_cmp cmp;
00296     my_key_type r, sum_input = 0, sum_output = 0;
00297     my_type least(0), last_least(0);
00298 
00299     const my_key_type modulo = 0x10000000;
00300 
00301 #if SIDE_PQ
00302     std::priority_queue<my_type, std::vector<my_type>, my_cmp> side_pq;
00303 #endif
00304 
00305     my_type side_pq_least;
00306 
00307     STXXL_MSG("op-sequence(monotonic pq): ( push, pop, push ) * n");
00308     for (i = 0; i < nelements; ++i)
00309     {
00310         if ((i % mega) == 0)
00311             STXXL_MSG(
00312                 std::fixed << std::setprecision(2) << std::setw(5) << (100.0 * i / nelements) << "% "
00313                            << "Inserting element " << i << " top() == " << least.key << " @ "
00314                            << std::setprecision(3) << Timer.seconds() << " s"
00315                            << std::setprecision(6) << std::resetiosflags(std::ios_base::floatfield));
00316 
00317         //monotone priority queue
00318         r = least.key + rand() % modulo;
00319         sum_input += r;
00320         p.push(my_type(r));
00321 #if SIDE_PQ
00322         side_pq.push(my_type(r));
00323 #endif
00324 
00325         least = p.top();
00326         sum_output += least.key;
00327         p.pop();
00328 #if SIDE_PQ
00329         side_pq_least = side_pq.top();
00330         side_pq.pop();
00331         if (!(side_pq_least == least))
00332             STXXL_MSG("Wrong result at  " << i << "  " << side_pq_least.key << " != " << least.key);
00333 #endif
00334 
00335         if (cmp(last_least, least))
00336         {
00337             STXXL_MSG("Wrong order at  " << i << "  " << last_least.key << " > " << least.key);
00338         }
00339         else
00340             last_least = least;
00341 
00342         r = least.key + rand() % modulo;
00343         sum_input += r;
00344         p.push(my_type(r));
00345 #if SIDE_PQ
00346         side_pq.push(my_type(r));
00347 #endif
00348     }
00349     Timer.stop();
00350     STXXL_MSG("Time spent for filling: " << Timer.seconds() << " s");
00351 
00352     STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B");
00353     stxxl::stats_data sd_middle(*stxxl::stats::get_instance());
00354     std::cout << sd_middle - sd_start;
00355     Timer.reset();
00356     Timer.start();
00357 
00358     STXXL_MSG("op-sequence(monotonic pq): ( pop, push, pop ) * n");
00359     for (i = 0; i < (nelements); ++i)
00360     {
00361         assert(!p.empty());
00362 
00363         least = p.top();
00364         sum_output += least.key;
00365         p.pop();
00366 #if SIDE_PQ
00367         side_pq_least = side_pq.top();
00368         side_pq.pop();
00369         if (!(side_pq_least == least))
00370         {
00371             STXXL_VERBOSE1("" << side_pq_least << " != " << least);
00372         }
00373 #endif
00374         if (cmp(last_least, least))
00375         {
00376             STXXL_MSG("Wrong result at " << i << "  " << last_least.key << " > " << least.key);
00377         }
00378         else
00379             last_least = least;
00380 
00381         r = least.key + rand() % modulo;
00382         sum_input += r;
00383         p.push(my_type(r));
00384 #if SIDE_PQ
00385         side_pq.push(my_type(r));
00386 #endif
00387 
00388         least = p.top();
00389         sum_output += least.key;
00390         p.pop();
00391 #if SIDE_PQ
00392         side_pq_least = side_pq.top();
00393         side_pq.pop();
00394         if (!(side_pq_least == least))
00395         {
00396             STXXL_VERBOSE1("" << side_pq_least << " != " << least);
00397         }
00398 #endif
00399         if (cmp(last_least, least))
00400         {
00401             STXXL_MSG("Wrong result at " << i << "  " << last_least.key << " > " << least.key);
00402         }
00403         else
00404             last_least = least;
00405 
00406         if ((i % mega) == 0)
00407             STXXL_MSG(
00408                 std::fixed << std::setprecision(2) << std::setw(5) << (100.0 * i / nelements) << "% "
00409                            << "Popped element " << i << " == " << least.key << " @ "
00410                            << std::setprecision(3) << Timer.seconds() << " s"
00411                            << std::setprecision(6) << std::resetiosflags(std::ios_base::floatfield));
00412     }
00413     STXXL_MSG("Last element " << i << " popped");
00414     Timer.stop();
00415 
00416     if (sum_input != sum_output)
00417         STXXL_MSG("WRONG sum! " << sum_input << " - " << sum_output << " = " << (sum_output - sum_input) << " / " << (sum_input - sum_output));
00418 
00419     STXXL_MSG("Time spent for removing elements: " << Timer.seconds() << " s");
00420     STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B");
00421     std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - sd_middle;
00422     std::cout << *stxxl::stats::get_instance();
00423 
00424     assert(sum_input == sum_output);
00425 }
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines