// http://stxxl.sourceforge.net
// <dementiev@ira.uka.de>
// <beckmann@cs.uni-frankfurt.de>
// http://www.boost.org/LICENSE_1_0.txt
#ifndef STXXL_QUEUE_HEADER
#define STXXL_QUEUE_HEADER
#include <vector>
#include <queue>
#include <deque>
#include <stxxl/bits/deprecated.h>
#include <stxxl/bits/mng/mng.h>
#include <stxxl/bits/mng/typed_block.h>
#include <stxxl/bits/common/simple_vector.h>
#include <stxxl/bits/common/tmeta.h>
#include <stxxl/bits/mng/read_write_pool.h>
#include <stxxl/bits/mng/write_pool.h>
#include <stxxl/bits/mng/prefetch_pool.h>
__STXXL_BEGIN_NAMESPACE
#ifndef STXXL_VERBOSE_QUEUE
#define STXXL_VERBOSE_QUEUE STXXL_VERBOSE2
#endif
//! FIFO queue that stores its elements in blocks of type \c typed_block.
//!
//! The block holding the oldest element (\c front_block) and the block holding
//! the newest element (\c back_block) are kept as directly addressable blocks.
//! When the back block fills up and cannot be kept, it is handed to
//! \c pool->write() under a freshly allocated block id, and that id is appended
//! to \c bids. When the front block is exhausted, the oldest id in \c bids is
//! read back through \c pool->read().
//! NOTE(review): the pool and block manager are presumably backed by external
//! (disk) storage; that is not visible in this file - confirm.
//!
//! The class derives privately from \c noncopyable.
//!
//! \tparam ValTp    type of the stored elements
//! \tparam BlkSz    block size parameter forwarded to \c typed_block and \c BID
//!                  (presumably in bytes - confirm against typed_block)
//! \tparam AllocStr allocation strategy type passed to \c block_manager::new_block()
//! \tparam SzTp     integer type used for the element counter
template <class ValTp,
unsigned BlkSz = STXXL_DEFAULT_BLOCK_SIZE(ValTp),
class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
class SzTp = stxxl::uint64>
class queue : private noncopyable
{
public:
typedef ValTp value_type;
typedef AllocStr alloc_strategy_type;
typedef SzTp size_type;
enum {
block_size = BlkSz
};
typedef typed_block<block_size, value_type> block_type;
typedef BID<block_size> bid_type;
private:
typedef read_write_pool<block_type> pool_type;
//! Number of elements currently in the queue.
size_type size_;
//! True if \c pool was allocated by this queue and must be deleted in the destructor.
bool delete_pool;
//! Pool used to obtain (steal), give back (add), read, write and hint blocks.
pool_type * pool;
//! Block containing the oldest element; may be the same block as \c back_block.
block_type * front_block;
//! Block containing the most recently pushed element.
block_type * back_block;
//! Points at the oldest element (the one returned by front()).
value_type * front_element;
//! Points at the most recently pushed element; equals back_block->begin() - 1
//! while the queue is in its freshly initialized / reset empty state.
value_type * back_element;
//! Strategy object and running counter passed to bm->new_block().
alloc_strategy_type alloc_strategy;
unsigned_type alloc_count;
//! Ids of the blocks handed to pool->write(), oldest first.
std::deque<bid_type> bids;
//! Block manager used to allocate and delete block ids.
block_manager * bm;
//! Maximum number of blocks for which prefetch hints are issued.
unsigned_type blocks2prefetch;
public:
//! Constructs an empty queue that owns its pool.
//!
//! \param D value used to size the pool; if smaller than 1, the value of
//!          config::get_instance()->disks_number() is used instead.
//!          The pool is created as pool_type(D, D + 2); judging by the
//!          (p_pool_size, w_pool_size) constructor below, that is D prefetch
//!          blocks and D + 2 write blocks.
explicit queue(int D = -1) :
size_(0),
delete_pool(true),
alloc_count(0),
bm(block_manager::get_instance())
{
if (D < 1)
D = config::get_instance()->disks_number();
STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(D)");
pool = new pool_type(D, D + 2);
init();
}
//! Constructs an empty queue that owns its pool, with explicit pool sizes.
//!
//! \param w_pool_size number of blocks in the write part of the pool;
//!                    init() resizes it to 3 if it is smaller than 2
//! \param p_pool_size number of blocks in the prefetch part of the pool
//! \param blocks2prefetch_ number of blocks to hint ahead; a negative value
//!                         selects the prefetch pool size (see set_prefetch_aggr())
explicit queue(unsigned_type w_pool_size, unsigned_type p_pool_size, int blocks2prefetch_ = -1) :
size_(0),
delete_pool(true),
alloc_count(0),
bm(block_manager::get_instance())
{
STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(sizes)");
// Note the argument order: prefetch size first, write size second.
pool = new pool_type(p_pool_size, w_pool_size);
init(blocks2prefetch_);
}
//! Deprecated: constructs an empty queue on top of separately supplied
//! write and prefetch pools.
//!
//! A new pool_type wrapper is created from the two pools and deleted in the
//! destructor (delete_pool is true).
//! NOTE(review): whether deleting the wrapper leaves \c w_pool and \c p_pool
//! untouched depends on read_write_pool - confirm.
//!
//! \param w_pool write pool
//! \param p_pool prefetch pool
//! \param blocks2prefetch_ number of blocks to hint ahead; negative selects
//!                         the prefetch pool size
_STXXL_DEPRECATED(
queue(write_pool<block_type> & w_pool, prefetch_pool<block_type> & p_pool, int blocks2prefetch_ = -1)) :
size_(0),
delete_pool(true),
alloc_count(0),
bm(block_manager::get_instance())
{
STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pools)");
pool = new pool_type(w_pool, p_pool);
init(blocks2prefetch_);
}
//! Constructs an empty queue that uses a caller-supplied pool.
//!
//! The pool is not deleted by the queue (delete_pool is false), so it has to
//! stay valid for as long as the queue uses it.
//!
//! \param pool_ read/write pool to use
//! \param blocks2prefetch_ number of blocks to hint ahead; negative selects
//!                         the prefetch pool size
queue(pool_type & pool_, int blocks2prefetch_ = -1) :
size_(0),
delete_pool(false),
pool(&pool_),
alloc_count(0),
bm(block_manager::get_instance())
{
STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pool)");
init(blocks2prefetch_);
}
//! Exchanges the complete state of this queue with \c obj by swapping every
//! data member, including the pool pointer and its ownership flag.
void swap(queue& obj)
{
std::swap(size_, obj.size_);
std::swap(delete_pool, obj.delete_pool);
std::swap(pool, obj.pool);
std::swap(front_block, obj.front_block);
std::swap(back_block, obj.back_block);
std::swap(front_element, obj.front_element);
std::swap(back_element, obj.back_element);
std::swap(alloc_strategy, obj.alloc_strategy);
std::swap(alloc_count, obj.alloc_count);
std::swap(bids, obj.bids);
std::swap(bm, obj.bm);
std::swap(blocks2prefetch, obj.blocks2prefetch);
}
private:
//! Common constructor tail: validates the pool configuration, obtains the
//! first block and puts the queue into its empty state.
//!
//! \param blocks2prefetch_ forwarded to set_prefetch_aggr()
void init(int blocks2prefetch_ = -1)
{
// Fewer than 2 write blocks is reported as an error and repaired by resizing.
if (pool->size_write() < 2) {
STXXL_ERRMSG("queue: invalid configuration, not enough blocks (" << pool->size_write() <<
") in write pool, at least 2 are needed, resizing to 3");
pool->resize_write(3);
}
// The next two checks only emit a message; the configuration is left as is.
if (pool->size_write() < 3) {
STXXL_MSG("queue: inefficient configuration, no blocks for buffered writing available");
}
if (pool->size_prefetch() < 1) {
STXXL_MSG("queue: inefficient configuration, no blocks for prefetching available");
}
// A single block serves as both front and back block initially.
front_block = back_block = pool->steal();
// Empty state: back_element sits one slot before the first element so that
// the ordinary "++back_element; *back_element = val" path in push() lands
// on the first slot.
// NOTE(review): forming begin() - 1 is formally out of bounds pointer
// arithmetic if begin() is the start of the underlying array - confirm
// this is acceptable on all supported platforms.
back_element = back_block->begin() - 1;
front_element = back_block->begin();
set_prefetch_aggr(blocks2prefetch_);
}
public:
//! Sets the number of blocks for which prefetch hints are issued.
//!
//! \param blocks2prefetch_ number of blocks; a negative value selects the
//!                         current value of pool->size_prefetch()
void set_prefetch_aggr(int_type blocks2prefetch_)
{
if (blocks2prefetch_ < 0)
blocks2prefetch = pool->size_prefetch();
else
blocks2prefetch = blocks2prefetch_;
}
//! Returns the number of blocks for which prefetch hints are issued.
unsigned_type get_prefetch_aggr() const
{
return blocks2prefetch;
}
//! Appends a copy of \c val at the back of the queue.
//!
//! In the common case the element is stored in the next free slot of the
//! back block. If the back block is full, one of three things happens
//! (the case names match the verbose messages):
//!  - Case 1:   front and back are the same block; it is kept and a new back
//!              block is taken from the pool.
//!  - Case 1.5: exactly two blocks are in use and the front block has unused
//!              slots at its beginning; elements are shifted towards the
//!              front to free slots at the end of the back block.
//!  - Case 2:   the back block is handed to pool->write() under a newly
//!              allocated block id and a new back block is taken from the pool.
void push(const value_type & val)
{
// back_element on the last slot means the back block is full.
if (UNLIKELY(back_element == back_block->begin() + (block_type::size - 1)))
{
if (front_block == back_block)
{
// The block is still in use as front block, so it is not written out;
// fall through to stealing a fresh back block.
STXXL_VERBOSE1("queue::push Case 1");
}
else if (size() < 2 * block_type::size)
{
STXXL_VERBOSE1("queue::push Case 1.5");
assert(bids.empty());
// Number of already popped slots at the beginning of the front block.
size_t gap = front_element - front_block->begin();
assert(gap > 0);
// The three copies must stay in this order. In the first and third the
// destination starts before the source, so the overlapping forward copy
// is well defined.
// 1. Slide the live elements of the front block to its beginning.
std::copy(front_element, front_block->end(), front_block->begin());
// 2. Fill the freed tail of the front block with the first 'gap'
//    elements of the back block.
std::copy(back_block->begin(), back_block->begin() + gap, front_block->begin() + (block_type::size - gap));
// 3. Slide the remaining elements of the back block to its beginning.
std::copy(back_block->begin() + gap, back_block->end(), back_block->begin());
// Both cursors moved 'gap' slots towards the beginning of their block.
front_element -= gap;
back_element -= gap;
// The back block now has free slots at its end; store the new element.
++back_element;
*back_element = val;
++size_;
return;
}
else
{
STXXL_VERBOSE1("queue::push Case 2");
bid_type newbid;
bm->new_block(alloc_strategy, newbid, alloc_count++);
STXXL_VERBOSE_QUEUE("queue[" << this << "]: push block " << back_block << " @ " << FMT_BID(newbid));
bids.push_back(newbid);
// NOTE(review): back_block is replaced by a stolen block right below, so
// pool->write() presumably takes over the block passed to it - confirm.
pool->write(back_block, newbid);
// Only hint blocks that are within the prefetch window of the front.
if (bids.size() <= blocks2prefetch) {
STXXL_VERBOSE1("queue::push Case Hints");
pool->hint(newbid);
}
}
// Cases 1 and 2: continue in a fresh back block, starting at its first slot.
back_block = pool->steal();
back_element = back_block->begin();
*back_element = val;
++size_;
return;
}
// Common case: room left in the back block.
++back_element;
*back_element = val;
++size_;
}
//! Removes the element at the front of the queue.
//!
//! The queue must not be empty (checked with assert only).
//! If the removed element was the last slot of the front block, one of three
//! things happens (the case names match the verbose messages):
//!  - Case 3: only one block is in use; the queue becomes empty and is reset
//!            to its initial state.
//!  - Case 4: no blocks were written out; the front block is given back to
//!            the pool and the back block becomes the front block.
//!  - Case 5: the oldest written-out block is read back as new front block.
void pop()
{
assert(!empty());
// front_element on the last slot means the front block is exhausted by this pop.
if (UNLIKELY(front_element == front_block->begin() + (block_type::size - 1)))
{
if (back_block == front_block)
{
STXXL_VERBOSE1("queue::pop Case 3");
assert(size() == 1);
assert(back_element == front_element);
assert(bids.empty());
// Same empty state as set up by init(), reusing the current block.
back_element = back_block->begin() - 1;
front_element = back_block->begin();
size_ = 0;
return;
}
// From here on size_ already excludes the element being removed.
--size_;
if (size_ <= block_type::size)
{
STXXL_VERBOSE1("queue::pop Case 4");
assert(bids.empty());
pool->add(front_block);
front_block = back_block;
front_element = back_block->begin();
return;
}
STXXL_VERBOSE1("queue::pop Case 5");
assert(!bids.empty());
// Start reading the next block; the request is waited for further below.
request_ptr req = pool->read(front_block, bids.front());
STXXL_VERBOSE_QUEUE("queue[" << this << "]: pop block " << front_block << " @ " << FMT_BID(bids.front()));
// Hint the blocks following the one being read (bids[1], bids[2], ...),
// at most blocks2prefetch of them and never past the end of bids.
for (unsigned_type i = 0; i < blocks2prefetch && i < bids.size() - 1; ++i)
{
STXXL_VERBOSE1("queue::pop Case Hints");
pool->hint(bids[i + 1]);
}
front_element = front_block->begin();
// The block id is deleted and dropped only after the read has completed.
req->wait();
bm->delete_block(bids.front());
bids.pop_front();
return;
}
// Common case: the next element is in the same block.
++front_element;
--size_;
}
//! Returns the number of elements in the queue.
size_type size() const
{
return size_;
}
//! Returns true if the queue contains no elements.
bool empty() const
{
return (size_ == 0);
}
//! Returns a mutable reference to the most recently pushed element.
//! The queue must not be empty (checked with assert only).
value_type & back()
{
assert(!empty());
return *back_element;
}
//! Returns a const reference to the most recently pushed element.
//! The queue must not be empty (checked with assert only).
const value_type & back() const
{
assert(!empty());
return *back_element;
}
//! Returns a mutable reference to the oldest element.
//! The queue must not be empty (checked with assert only).
value_type & front()
{
assert(!empty());
return *front_element;
}
//! Returns a const reference to the oldest element.
//! The queue must not be empty (checked with assert only).
const value_type & front() const
{
assert(!empty());
return *front_element;
}
//! Destroys the queue: hands the front and back blocks back to the pool,
//! deletes the pool if this queue owns it, and finally deletes all block ids
//! still listed in \c bids through the block manager.
~queue()
{
// Give back the back block separately only if it is a distinct block.
if (front_block != back_block)
pool->add(back_block);
pool->add(front_block);
if (delete_pool)
{
delete pool;
}
// Block ids are released after the (owned) pool is gone.
if (!bids.empty())
bm->delete_blocks(bids.begin(), bids.end());
}
};
__STXXL_END_NAMESPACE
#endif