/*
 *  http://stxxl.sourceforge.net
 *
 *  <dementiev@mpi-sb.mpg.de>
 *  <singler@ira.uka.de>
 *  <beckmann@cs.uni-frankfurt.de>
 *
 *  License: http://www.boost.org/LICENSE_1_0.txt
 */
#ifndef STXXL_SORT_STREAM_HEADER
#define STXXL_SORT_STREAM_HEADER
#ifdef STXXL_BOOST_CONFIG
#include <boost/config.hpp>
#endif
#include <stxxl/bits/stream/stream.h>
#include <stxxl/bits/mng/mng.h>
#include <stxxl/bits/algo/sort_base.h>
#include <stxxl/bits/algo/sort_helper.h>
#include <stxxl/bits/algo/adaptor.h>
#include <stxxl/bits/algo/run_cursor.h>
#include <stxxl/bits/algo/losertree.h>
#include <stxxl/bits/stream/sorted_runs.h>
#include <stxxl/bits/counting_ptr.h>
__STXXL_BEGIN_NAMESPACE
namespace stream
{
template <
class Input_,
class CompareType_,
unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
class basic_runs_creator : private noncopyable
{
public:
typedef Input_ input_type;
typedef CompareType_ cmp_type;
static const unsigned block_size = BlockSize_;
typedef AllocStr_ allocation_strategy_type;
public:
typedef typename Input_::value_type value_type;
typedef typed_block<BlockSize_, value_type> block_type;
typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
typedef sorted_runs<trigger_entry_type,cmp_type> sorted_runs_data_type;
typedef typename sorted_runs_data_type::run_type run_type;
typedef counting_ptr<sorted_runs_data_type> sorted_runs_type;
protected:
Input_ & m_input;
CompareType_ m_cmp;
private:
sorted_runs_type m_result;
unsigned_type m_memsize;
bool m_result_computed;
unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
{
typename element_iterator_traits<block_type>::element_iterator output =
make_element_iterator(blocks, first_idx);
unsigned_type curr_idx = first_idx;
while (!m_input.empty() && curr_idx != last_idx) {
*output = *m_input;
++m_input;
++output;
++curr_idx;
}
return curr_idx;
}
void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
{
unsigned_type last_idx = num_blocks * block_type::size;
if (first_idx < last_idx) {
typename element_iterator_traits<block_type>::element_iterator curr =
make_element_iterator(blocks, first_idx);
while (first_idx != last_idx) {
*curr = m_cmp.max_value();
++curr;
++first_idx;
}
}
}
void sort_run(block_type * run, unsigned_type elements)
{
check_sort_settings();
potentially_parallel::sort(make_element_iterator(run, 0),
make_element_iterator(run, elements),
m_cmp);
}
void compute_result();
public:
basic_runs_creator(Input_ & input, CompareType_ cmp, unsigned_type memory_to_use)
: m_input(input),
m_cmp(cmp),
m_result(new sorted_runs_data_type),
m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
m_result_computed(false)
{
sort_helper::verify_sentinel_strict_weak_ordering(cmp);
if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
}
assert(m_memsize > 0);
}
sorted_runs_type & result()
{
if (!m_result_computed)
{
compute_result();
m_result_computed = true;
#ifdef STXXL_PRINT_STAT_AFTER_RF
STXXL_MSG(*stats::get_instance());
#endif
}
return m_result;
}
};
//! Reads the whole input stream and turns it into sorted runs.
//!
//! One allocation of 2*m2 blocks is split into two halves (Blocks1 and
//! Blocks2) so that filling/sorting one half can overlap with the disk
//! writes of the other. Special cases:
//!  - input of at most one block: kept in memory as m_result->small_run;
//!  - input fitting into both halves: sorted and written as a single run.
template <class Input_, class CompareType_, unsigned BlockSize_, class AllocStr_>
void basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_>::compute_result()
{
    unsigned_type i = 0;
    unsigned_type m2 = m_memsize / 2;                           // blocks per half
    const unsigned_type el_in_run = m2 * block_type::size;      // elements per full run
    STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
    unsigned_type blocks1_length = 0, blocks2_length = 0;
    block_type * Blocks1 = NULL;

#ifndef STXXL_SMALL_INPUT_PSORT_OPT
    Blocks1 = new block_type[m2 * 2];
#else
    // Peek up to one block of input before allocating the large buffer.
    while (!m_input.empty() && blocks1_length != block_type::size)
    {
        m_result->small_run.push_back(*m_input);
        ++m_input;
        ++blocks1_length;
    }

    if (blocks1_length == block_type::size && !m_input.empty())
    {
        Blocks1 = new block_type[m2 * 2];
        std::copy(m_result->small_run.begin(), m_result->small_run.end(), Blocks1[0].begin());
        m_result->small_run.clear();
    }
    else
    {
        STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
        m_result->elements = blocks1_length;
        check_sort_settings();
        potentially_parallel::sort(m_result->small_run.begin(), m_result->small_run.end(), m_cmp);
        return;
    }
#endif

    // the first block may already be filled (see above)
    blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);

    sort_run(Blocks1, blocks1_length);
    if (blocks1_length <= block_type::size && m_input.empty())
    {
        // small input: keep it in memory, do not write it to disk
        STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
        assert(m_result->small_run.empty());
        m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
        m_result->elements = blocks1_length;
        delete[] Blocks1;
        return;
    }

    block_type * Blocks2 = Blocks1 + m2;
    block_manager * bm = block_manager::get_instance();
    request_ptr * write_reqs = new request_ptr[m2];
    run_type run;

    unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
    run.resize(cur_run_size);
    bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

    disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

    // pad the partially filled last block with sentinels
    fill_with_max_value(Blocks1, cur_run_size, blocks1_length);

    for (i = 0; i < cur_run_size; ++i)
    {
        run[i].value = Blocks1[i][0];
        write_reqs[i] = Blocks1[i].write(run[i].bid);
    }
    m_result->runs.push_back(run);
    m_result->runs_sizes.push_back(blocks1_length);
    m_result->elements += blocks1_length;

    if (m_input.empty())
    {
        wait_all(write_reqs, write_reqs + cur_run_size);
        delete[] write_reqs;
        delete[] Blocks1;
        return;
    }

    STXXL_VERBOSE1("Filling the second part of the allocated blocks");
    blocks2_length = fetch(Blocks2, 0, el_in_run);

    if (m_input.empty())
    {
        // The whole input fits into both halves: re-sort everything as one
        // run. The first run's writes still read from Blocks1, so they must
        // complete before Blocks1 is sorted in place (their blocks are
        // discarded right afterwards).
        wait_all(write_reqs, write_reqs + cur_run_size);
        blocks2_length += el_in_run;
        sort_run(Blocks1, blocks2_length);
        bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        cur_run_size = div_ceil(blocks2_length, block_type::size);
        run.resize(cur_run_size);
        bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        fill_with_max_value(Blocks1, cur_run_size, blocks2_length);

        assert(cur_run_size > m2);

        for (i = 0; i < m2; ++i)
        {
            run[i].value = Blocks1[i][0];
            write_reqs[i]->wait();
            write_reqs[i] = Blocks1[i].write(run[i].bid);
        }

        // the run is longer than m2 blocks; extra request slots are needed
        request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];

        for ( ; i < cur_run_size; ++i)
        {
            run[i].value = Blocks1[i][0];
            write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
        }

        m_result->runs[0] = run;
        m_result->runs_sizes[0] = blocks2_length;
        m_result->elements = blocks2_length;

        wait_all(write_reqs, write_reqs + m2);
        delete[] write_reqs;
        wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
        delete[] write_reqs1;

        delete[] Blocks1;
        return;
    }

    // More than two runs: external sort with double buffering.
    sort_run(Blocks2, blocks2_length);

    cur_run_size = div_ceil(blocks2_length, block_type::size);  // in blocks
    run.resize(cur_run_size);
    bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

    for (i = 0; i < cur_run_size; ++i)
    {
        run[i].value = Blocks2[i][0];
        write_reqs[i]->wait();
        write_reqs[i] = Blocks2[i].write(run[i].bid);
    }
    assert((blocks2_length % el_in_run) == 0);

    m_result->add_run(run, blocks2_length);

    while (!m_input.empty())
    {
        blocks1_length = fetch(Blocks1, 0, el_in_run);
        sort_run(Blocks1, blocks1_length);
        cur_run_size = div_ceil(blocks1_length, block_type::size);  // in blocks
        run.resize(cur_run_size);
        bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        // only the final run can be partially filled
        fill_with_max_value(Blocks1, cur_run_size, blocks1_length);

        for (i = 0; i < cur_run_size; ++i)
        {
            run[i].value = Blocks1[i][0];
            write_reqs[i]->wait();
            write_reqs[i] = Blocks1[i].write(run[i].bid);
        }
        m_result->add_run(run, blocks1_length);

        std::swap(Blocks1, Blocks2);
        std::swap(blocks1_length, blocks2_length);
    }

    wait_all(write_reqs, write_reqs + m2);
    delete[] write_reqs;
    // the halves may have been swapped; free via the lower address
    delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
}
//! Runs creator that pulls its input from a stream. All of the work is
//! done by basic_runs_creator; this class re-exports its types and
//! forwards the constructor arguments.
template <
    class Input_,
    class CompareType_,
    unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
    class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
class runs_creator : public basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_>
{
    typedef basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> base;

public:
    typedef typename base::cmp_type cmp_type;
    typedef typename base::value_type value_type;
    typedef typename base::block_type block_type;
    typedef typename base::sorted_runs_data_type sorted_runs_data_type;
    typedef typename base::sorted_runs_type sorted_runs_type;

    //! \param input         stream providing the elements
    //! \param cmp           comparator
    //! \param memory_to_use memory budget in bytes, passed on to the base
    runs_creator(Input_ & input, CompareType_ cmp, unsigned_type memory_to_use)
        : base(input, cmp, memory_to_use)
    { }
};
//! Tag type: runs_creator<use_push<T>, ...> selects the specialization
//! whose elements are supplied one by one through push().
template <class ValueType_>
struct use_push
{
    typedef ValueType_ value_type;  //!< type of the pushed elements
};
//! Runs creator fed element by element via push().
//!
//! Elements are collected in one half of a double buffer; when the half is
//! full it is sorted and written to disk asynchronously as a run while the
//! other half is filled. result() flushes the remaining buffered elements.
template <
    class ValueType_,
    class CompareType_,
    unsigned BlockSize_,
    class AllocStr_>
class runs_creator<
    use_push<ValueType_>,
    CompareType_,
    BlockSize_,
    AllocStr_>
    : private noncopyable
{
public:
    typedef CompareType_ cmp_type;
    typedef ValueType_ value_type;
    typedef typed_block<BlockSize_, value_type> block_type;
    typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
    typedef sorted_runs<trigger_entry_type, cmp_type> sorted_runs_data_type;
    typedef counting_ptr<sorted_runs_data_type> sorted_runs_type;
    typedef sorted_runs_type result_type;

private:
    //! comparator (also supplies the max_value() sentinel)
    CompareType_ m_cmp;
    typedef typename sorted_runs_data_type::run_type run_type;
    //! runs written so far / final result
    sorted_runs_type m_result;
    //! memory budget in bytes as passed to the constructor
    const unsigned_type m_memory_to_use;
    //! memory budget in blocks
    const unsigned_type m_memsize;
    //! blocks per buffer half (= blocks per full run)
    const unsigned_type m_m2;
    //! true once result() has finalised the runs (forced in the destructor)
    bool m_result_computed;
    //! elements per full run
    const unsigned_type m_el_in_run;
    //! elements currently buffered in m_blocks1
    unsigned_type m_cur_el;
    //! buffer half being filled by push()
    block_type * m_blocks1;
    //! the other buffer half (its previous writes may still be in flight)
    block_type * m_blocks2;
    //! one write-request slot per block of a buffer half
    request_ptr * m_write_reqs;
    //! scratch run descriptor reused for every run
    run_type run;

protected:
    //! Overwrites element positions [first_idx, num_blocks * block size)
    //! with m_cmp.max_value().
    void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
    {
        unsigned_type last_idx = num_blocks * block_type::size;
        if (first_idx < last_idx) {
            typename element_iterator_traits<block_type>::element_iterator curr =
                make_element_iterator(blocks, first_idx);
            while (first_idx != last_idx) {
                *curr = m_cmp.max_value();
                ++curr;
                ++first_idx;
            }
        }
    }

    //! Sorts the first \p elements elements stored in \p run.
    void sort_run(block_type * run, unsigned_type elements)
    {
        check_sort_settings();
        potentially_parallel::sort(make_element_iterator(run, 0),
                                   make_element_iterator(run, elements),
                                   m_cmp);
    }

    //! Waits for every write request that is still registered.
    void wait_for_pending_writes()
    {
        for (unsigned_type i = 0; i < m_m2; ++i)
        {
            if (m_write_reqs[i].get())
                m_write_reqs[i]->wait();
        }
    }

    //! Sorts and emits the elements still buffered in m_blocks1 as the last
    //! run. If no run has been written yet and at most one block is buffered,
    //! the data is kept in memory as small_run instead.
    void compute_result()
    {
        if (m_cur_el == 0)
            return;

        sort_run(m_blocks1, m_cur_el);

        if (m_cur_el <= block_type::size && m_result->elements == 0)
        {
            STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
            m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
            m_result->elements = m_cur_el;
            return;
        }

        const unsigned_type cur_run_size = div_ceil(m_cur_el, block_type::size);
        run.resize(cur_run_size);
        block_manager * bm = block_manager::get_instance();
        bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

        // pad the partially filled last block with sentinels
        fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);

        unsigned_type i = 0;
        for ( ; i < cur_run_size; ++i)
        {
            run[i].value = m_blocks1[i][0];
            if (m_write_reqs[i].get())
                m_write_reqs[i]->wait();
            m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
        }
        m_result->add_run(run, m_cur_el);

        // wait for all outstanding writes before returning
        wait_for_pending_writes();
    }

public:
    //! \param cmp           comparator
    //! \param memory_to_use memory budget in bytes
    //! \throws bad_parameter if memory_to_use is below two (scaled) blocks
    runs_creator(CompareType_ cmp, unsigned_type memory_to_use) :
        m_cmp(cmp),
        m_memory_to_use(memory_to_use),
        m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
        m_m2(m_memsize / 2),
        m_result_computed(false),
        m_el_in_run(m_m2 * block_type::size),
        m_cur_el(0),
        m_blocks1(NULL), m_blocks2(NULL),
        m_write_reqs(NULL)
    {
        sort_helper::verify_sentinel_strict_weak_ordering(m_cmp);
        if (!(2 * BlockSize_ * sort_memory_usage_factor() <= m_memory_to_use)) {
            throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
        }
        assert(m_m2 > 0);
        allocate();
    }

    ~runs_creator()
    {
        // skip forming a final run; deallocate() only releases resources
        m_result_computed = 1;
        deallocate();
    }

    //! Discards all runs and buffered elements and cancels pending writes.
    void clear()
    {
        if (!m_result)
            m_result = new sorted_runs_data_type;
        else
            m_result->clear();

        m_result_computed = false;
        m_cur_el = 0;

        for (unsigned_type i = 0; i < m_m2; ++i)
        {
            if (m_write_reqs[i].get())
                m_write_reqs[i]->cancel();
        }
    }

    //! Allocates the double buffer and request slots (if not yet done) and clears state.
    void allocate()
    {
        if (!m_blocks1)
        {
            m_blocks1 = new block_type[m_m2 * 2];
            m_blocks2 = m_blocks1 + m_m2;

            m_write_reqs = new request_ptr[m_m2];
        }

        clear();
    }

    //! Finalises the result (unless already done) and releases the buffers.
    void deallocate()
    {
        result();

        if (m_blocks1)
        {
            // Writes issued by push() read directly from the buffers. result()
            // waits for them only when it actually computes the result, which
            // is skipped on the destructor path, so wait here before the memory
            // is released. Errors are not propagated: this path also runs
            // from the destructor, possibly during stack unwinding.
            for (unsigned_type i = 0; i < m_m2; ++i)
            {
                if (!m_write_reqs[i].get())
                    continue;
                try
                {
                    m_write_reqs[i]->wait();
                }
                catch (...)
                { }
            }

            // the halves may have been swapped; free via the lower address
            delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
            m_blocks1 = m_blocks2 = NULL;

            delete[] m_write_reqs;
            m_write_reqs = NULL;
        }
    }

    //! Adds one element; when the current buffer half is full it is sorted
    //! and written out as a run before \p val is stored.
    void push(const value_type & val)
    {
        assert(m_result_computed == false);
        if (LIKELY(m_cur_el < m_el_in_run))
        {
            m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
            ++m_cur_el;
            return;
        }

        assert(m_el_in_run == m_cur_el);
        m_cur_el = 0;

        sort_run(m_blocks1, m_el_in_run);

        const unsigned_type cur_run_blocks = div_ceil(m_el_in_run, block_type::size);
        run.resize(cur_run_blocks);
        block_manager * bm = block_manager::get_instance();
        bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));

        disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

        for (unsigned_type i = 0; i < cur_run_blocks; ++i)
        {
            run[i].value = m_blocks1[i][0];
            if (m_write_reqs[i].get())
                m_write_reqs[i]->wait();
            m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
        }

        m_result->add_run(run, m_el_in_run);

        std::swap(m_blocks1, m_blocks2);

        push(val);
    }

    //! Returns the sorted runs, finalising them on the first call.
    sorted_runs_type & result()
    {
        if (!m_result_computed)
        {
            compute_result();
            m_result_computed = true;
#ifdef STXXL_PRINT_STAT_AFTER_RF
            STXXL_MSG(*stats::get_instance());
#endif
        }
        return m_result;
    }

    //! Number of elements pushed so far.
    unsigned_type size() const
    {
        // After result(), compute_result() has already folded the buffered
        // elements into m_result->elements; do not count them twice.
        return m_result->elements + (m_result_computed ? 0 : m_cur_el);
    }

    const cmp_type& cmp() const
    {
        return m_cmp;
    }

    //! Memory budget in bytes given to the constructor.
    unsigned_type memory_used() const
    {
        return m_memory_to_use;
    }
};
//! Tag type: runs_creator<from_sorted_sequences<T>, ...> selects the
//! specialization that receives already sorted sequences through push(),
//! each sequence being closed with finish().
template <class ValueType_>
struct from_sorted_sequences
{
    typedef ValueType_ value_type;  //!< element type of the sequences
};
//! Runs creator for input that already consists of sorted sequences.
//!
//! push() appends elements of the current sequence block by block through a
//! buffered_writer; finish() closes the current sequence as one run.
template <
    class ValueType_,
    class CompareType_,
    unsigned BlockSize_,
    class AllocStr_>
class runs_creator<
    from_sorted_sequences<ValueType_>,
    CompareType_,
    BlockSize_,
    AllocStr_>
    : private noncopyable
{
public:
    typedef ValueType_ value_type;
    typedef typed_block<BlockSize_, value_type> block_type;
    typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
    typedef AllocStr_ alloc_strategy_type;

public:
    typedef CompareType_ cmp_type;
    typedef sorted_runs<trigger_entry_type, cmp_type> sorted_runs_data_type;
    typedef counting_ptr<sorted_runs_data_type> sorted_runs_type;
    typedef sorted_runs_type result_type;

private:
    typedef typename sorted_runs_data_type::run_type run_type;
    CompareType_ cmp;
    sorted_runs_type result_;
    unsigned_type m_;                       //!< memory budget in blocks
    buffered_writer<block_type> writer;
    block_type * cur_block;                 //!< block currently being filled
    unsigned_type offset;                   //!< next free position in cur_block
    unsigned_type iblock;                   //!< index of cur_block within the current run
    unsigned_type irun;                     //!< index of the current run
    alloc_strategy_type alloc_strategy;

    //! Validates the memory budget and returns it unchanged.
    //! Called from the member-initializer list so that the check happens
    //! before the buffered_writer is sized from the budget.
    //! \throws bad_parameter if memory_to_use is below two (scaled) blocks
    static unsigned_type checked_memory(unsigned_type memory_to_use)
    {
        if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
            throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
        }
        return memory_to_use;
    }

    //! Allocates a BID for cur_block as block iblock of run irun, records
    //! its trigger value and hands it to the writer, which returns the next
    //! free block.
    void write_current_block()
    {
        block_manager * bm = block_manager::get_instance();
        result_->runs.resize(irun + 1);
        result_->runs[irun].resize(iblock + 1);
        bm->new_blocks(
            alloc_strategy,
            make_bid_iterator(result_->runs[irun].begin() + iblock),
            make_bid_iterator(result_->runs[irun].end()),
            iblock
            );
        result_->runs[irun][iblock].value = (*cur_block)[0];
        cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
    }

public:
    //! \param c             comparator (supplies the max_value() padding sentinel)
    //! \param memory_to_use memory budget in bytes
    //! \throws bad_parameter if memory_to_use is below two (scaled) blocks
    runs_creator(CompareType_ c, unsigned_type memory_to_use) :
        cmp(c),
        result_(new sorted_runs_data_type),
        m_(checked_memory(memory_to_use) / BlockSize_ / sort_memory_usage_factor()),
        writer(m_, m_ / 2),
        cur_block(writer.get_free_block()),
        offset(0),
        iblock(0),
        irun(0)
    {
        sort_helper::verify_sentinel_strict_weak_ordering(cmp);
        assert(m_ > 0);
    }

    //! Appends \p val to the current sequence; the caller is responsible for
    //! pushing each sequence in sorted order.
    void push(const value_type & val)
    {
        assert(offset < block_type::size);

        (*cur_block)[offset] = val;
        ++offset;

        if (offset == block_type::size)
        {
            write_current_block();
            ++iblock;
            offset = 0;
        }

        ++result_->elements;
    }

    //! Closes the current sequence as a run. Does nothing if the sequence is empty.
    void finish()
    {
        if (offset == 0 && iblock == 0)
            return;

        result_->runs_sizes.resize(irun + 1);
        result_->runs_sizes.back() = iblock * block_type::size + offset;

        if (offset)
        {
            // pad the partially filled last block with sentinels
            while (offset != block_type::size)
            {
                (*cur_block)[offset] = cmp.max_value();
                ++offset;
            }
            offset = 0;
            write_current_block();
        }

        alloc_strategy = alloc_strategy_type();
        iblock = 0;
        ++irun;
    }

    //! Closes the current sequence, flushes all pending writes and returns the runs.
    sorted_runs_type & result()
    {
        finish();
        writer.flush();
        return result_;
    }
};
//! Reads every run of \p sruns back from disk and verifies that each block's
//! stored trigger value equals its first element and that the run is sorted
//! with respect to \p cmp.
//! \return true if all runs pass, false at the first violation (after
//!         logging it via STXXL_ERRMSG)
template <class RunsType_, class CompareType_>
bool check_sorted_runs(const RunsType_ & sruns, CompareType_ cmp)
{
    sort_helper::verify_sentinel_strict_weak_ordering(cmp);
    typedef typename RunsType_::element_type::block_type block_type;
    STXXL_VERBOSE2("Elements: " << sruns->elements);
    unsigned_type nruns = sruns->runs.size();
    STXXL_VERBOSE2("Runs: " << nruns);
    for (unsigned_type irun = 0; irun < nruns; ++irun)
    {
        const unsigned_type nblocks = sruns->runs[irun].size();
        block_type * blocks = new block_type[nblocks];
        request_ptr * reqs = new request_ptr[nblocks];
        for (unsigned_type j = 0; j < nblocks; ++j)
        {
            reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
        }
        wait_all(reqs, reqs + nblocks);
        delete[] reqs;   // all reads have completed

        bool ok = true;
        for (unsigned_type j = 0; j < nblocks; ++j)
        {
            if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
                cmp(sruns->runs[irun][j].value, blocks[j][0]))
            {
                STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
                ok = false;
                break;
            }
        }
        if (ok && !stxxl::is_sorted(make_element_iterator(blocks, 0),
                                    make_element_iterator(blocks, sruns->runs_sizes[irun]),
                                    cmp))
        {
            STXXL_ERRMSG("check_sorted_runs wrong order in the run");
            ok = false;
        }

        // release the buffer on every path (the early returns used to leak it)
        delete[] blocks;
        if (!ok)
            return false;
    }
    STXXL_MSG("Checking runs finished successfully");
    return true;
}
//! Merges sorted runs and exposes the result as a pull stream
//! (empty(), operator*, operator++).
//!
//! Runs are prefetched block-wise and merged either with a loser tree or,
//! when do_parallel_merge() holds, with a parallel multiway merge. If the
//! runs do not all fit into memory at once, merge_recursively() first
//! reduces their number.
template <class RunsType_,
          class CompareType_,
          class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
class basic_runs_merger : private noncopyable
{
public:
    typedef RunsType_ sorted_runs_type;
    typedef CompareType_ value_cmp;
    typedef AllocStr_ alloc_strategy;
    typedef typename sorted_runs_type::element_type sorted_runs_data_type;
    typedef typename sorted_runs_data_type::size_type size_type;
    typedef typename sorted_runs_data_type::run_type run_type;
    typedef typename sorted_runs_data_type::block_type block_type;
    typedef block_type out_block_type;
    typedef typename run_type::value_type trigger_entry_type;
    typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
    typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
    typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
    typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
    typedef stxxl::int64 diff_type;
    typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
    typedef typename std::vector<sequence>::size_type seqs_size_type;

public:
    typedef typename sorted_runs_data_type::value_type value_type;

private:
    value_cmp m_cmp;
    //! memory budget in bytes
    unsigned_type m_memory_to_use;
    sorted_runs_type m_sruns;
    //! elements not yet consumed by operator++
    size_type m_elements_remaining;
    //! output buffer holding the currently exposed elements
    out_block_type* m_buffer_block;
    //! [m_current_ptr, m_current_end) are the elements still to be returned
    const value_type* m_current_ptr;
    const value_type* m_current_end;
    //! all trigger entries of all runs, ordered by trigger value
    run_type m_consume_seq;
    int_type* m_prefetch_seq;
    prefetcher_type * m_prefetcher;
    loser_tree_type * m_losers;
#if STXXL_PARALLEL_MULTIWAY_MERGE
    std::vector<sequence> * seqs;
    std::vector<block_type *> * buffers;
    diff_type num_currently_mergeable;
#endif

#if STXXL_CHECK_ORDER_IN_SORTS
    value_type m_last_element;
#endif

    void merge_recursively();

    //! Releases the loser tree / merge sequences, the prefetcher and the
    //! prefetch schedule. Safe to call repeatedly; pointers are reset to NULL.
    void deallocate_prefetcher()
    {
        if (m_prefetcher)
        {
            delete m_losers;
            m_losers = NULL;
#if STXXL_PARALLEL_MULTIWAY_MERGE
            delete seqs;
            seqs = NULL;
            delete buffers;
            buffers = NULL;
#endif
            delete m_prefetcher;
            m_prefetcher = NULL;
            delete[] m_prefetch_seq;
            m_prefetch_seq = NULL;
        }
    }

    //! Merges the next (up to) one block of output into m_buffer_block and
    //! points [m_current_ptr, m_current_end) at it. Frees the prefetcher once
    //! this block holds all remaining elements.
    void fill_buffer_block()
    {
        STXXL_VERBOSE1("fill_buffer_block");
        if (do_parallel_merge())
        {
#if STXXL_PARALLEL_MULTIWAY_MERGE
            diff_type rest = out_block_type::size;

            do
            {
                if (num_currently_mergeable < rest)
                {
                    if (!m_prefetcher || m_prefetcher->empty())
                    {
                        // everything is already in memory
                        num_currently_mergeable = m_elements_remaining;
                    }
                    else
                    {
                        // only elements not larger than the next block's trigger
                        // can be merged safely
                        num_currently_mergeable = sort_helper::count_elements_less_equal(
                            *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
                    }
                }

                diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);

                STXXL_VERBOSE1("before merge " << output_size);

                stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size);

                rest -= output_size;
                num_currently_mergeable -= output_size;

                STXXL_VERBOSE1("after merge");

                sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *m_prefetcher);
            } while (rest > 0 && (*seqs).size() > 0);

#if STXXL_CHECK_ORDER_IN_SORTS
            if (!stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), m_cmp))
            {
                for (value_type * i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
                    if (m_cmp(*i, *(i - 1)))
                    {
                        STXXL_VERBOSE1("Error at position " << (i - m_buffer_block->begin()));
                    }
                assert(false);
            }
#endif
#else
            STXXL_THROW_UNREACHABLE();
#endif
        }
        else
        {
            m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
        }
        STXXL_VERBOSE1("current block filled");

        m_current_ptr = m_buffer_block->elem;
        m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);

        if (m_elements_remaining <= out_block_type::size)
            deallocate_prefetcher();
    }

public:
    //! Creates an idle merger; call initialize() before reading from it.
    //! \param c             comparator (must provide min_value()/max_value())
    //! \param memory_to_use memory budget in bytes
    basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
        : m_cmp(c),
          m_memory_to_use(memory_to_use),
          m_elements_remaining(0),
          m_buffer_block(new out_block_type),
          m_current_ptr(NULL),
          m_current_end(NULL),
          m_prefetch_seq(NULL),
          m_prefetcher(NULL),
          m_losers(NULL)
#if STXXL_PARALLEL_MULTIWAY_MERGE
          , seqs(NULL),
          buffers(NULL),
          num_currently_mergeable(0)
#endif
#if STXXL_CHECK_ORDER_IN_SORTS
          , m_last_element(m_cmp.min_value())
#endif
    {
        sort_helper::verify_sentinel_strict_weak_ordering(m_cmp);
    }

    void set_memory_to_use(unsigned_type memory_to_use)
    {
        m_memory_to_use = memory_to_use;
    }

    //! Prepares the merger to stream the elements of \p sruns in order.
    //! Merges runs recursively first if they do not fit into memory.
    //! \throws bad_parameter if memory is insufficient even for recursive merging
    void initialize(const sorted_runs_type & sruns)
    {
        // Release whatever a previous initialize() left behind before the
        // early returns below can skip it.
        deallocate_prefetcher();

        m_sruns = sruns;
        m_elements_remaining = m_sruns->elements;
#if STXXL_PARALLEL_MULTIWAY_MERGE
        num_currently_mergeable = 0;
#endif
#if STXXL_CHECK_ORDER_IN_SORTS
        m_last_element = m_cmp.min_value();
#endif

        if (empty())
            return;

        if (!m_sruns->small_run.empty())
        {
            // the whole input is held in memory; stream it directly
            STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
            assert(m_elements_remaining == size_type(m_sruns->small_run.size()));
            m_current_ptr = &m_sruns->small_run[0];
            m_current_end = m_current_ptr + m_sruns->small_run.size();
            return;
        }

#if STXXL_CHECK_ORDER_IN_SORTS
        assert(check_sorted_runs(m_sruns, m_cmp));
#endif

        disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

        int_type disks_number = config::get_instance()->disks_number();
        unsigned_type min_prefetch_buffers = 2 * disks_number;
        unsigned_type input_buffers = (m_memory_to_use > sizeof(out_block_type) ? m_memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
        unsigned_type nruns = m_sruns->runs.size();

        if (input_buffers < nruns + min_prefetch_buffers)
        {
            // not every run can get an input buffer: merge recursively first
            STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
            STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
            STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
            STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
            STXXL_WARNMSG_RECURSIVE_SORT("memory_to_use=" << m_memory_to_use << " bytes block_type::raw_size=" << block_type::raw_size << " bytes");

            unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
            if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
                STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
                                                 << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
                STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
                throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
            }

            merge_recursively();

            nruns = m_sruns->runs.size();
        }

        assert(nruns + min_prefetch_buffers <= input_buffers);

        // Build the consumption sequence: all blocks of all runs, ordered by
        // their trigger (first) value.
        unsigned_type prefetch_seq_size = 0;
        for (unsigned_type i = 0; i < nruns; ++i)
        {
            prefetch_seq_size += m_sruns->runs[i].size();
        }

        m_consume_seq.resize(prefetch_seq_size);
        m_prefetch_seq = new int_type[prefetch_seq_size];

        typename run_type::iterator copy_start = m_consume_seq.begin();
        for (unsigned_type i = 0; i < nruns; ++i)
        {
            copy_start = std::copy(m_sruns->runs[i].begin(),
                                   m_sruns->runs[i].end(),
                                   copy_start);
        }

        std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
                         sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(m_cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);

        const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);

#if STXXL_SORT_OPTIMAL_PREFETCHING
        const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;

        compute_prefetch_schedule(
            m_consume_seq,
            m_prefetch_seq,
            n_opt_prefetch_buffers,
            disks_number);
#else
        for (unsigned_type i = 0; i < prefetch_seq_size; ++i)
            m_prefetch_seq[i] = i;
#endif

        m_prefetcher = new prefetcher_type(
            m_consume_seq.begin(),
            m_consume_seq.end(),
            m_prefetch_seq,
            STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));

        if (do_parallel_merge())
        {
#if STXXL_PARALLEL_MULTIWAY_MERGE
            seqs = new std::vector<sequence>(nruns);
            buffers = new std::vector<block_type *>(nruns);

            for (unsigned_type i = 0; i < nruns; ++i)
            {
                (*buffers)[i] = m_prefetcher->pull_block();
                (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
            }
#else
            STXXL_THROW_UNREACHABLE();
#endif
        }
        else
        {
            m_losers = new loser_tree_type(m_prefetcher, nruns, run_cursor2_cmp_type(m_cmp));
        }

        fill_buffer_block();
    }

    //! Releases the merge structures and drops the reference to the runs.
    void deallocate()
    {
        deallocate_prefetcher();
        m_sruns = NULL;
    }

public:
    bool empty() const
    {
        return (m_elements_remaining == 0);
    }

    size_type size() const
    {
        return m_elements_remaining;
    }

    const value_type & operator * () const
    {
        assert(!empty());
        return *m_current_ptr;
    }

    const value_type * operator -> () const
    {
        return &(operator * ());
    }

    //! Advances to the next element, refilling the output block when it is exhausted.
    basic_runs_merger & operator ++ ()
    {
        assert(!empty());
        assert(m_current_ptr != m_current_end);

        --m_elements_remaining;
        ++m_current_ptr;

        if (LIKELY(m_current_ptr == m_current_end && !empty()))
        {
            fill_buffer_block();

#if STXXL_CHECK_ORDER_IN_SORTS
            assert(stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
#endif
        }

#if STXXL_CHECK_ORDER_IN_SORTS
        if (!empty())
        {
            assert(!m_cmp(operator*(), m_last_element));
            m_last_element = operator*();
        }
#endif

        return *this;
    }

    virtual ~basic_runs_merger()
    {
        deallocate_prefetcher();
        delete m_buffer_block;
    }
};
//! Reduces the number of runs in m_sruns by repeatedly merging groups of
//! merge_factor runs into longer on-disk runs until at most max_arity runs
//! remain, i.e. until the final merge fits into m_memory_to_use.
template <class RunsType_, class CompareType_, class AllocStr_>
void basic_runs_merger<RunsType_, CompareType_, AllocStr_>::merge_recursively()
{
    block_manager * bm = block_manager::get_instance();
    unsigned_type ndisks = config::get_instance()->disks_number();
    unsigned_type nwrite_buffers = 2 * ndisks;
    unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);

    // memory the nested merger needs in addition to one buffer per input run
    unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
    unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
    unsigned_type memory_for_buffers = memory_for_write_buffers
                                       + recursive_merger_memory_prefetch_buffers
                                       + recursive_merger_memory_out_block;
    // maximum number of runs that can be merged in one pass
    unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
    unsigned_type nruns = m_sruns->runs.size();
    const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
    assert(merge_factor > 1);
    assert(merge_factor <= max_arity);
    while (nruns > max_arity)
    {
        unsigned_type new_nruns = div_ceil(nruns, merge_factor);
        STXXL_MSG("Starting new merge phase: nruns: " << nruns <<
                  " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);

        sorted_runs_data_type new_runs;
        new_runs.runs.resize(new_nruns);
        new_runs.runs_sizes.resize(new_nruns);
        new_runs.elements = m_sruns->elements;

        unsigned_type runs_left = nruns;
        unsigned_type cur_out_run = 0;
        size_type elements_left = m_sruns->elements;

        while (runs_left > 0)
        {
            unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
            STXXL_MSG("Merging " << runs2merge << " runs");

            if (runs2merge > 1)
            {
                // Count in size_type: unsigned_type may be narrower (e.g. on
                // 32-bit builds) than the element count of a merged run.
                size_type elements_in_new_run = 0;

                for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
                {
                    elements_in_new_run += m_sruns->runs_sizes[i];
                }
                new_runs.runs_sizes[cur_out_run] = elements_in_new_run;

                const unsigned_type blocks_in_new_run =
                    static_cast<unsigned_type>(div_ceil(elements_in_new_run, block_type::size));
                new_runs.runs[cur_out_run].resize(blocks_in_new_run);
                bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[cur_out_run].begin()), make_bid_iterator(new_runs.runs[cur_out_run].end()));

                // Describe this group of input runs. cur_runs lives on the
                // stack; NOTE(review): the manual inc_ref() presumably keeps the
                // counting_ptr created by merger.initialize(&cur_runs) from
                // deleting it — confirm against counting_ptr.
                sorted_runs_data_type cur_runs;
                cur_runs.runs.resize(runs2merge);
                cur_runs.runs_sizes.resize(runs2merge);
                cur_runs.inc_ref();

                std::copy(m_sruns->runs.begin() + nruns - runs_left,
                          m_sruns->runs.begin() + nruns - runs_left + runs2merge,
                          cur_runs.runs.begin());
                std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
                          m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
                          cur_runs.runs_sizes.begin());

                cur_runs.elements = elements_in_new_run;
                elements_left -= elements_in_new_run;

                basic_runs_merger<RunsType_, CompareType_, AllocStr_> merger(m_cmp, m_memory_to_use - memory_for_write_buffers);
                merger.initialize(&cur_runs);

                {
                    // stream the merged output into the new run's blocks
                    buf_ostream<block_type, typename run_type::iterator> out(
                        new_runs.runs[cur_out_run].begin(),
                        nwrite_buffers);

                    size_type cnt = 0;
                    const size_type cnt_max = cur_runs.elements;

                    while (cnt != cnt_max)
                    {
                        *out = *merger;
                        if ((cnt % block_type::size) == 0)
                            // first element of a block is its trigger value
                            new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;

                        ++cnt, ++out, ++merger;
                    }
                    assert(merger.empty());

                    // pad the last block with sentinels
                    while (cnt % block_type::size)
                    {
                        *out = m_cmp.max_value();
                        ++out, ++cnt;
                    }
                }
            }
            else
            {
                // a single leftover run is taken over unchanged
                assert( cur_out_run+1 == new_runs.runs.size() );
                elements_left -= m_sruns->runs_sizes.back();
                new_runs.runs.back() = m_sruns->runs.back();
                new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
            }

            runs_left -= runs2merge;
            ++cur_out_run;
        }

        assert(elements_left == 0);

        // drop the old run descriptors, then adopt the merged runs
        m_sruns->runs.clear();
        std::swap(nruns, new_nruns);
        m_sruns->swap(new_runs);
    }
}
//! Public runs merger; a thin wrapper around basic_runs_merger.
template <class RunsType_,
          class CompareType_ = typename RunsType_::element_type::cmp_type,
          class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
class runs_merger : public basic_runs_merger<RunsType_, CompareType_, AllocStr_>
{
protected:
    typedef basic_runs_merger<RunsType_, CompareType_, AllocStr_> base;

public:
    typedef RunsType_ sorted_runs_type;
    typedef typename base::value_cmp value_cmp;
    typedef typename base::value_cmp cmp_type;
    typedef typename base::block_type block_type;

public:
    //! Creates a merger and immediately initializes it with \p sruns.
    //! \param sruns         runs to merge
    //! \param cmp           comparator
    //! \param memory_to_use memory budget in bytes
    runs_merger(sorted_runs_type & sruns, value_cmp cmp, unsigned_type memory_to_use)
        : base(cmp, memory_to_use)
    {
        // initialize() is a member of a dependent base class; it has to be
        // named through this-> to be found by two-phase name lookup.
        this->initialize(sruns);
    }

    //! Creates an idle merger; call initialize() before reading from it.
    runs_merger(value_cmp cmp, unsigned_type memory_to_use)
        : base(cmp, memory_to_use)
    {
    }
};
//! Stream sorter: pulls all of \p Input_ through a runs creator at
//! construction time, then streams the merged result.
template <class Input_,
          class CompareType_,
          unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
          class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
          class runs_creator_type = runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> >
class sort : public noncopyable
{
    typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
    typedef runs_merger<sorted_runs_type, CompareType_, AllocStr_> runs_merger_type;

    runs_creator_type creator;
    runs_merger_type merger;

public:
    typedef typename Input_::value_type value_type;

    //! Uses the same memory budget (bytes) for run formation and merging.
    sort(Input_ & in, CompareType_ c, unsigned_type memory_to_use)
        : creator(in, c, memory_to_use),
          merger(creator.result(), c, memory_to_use)
    {
        sort_helper::verify_sentinel_strict_weak_ordering(c);
    }

    //! \param creator_memory memory budget (bytes) for run formation
    //! \param merger_memory  memory budget (bytes) for merging
    sort(Input_ & in, CompareType_ c, unsigned_type creator_memory, unsigned_type merger_memory)
        : creator(in, c, creator_memory),
          merger(creator.result(), c, merger_memory)
    {
        sort_helper::verify_sentinel_strict_weak_ordering(c);
    }

    //! True when all sorted elements have been consumed.
    bool empty() const { return merger.empty(); }

    const value_type & operator * () const
    {
        assert(!empty());
        return *merger;
    }

    const value_type * operator -> () const
    {
        assert(!empty());
        return merger.operator -> ();
    }

    //! Advances to the next element in sorted order.
    sort & operator ++ ()
    {
        ++merger;
        return *this;
    }
};
template <
class ValueType_,
unsigned BlockSize_>
class compute_sorted_runs_type
{
typedef ValueType_ value_type;
typedef BID<BlockSize_> bid_type;
typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
public:
typedef sorted_runs<trigger_entry_type,std::less<value_type> > result;
};
}
//! Sorts [begin, end) externally: the range is wrapped as a stream, sorted
//! with stream::sort using \p MemSize bytes, and written back to \p begin.
//! \p AS is used only to select the allocation strategy type.
template <unsigned BlockSize,
          class RandomAccessIterator,
          class CmpType,
          class AllocStr>
void sort(RandomAccessIterator begin,
          RandomAccessIterator end,
          CmpType cmp,
          unsigned_type MemSize,
          AllocStr AS)
{
    STXXL_UNUSED(AS);
#ifdef BOOST_MSVC
    // streamify_traits lives in namespace stream, like streamify() below
    typedef typename stream::streamify_traits<RandomAccessIterator>::stream_type InputType;
#else
    typedef __typeof__(stream::streamify(begin, end)) InputType;
#endif
    InputType Input(begin, end);
    typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
    sorter_type Sort(Input, cmp, MemSize);
    stream::materialize(Sort, begin);
}
__STXXL_END_NAMESPACE
#endif