Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * containers/monotonic_pq.cpp 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2007, 2009 Johannes Singler <singler@ira.uka.de> 00008 * Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #include <queue> 00016 #include <limits> 00017 00018 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1 00019 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1 00020 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1 00021 00022 #define TINY_PQ 0 00023 #define MANUAL_PQ 0 00024 00025 #define SIDE_PQ 1 // compare with second, in-memory PQ (needs a lot of memory) 00026 00027 #include <stxxl/priority_queue> 00028 #include <stxxl/stats> 00029 #include <stxxl/timer> 00030 00031 const stxxl::unsigned_type mega = 1024 * 1024; //1 * 1024 does not work here 00032 00033 //const int block_size = STXXL_DEFAULT_BLOCK_SIZE(my_type); 00034 const stxxl::unsigned_type block_size = 4 * mega; 00035 00036 #define RECORD_SIZE 16 00037 #define LOAD 0 00038 00039 typedef stxxl::uint64 my_key_type; 00040 00041 #define MAGIC 123 00042 00043 struct my_type 00044 { 00045 typedef my_key_type key_type; 00046 00047 key_type key; 00048 #if LOAD 00049 key_type load; 00050 char data[RECORD_SIZE - 2 * sizeof(key_type)]; 00051 #else 00052 char data[RECORD_SIZE - sizeof(key_type)]; 00053 #endif 00054 00055 my_type() { } 00056 my_type(key_type __key) : key(__key) { } 00057 #if LOAD 00058 my_type(key_type __key, key_type __load) : key(__key), load(__load) { } 00059 #endif 00060 00061 void operator = (const key_type & __key) { key = __key; } 00062 #if LOAD 00063 void operator = (const my_type & mt) 00064 { 00065 key = mt.key; 00066 load = mt.load; 00067 } 00068 bool operator == (const my_type & mt) { return (key == mt.key) && (load = mt.load); } 00069 #else 00070 void operator = (const my_type & mt) { key = mt.key; } 00071 bool operator == (const my_type & mt) { return key == mt.key; } 00072 #endif 00073 }; 00074 00075 std::ostream & operator << (std::ostream & o, const my_type & obj) 00076 { 00077 o << obj.key; 00078 #if LOAD 00079 o << "/" << obj.load; 00080 #endif 00081 return o; 00082 } 00083 00084 //STXXL priority queue is a _maximum_ PQ. "Greater" comparator makes this a "minimum" PQ again. 00085 00086 struct my_cmp /*: public std::binary_function<my_type, my_type, bool>*/ // greater 00087 { 00088 typedef my_type first_argument_type; 00089 typedef my_type second_argument_type; 00090 typedef bool result_type; 00091 00092 bool operator () (const my_type & a, const my_type & b) const 00093 { 00094 return a.key > b.key; 00095 } 00096 00097 my_type min_value() const 00098 { 00099 #if LOAD 00100 return my_type((std::numeric_limits<my_type::key_type>::max)(), MAGIC); 00101 #else 00102 return my_type((std::numeric_limits<my_type::key_type>::max)()); 00103 #endif 00104 } 00105 my_type max_value() const 00106 { 00107 #if LOAD 00108 return my_type((std::numeric_limits<my_type::key_type>::min)(), MAGIC); 00109 #else 00110 return my_type((std::numeric_limits<my_type::key_type>::min)()); 00111 #endif 00112 } 00113 }; 00114 00115 int main(int argc, char * argv[]) 00116 { 00117 if (argc < 3) 00118 { 00119 std::cout << "Usage: " << argv[0] << " [n in MiB]" 00120 #if defined(__MCSTL__) || defined(STXXL_PARALLEL_MODE) 00121 << " [p threads]" 00122 #endif 00123 << std::endl; 00124 return -1; 00125 } 00126 00127 STXXL_MSG("----------------------------------------"); 00128 00129 stxxl::config::get_instance(); 00130 std::string Flags = std::string("") 00131 #if STXXL_CHECK_ORDER_IN_SORTS 00132 + " STXXL_CHECK_ORDER_IN_SORTS" 00133 #endif 00134 #ifdef NDEBUG 00135 + " NDEBUG" 00136 #endif 00137 #if TINY_PQ 00138 + " TINY_PQ" 00139 #endif 00140 #if MANUAL_PQ 00141 + " MANUAL_PQ" 00142 #endif 00143 #if SIDE_PQ 00144 + " SIDE_PQ" 00145 #endif 00146 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 00147 + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL" 00148 #endif 00149 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 00150 + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL" 00151 #endif 00152 #if STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 00153 + " STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER" 00154 #endif 00155 ; 00156 STXXL_MSG("Flags:" << Flags); 00157 00158 unsigned long megabytes = atoi(argv[1]); 00159 #ifdef __MCSTL__ 00160 int num_threads = atoi(argv[2]); 00161 STXXL_MSG("Threads: " << num_threads); 00162 00163 mcstl::SETTINGS::num_threads = num_threads; 00164 mcstl::SETTINGS::force_sequential = false; 00165 00166 mcstl::SETTINGS::sort_algorithm = mcstl::SETTINGS::QS_BALANCED; 00167 mcstl::SETTINGS::sort_splitting = mcstl::SETTINGS::SAMPLING; 00168 mcstl::SETTINGS::sort_minimal_n = 1000; 00169 mcstl::SETTINGS::sort_mwms_oversampling = 10; 00170 00171 mcstl::SETTINGS::merge_splitting = mcstl::SETTINGS::SAMPLING; 00172 mcstl::SETTINGS::merge_minimal_n = 1000; 00173 mcstl::SETTINGS::merge_oversampling = 10; 00174 00175 mcstl::SETTINGS::multiway_merge_algorithm = mcstl::SETTINGS::LOSER_TREE; 00176 mcstl::SETTINGS::multiway_merge_splitting = mcstl::SETTINGS::EXACT; 00177 mcstl::SETTINGS::multiway_merge_oversampling = 10; 00178 mcstl::SETTINGS::multiway_merge_minimal_n = 1000; 00179 mcstl::SETTINGS::multiway_merge_minimal_k = 2; 00180 #elif defined(STXXL_PARALLEL_MODE) 00181 int num_threads = atoi(argv[2]); 00182 STXXL_MSG("Threads: " << num_threads); 00183 00184 omp_set_num_threads(num_threads); 00185 __gnu_parallel::_Settings parallel_settings(__gnu_parallel::_Settings::get()); 00186 parallel_settings.sort_algorithm = __gnu_parallel::QS_BALANCED; 00187 parallel_settings.sort_splitting = __gnu_parallel::SAMPLING; 00188 parallel_settings.sort_minimal_n = 1000; 00189 parallel_settings.sort_mwms_oversampling = 10; 00190 00191 parallel_settings.merge_splitting = __gnu_parallel::SAMPLING; 00192 parallel_settings.merge_minimal_n = 1000; 00193 parallel_settings.merge_oversampling = 10; 00194 00195 parallel_settings.multiway_merge_algorithm = __gnu_parallel::LOSER_TREE; 00196 parallel_settings.multiway_merge_splitting = __gnu_parallel::EXACT; 00197 parallel_settings.multiway_merge_oversampling = 10; 00198 parallel_settings.multiway_merge_minimal_n = 1000; 00199 parallel_settings.multiway_merge_minimal_k = 2; 00200 __gnu_parallel::_Settings::set(parallel_settings); 00201 #endif 00202 00203 const stxxl::unsigned_type mem_for_queue = 512 * mega; 00204 const stxxl::unsigned_type mem_for_pools = 512 * mega; 00205 00206 #if TINY_PQ 00207 stxxl::STXXL_UNUSED(mem_for_queue); 00208 const unsigned BufferSize1 = 32; // equalize procedure call overheads etc. 00209 const unsigned N = (1 << 9) / sizeof(my_type); // minimal sequence length 00210 const unsigned IntKMAX = 8; // maximal arity for internal mergersq 00211 const unsigned IntLevels = 2; // number of internal levels 00212 const unsigned BlockSize = (4 * mega); 00213 const unsigned ExtKMAX = 8; // maximal arity for external mergers 00214 const unsigned ExtLevels = 2; // number of external levels 00215 typedef stxxl::priority_queue< 00216 stxxl::priority_queue_config< 00217 my_type, 00218 my_cmp, 00219 BufferSize1, 00220 N, 00221 IntKMAX, 00222 IntLevels, 00223 BlockSize, 00224 ExtKMAX, 00225 ExtLevels 00226 > 00227 > pq_type; 00228 #elif MANUAL_PQ 00229 stxxl::STXXL_UNUSED(mem_for_queue); 00230 const unsigned BufferSize1 = 32; // equalize procedure call overheads etc. 00231 const unsigned N = (1 << 20) / sizeof(my_type); // minimal sequence length 00232 const unsigned IntKMAX = 16; // maximal arity for internal mergersq 00233 const unsigned IntLevels = 2; // number of internal levels 00234 const unsigned BlockSize = (4 * mega); 00235 const unsigned ExtKMAX = 32; // maximal arity for external mergers 00236 const unsigned ExtLevels = 2; // number of external levels 00237 typedef stxxl::priority_queue< 00238 stxxl::priority_queue_config< 00239 my_type, 00240 my_cmp, 00241 BufferSize1, 00242 N, 00243 IntKMAX, 00244 IntLevels, 00245 BlockSize, 00246 ExtKMAX, 00247 ExtLevels 00248 > 00249 > pq_type; 00250 #else 00251 const stxxl::uint64 volume = stxxl::uint64(200000) * mega; // in bytes 00252 typedef stxxl::PRIORITY_QUEUE_GENERATOR<my_type, my_cmp, mem_for_queue, volume / sizeof(my_type) / 1024 + 1> gen; 00253 typedef gen::result pq_type; 00254 // BufferSize1 = Config::BufferSize1, 00255 // N = Config::N, 00256 // IntKMAX = Config::IntKMAX, 00257 // IntLevels = Config::IntLevels, 00258 // ExtLevels = Config::ExtLevels, 00259 // Levels = Config::IntLevels + Config::ExtLevels, 00260 // BlockSize = Config::BlockSize, 00261 // ExtKMAX = Config::ExtKMAX 00262 00263 00264 /* STXXL_MSG ( "Blocks fitting into internal memory m: "<<gen::m ); 00265 STXXL_MSG ( "X : "<<gen::X ); //maximum number of internal elements //X = B * (settings::k - m) / settings::E, 00266 STXXL_MSG ( "Expected internal memory consumption: "<< (gen::EConsumption / 1048576) << " MiB");*/ 00267 #endif 00268 typedef pq_type::block_type block_type; 00269 00270 STXXL_MSG("Internal arity: " << pq_type::IntKMAX); 00271 STXXL_MSG("N : " << pq_type::N); //X / (AI * AI) 00272 STXXL_MSG("External arity: " << pq_type::ExtKMAX); 00273 STXXL_MSG("Block size B: " << pq_type::BlockSize); 00274 //EConsumption = X * settings::E + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024 00275 00276 STXXL_MSG("Data type size: " << sizeof(my_type)); 00277 STXXL_MSG(""); 00278 #ifdef __MCSTL__ 00279 STXXL_MSG("multiway_merge_minimal_k: " << mcstl::SETTINGS::multiway_merge_minimal_k); 00280 STXXL_MSG("multiway_merge_minimal_n: " << mcstl::SETTINGS::multiway_merge_minimal_n); 00281 #endif 00282 00283 stxxl::stats_data sd_start(*stxxl::stats::get_instance()); 00284 stxxl::timer Timer; 00285 Timer.start(); 00286 00287 pq_type p(mem_for_pools / 2, mem_for_pools / 2); 00288 stxxl::int64 nelements = stxxl::int64(megabytes * mega / sizeof(my_type)), i; 00289 00290 00291 STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B"); 00292 STXXL_MSG("Peak number of elements (n): " << nelements); 00293 STXXL_MSG("Max number of elements to contain: " << (stxxl::uint64(pq_type::N) * pq_type::IntKMAX * pq_type::IntKMAX * pq_type::ExtKMAX * pq_type::ExtKMAX)); 00294 srand(5); 00295 my_cmp cmp; 00296 my_key_type r, sum_input = 0, sum_output = 0; 00297 my_type least(0), last_least(0); 00298 00299 const my_key_type modulo = 0x10000000; 00300 00301 #if SIDE_PQ 00302 std::priority_queue<my_type, std::vector<my_type>, my_cmp> side_pq; 00303 #endif 00304 00305 my_type side_pq_least; 00306 00307 STXXL_MSG("op-sequence(monotonic pq): ( push, pop, push ) * n"); 00308 for (i = 0; i < nelements; ++i) 00309 { 00310 if ((i % mega) == 0) 00311 STXXL_MSG( 00312 std::fixed << std::setprecision(2) << std::setw(5) << (100.0 * i / nelements) << "% " 00313 << "Inserting element " << i << " top() == " << least.key << " @ " 00314 << std::setprecision(3) << Timer.seconds() << " s" 00315 << std::setprecision(6) << std::resetiosflags(std::ios_base::floatfield)); 00316 00317 //monotone priority queue 00318 r = least.key + rand() % modulo; 00319 sum_input += r; 00320 p.push(my_type(r)); 00321 #if SIDE_PQ 00322 side_pq.push(my_type(r)); 00323 #endif 00324 00325 least = p.top(); 00326 sum_output += least.key; 00327 p.pop(); 00328 #if SIDE_PQ 00329 side_pq_least = side_pq.top(); 00330 side_pq.pop(); 00331 if (!(side_pq_least == least)) 00332 STXXL_MSG("Wrong result at " << i << " " << side_pq_least.key << " != " << least.key); 00333 #endif 00334 00335 if (cmp(last_least, least)) 00336 { 00337 STXXL_MSG("Wrong order at " << i << " " << last_least.key << " > " << least.key); 00338 } 00339 else 00340 last_least = least; 00341 00342 r = least.key + rand() % modulo; 00343 sum_input += r; 00344 p.push(my_type(r)); 00345 #if SIDE_PQ 00346 side_pq.push(my_type(r)); 00347 #endif 00348 } 00349 Timer.stop(); 00350 STXXL_MSG("Time spent for filling: " << Timer.seconds() << " s"); 00351 00352 STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B"); 00353 stxxl::stats_data sd_middle(*stxxl::stats::get_instance()); 00354 std::cout << sd_middle - sd_start; 00355 Timer.reset(); 00356 Timer.start(); 00357 00358 STXXL_MSG("op-sequence(monotonic pq): ( pop, push, pop ) * n"); 00359 for (i = 0; i < (nelements); ++i) 00360 { 00361 assert(!p.empty()); 00362 00363 least = p.top(); 00364 sum_output += least.key; 00365 p.pop(); 00366 #if SIDE_PQ 00367 side_pq_least = side_pq.top(); 00368 side_pq.pop(); 00369 if (!(side_pq_least == least)) 00370 { 00371 STXXL_VERBOSE1("" << side_pq_least << " != " << least); 00372 } 00373 #endif 00374 if (cmp(last_least, least)) 00375 { 00376 STXXL_MSG("Wrong result at " << i << " " << last_least.key << " > " << least.key); 00377 } 00378 else 00379 last_least = least; 00380 00381 r = least.key + rand() % modulo; 00382 sum_input += r; 00383 p.push(my_type(r)); 00384 #if SIDE_PQ 00385 side_pq.push(my_type(r)); 00386 #endif 00387 00388 least = p.top(); 00389 sum_output += least.key; 00390 p.pop(); 00391 #if SIDE_PQ 00392 side_pq_least = side_pq.top(); 00393 side_pq.pop(); 00394 if (!(side_pq_least == least)) 00395 { 00396 STXXL_VERBOSE1("" << side_pq_least << " != " << least); 00397 } 00398 #endif 00399 if (cmp(last_least, least)) 00400 { 00401 STXXL_MSG("Wrong result at " << i << " " << last_least.key << " > " << least.key); 00402 } 00403 else 00404 last_least = least; 00405 00406 if ((i % mega) == 0) 00407 STXXL_MSG( 00408 std::fixed << std::setprecision(2) << std::setw(5) << (100.0 * i / nelements) << "% " 00409 << "Popped element " << i << " == " << least.key << " @ " 00410 << std::setprecision(3) << Timer.seconds() << " s" 00411 << std::setprecision(6) << std::resetiosflags(std::ios_base::floatfield)); 00412 } 00413 STXXL_MSG("Last element " << i << " popped"); 00414 Timer.stop(); 00415 00416 if (sum_input != sum_output) 00417 STXXL_MSG("WRONG sum! " << sum_input << " - " << sum_output << " = " << (sum_output - sum_input) << " / " << (sum_input - sum_output)); 00418 00419 STXXL_MSG("Time spent for removing elements: " << Timer.seconds() << " s"); 00420 STXXL_MSG("Internal memory consumption of the priority queue: " << p.mem_cons() << " B"); 00421 std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - sd_middle; 00422 std::cout << *stxxl::stats::get_instance(); 00423 00424 assert(sum_input == sum_output); 00425 }