Stxxl
1.4.0
|
This is an example of how to use the parallelized sorting algorithm. Setting all the parameters in optional, just compiling with either MCSTL or parallel mode suffices.
/*************************************************************************** * algo/test_parallel_sort.cpp * * Part of the STXXL. See http://stxxl.sourceforge.net * * Copyright (C) 2007, 2009 Johannes Singler <singler@ira.uka.de> * Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> * * Distributed under the Boost Software License, Version 1.0. * (See accompanying file LICENSE_1_0.txt or copy at * http://www.boost.org/LICENSE_1_0.txt) **************************************************************************/ //! \example algo/test_parallel_sort.cpp //! This is an example of how to use the parallelized sorting algorithm. //! Setting all the parameters in optional, just compiling with either MCSTL //! or parallel mode suffices. #define MCSTL_QUICKSORT_WORKAROUND 0 #if !defined(STXXL_NOT_CONSIDER_SORT_MEMORY_OVERHEAD) #define STXXL_NOT_CONSIDER_SORT_MEMORY_OVERHEAD 0 #endif #include <algorithm> #include <functional> #include <limits> #include <stxxl/vector> #include <stxxl/stream> #include <stxxl/scan> #include <stxxl/sort> #ifdef __MCSTL__ #include <mcstl.h> #endif const unsigned long long megabyte = 1024 * 1024; //const int block_size = STXXL_DEFAULT_BLOCK_SIZE(my_type); const int block_size = 4 * megabyte; #define RECORD_SIZE 16 #define MAGIC 123 stxxl::unsigned_type run_size; stxxl::unsigned_type buffer_size; struct my_type { typedef unsigned long long key_type; key_type _key; key_type _load; char _data[RECORD_SIZE - 2 * sizeof(key_type)]; key_type key() const { return _key; } my_type() { } my_type(key_type __key) : _key(__key) { } my_type(key_type __key, key_type __load) : _key(__key), _load(__load) { } void operator = (const key_type & __key) { _key = __key; } void operator = (const my_type & mt) { _key = mt._key; _load = mt._load; } }; bool operator < (const my_type & a, const my_type & b); inline bool operator < (const my_type & a, const my_type & b) { return a.key() < b.key(); } inline bool operator == (const my_type & a, const my_type & b) { return a.key() == b.key(); } inline std::ostream & operator << (std::ostream & o, const my_type & obj) { o << obj._key << "/" << obj._load; return o; } struct cmp_less_key : public std::less<my_type> { my_type min_value() const { return my_type(std::numeric_limits<my_type::key_type>::min(), MAGIC); } my_type max_value() const { return my_type(std::numeric_limits<my_type::key_type>::max(), MAGIC); } }; typedef stxxl::vector<my_type, 4, stxxl::lru_pager<8>, block_size, STXXL_DEFAULT_ALLOC_STRATEGY> vector_type; stxxl::unsigned_type checksum(vector_type & input) { stxxl::unsigned_type sum = 0; for (vector_type::const_iterator i = input.begin(); i != input.end(); ++i) sum += (*i)._key; return sum; } void linear_sort_normal(vector_type & input) { stxxl::unsigned_type sum1 = checksum(input); stxxl::stats_data stats_begin(*stxxl::stats::get_instance()); double start = stxxl::timestamp(); stxxl::sort(input.begin(), input.end(), cmp_less_key(), run_size); double stop = stxxl::timestamp(); std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - stats_begin; stxxl::unsigned_type sum2 = checksum(input); std::cout << sum1 << " ?= " << sum2 << std::endl; STXXL_MSG((stxxl::is_sorted<vector_type::const_iterator>(input.begin(), input.end()) ? "OK" : "NOT SORTED")); std::cout << "Linear sorting normal took " << (stop - start) << " seconds." << std::endl; } void linear_sort_streamed(vector_type & input, vector_type & output) { stxxl::unsigned_type sum1 = checksum(input); stxxl::stats_data stats_begin(*stxxl::stats::get_instance()); double start = stxxl::timestamp(); typedef __typeof__(stxxl::stream::streamify(input.begin(), input.end())) input_stream_type; input_stream_type input_stream = stxxl::stream::streamify(input.begin(), input.end()); typedef cmp_less_key comparator_type; comparator_type cl; typedef stxxl::stream::sort<input_stream_type, comparator_type, block_size> sort_stream_type; sort_stream_type sort_stream(input_stream, cl, run_size); vector_type::iterator o = stxxl::stream::materialize(sort_stream, output.begin(), output.end()); assert(o == output.end()); double stop = stxxl::timestamp(); std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - stats_begin; stxxl::unsigned_type sum2 = checksum(output); std::cout << sum1 << " ?= " << sum2 << std::endl; if (sum1 != sum2) STXXL_MSG("WRONG DATA"); STXXL_MSG((stxxl::is_sorted<vector_type::const_iterator>(output.begin(), output.end(), comparator_type()) ? "OK" : "NOT SORTED")); std::cout << "Linear sorting streamed took " << (stop - start) << " seconds." << std::endl; } int main(int argc, const char ** argv) { if (argc < 6) { std::cout << "Usage: " << argv[0] << " [n in MiB] [p threads] [M in MiB] [sorting algorithm: m | q | qb | s] [merging algorithm: p | s | n]" << std::endl; return -1; } stxxl::config::get_instance(); #if STXXL_PARALLEL_MULTIWAY_MERGE STXXL_MSG("STXXL_PARALLEL_MULTIWAY_MERGE"); #endif unsigned long megabytes_to_process = atoi(argv[1]); int p = atoi(argv[2]); stxxl::unsigned_type memory_to_use = (stxxl::unsigned_type)atoi(argv[3]) * megabyte; run_size = memory_to_use; buffer_size = memory_to_use / 16; #ifdef STXXL_PARALLEL_MODE omp_set_num_threads(p); __gnu_parallel::_Settings parallel_settings(__gnu_parallel::_Settings::get()); parallel_settings.merge_splitting = __gnu_parallel::EXACT; parallel_settings.merge_minimal_n = 10000; parallel_settings.merge_oversampling = 10; parallel_settings.multiway_merge_algorithm = __gnu_parallel::LOSER_TREE; parallel_settings.multiway_merge_splitting = __gnu_parallel::EXACT; parallel_settings.multiway_merge_oversampling = 10; parallel_settings.multiway_merge_minimal_n = 10000; parallel_settings.multiway_merge_minimal_k = 2; if (!strcmp(argv[4], "q")) //quicksort parallel_settings.sort_algorithm = __gnu_parallel::QS; else if (!strcmp(argv[4], "qb")) //balanced quicksort parallel_settings.sort_algorithm = __gnu_parallel::QS_BALANCED; else if (!strcmp(argv[4], "m")) //merge sort parallel_settings.sort_algorithm = __gnu_parallel::MWMS; else /*if(!strcmp(argv[4], "s"))*/ //sequential (default) { parallel_settings.sort_algorithm = __gnu_parallel::QS; parallel_settings.sort_minimal_n = memory_to_use; } if (!strcmp(argv[5], "p")) //parallel { stxxl::SETTINGS::native_merge = false; //parallel_settings.multiway_merge_minimal_n = 1024; //leave as default } else if (!strcmp(argv[5], "s")) //sequential { stxxl::SETTINGS::native_merge = false; parallel_settings.multiway_merge_minimal_n = memory_to_use; //too much to be called } else /*if(!strcmp(argv[5], "n"))*/ //native (default) stxxl::SETTINGS::native_merge = true; parallel_settings.multiway_merge_minimal_k = 2; __gnu_parallel::_Settings::set(parallel_settings); assert(&__gnu_parallel::_Settings::get() != ¶llel_settings); if (0) printf("%d %p: mwms %d, q %d, qb %d", __gnu_parallel::_Settings::get().sort_algorithm, &__gnu_parallel::_Settings::get().sort_algorithm, __gnu_parallel::MWMS, __gnu_parallel::QS, __gnu_parallel::QS_BALANCED); #elif defined(__MCSTL__) mcstl::HEURISTIC::num_threads = p; mcstl::HEURISTIC::force_sequential = false; mcstl::HEURISTIC::merge_splitting = mcstl::HEURISTIC::EXACT; mcstl::HEURISTIC::merge_minimal_n = 10000; mcstl::HEURISTIC::merge_oversampling = 10; mcstl::HEURISTIC::multiway_merge_algorithm = mcstl::HEURISTIC::LOSER_TREE; mcstl::HEURISTIC::multiway_merge_splitting = mcstl::HEURISTIC::EXACT; mcstl::HEURISTIC::multiway_merge_oversampling = 10; mcstl::HEURISTIC::multiway_merge_minimal_n = 10000; mcstl::HEURISTIC::multiway_merge_minimal_k = 2; if (!strcmp(argv[4], "q")) //quicksort mcstl::HEURISTIC::sort_algorithm = mcstl::HEURISTIC::QS; else if (!strcmp(argv[4], "qb")) //balanced quicksort mcstl::HEURISTIC::sort_algorithm = mcstl::HEURISTIC::QS_BALANCED; else if (!strcmp(argv[4], "m")) //merge sort mcstl::HEURISTIC::sort_algorithm = mcstl::HEURISTIC::MWMS; else /*if(!strcmp(argv[4], "s"))*/ //sequential (default) { mcstl::HEURISTIC::sort_algorithm = mcstl::HEURISTIC::QS; mcstl::HEURISTIC::sort_minimal_n = memory_to_use; } if (!strcmp(argv[5], "p")) //parallel { stxxl::SETTINGS::native_merge = false; //mcstl::HEURISTIC::multiway_merge_minimal_n = 1024; //leave as default } else if (!strcmp(argv[5], "s")) //sequential { stxxl::SETTINGS::native_merge = false; mcstl::HEURISTIC::multiway_merge_minimal_n = memory_to_use; //too much to be called } else /*if(!strcmp(argv[5], "n"))*/ //native (default) stxxl::SETTINGS::native_merge = true; mcstl::HEURISTIC::multiway_merge_minimal_k = 2; #endif std::cout << "Sorting " << megabytes_to_process << " MiB of data (" << (megabytes_to_process * megabyte / sizeof(my_type)) << " elements) using " << (memory_to_use / megabyte) << " MiB of internal memory and " << p << " thread(s), block size " << block_size << ", element size " << sizeof(my_type) << std::endl; const stxxl::int64 n_records = stxxl::int64(megabytes_to_process) * stxxl::int64(megabyte) / sizeof(my_type); vector_type input(n_records); stxxl::stats_data stats_begin(*stxxl::stats::get_instance()); double generate_start = stxxl::timestamp(); stxxl::generate(input.begin(), input.end(), stxxl::random_number64(), memory_to_use / STXXL_DEFAULT_BLOCK_SIZE(my_type)); double generate_stop = stxxl::timestamp(); std::cout << stxxl::stats_data(*stxxl::stats::get_instance()) - stats_begin; std::cout << "Generating took " << (generate_stop - generate_start) << " seconds." << std::endl; STXXL_MSG(((stxxl::is_sorted<vector_type::const_iterator>(input.begin(), input.end())) ? "OK" : "NOT SORTED")); { vector_type output(n_records); linear_sort_streamed(input, output); linear_sort_normal(input); } return 0; } // vim: et:ts=4:sw=4