Stxxl  1.4.0
include/stxxl/bits/containers/queue.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/containers/queue.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2005 Roman Dementiev <dementiev@ira.uka.de>
00007  *  Copyright (C) 2009, 2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00008  *
00009  *  Distributed under the Boost Software License, Version 1.0.
00010  *  (See accompanying file LICENSE_1_0.txt or copy at
00011  *  http://www.boost.org/LICENSE_1_0.txt)
00012  **************************************************************************/
00013 
00014 #ifndef STXXL_QUEUE_HEADER
00015 #define STXXL_QUEUE_HEADER
00016 
00017 #include <vector>
00018 #include <queue>
00019 #include <deque>
00020 
00021 #include <stxxl/bits/deprecated.h>
00022 #include <stxxl/bits/mng/mng.h>
00023 #include <stxxl/bits/mng/typed_block.h>
00024 #include <stxxl/bits/common/simple_vector.h>
00025 #include <stxxl/bits/common/tmeta.h>
00026 #include <stxxl/bits/mng/read_write_pool.h>
00027 #include <stxxl/bits/mng/write_pool.h>
00028 #include <stxxl/bits/mng/prefetch_pool.h>
00029 
00030 
00031 __STXXL_BEGIN_NAMESPACE
00032 
00033 #ifndef STXXL_VERBOSE_QUEUE
00034 #define STXXL_VERBOSE_QUEUE STXXL_VERBOSE2
00035 #endif
00036 
00037 //! \addtogroup stlcont
00038 //! \{
00039 
00040 
00041 //! \brief External FIFO queue container
00042 
00043 //! \tparam ValTp type of the contained objects (POD with no references to internal memory)
00044 //! \tparam BlkSz size of the external memory block in bytes, default is \c STXXL_DEFAULT_BLOCK_SIZE(ValTp)
00045 //! \tparam AllocStr parallel disk allocation strategy, default is \c STXXL_DEFAULT_ALLOC_STRATEGY
00046 //! \tparam SzTp size data type, default is \c stxxl::uint64
00047 template <class ValTp,
00048           unsigned BlkSz = STXXL_DEFAULT_BLOCK_SIZE(ValTp),
00049           class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
00050           class SzTp = stxxl::uint64>
00051 class queue : private noncopyable
00052 {
00053 public:
00054     typedef ValTp value_type;
00055     typedef AllocStr alloc_strategy_type;
00056     typedef SzTp size_type;
00057     enum {
00058         block_size = BlkSz
00059     };
00060 
00061     typedef typed_block<block_size, value_type> block_type;
00062     typedef BID<block_size> bid_type;
00063 
00064 private:
00065     typedef read_write_pool<block_type> pool_type;
00066 
00067     size_type size_;
00068     bool delete_pool;
00069     pool_type * pool;
00070     block_type * front_block;
00071     block_type * back_block;
00072     value_type * front_element;
00073     value_type * back_element;
00074     alloc_strategy_type alloc_strategy;
00075     unsigned_type alloc_count;
00076     std::deque<bid_type> bids;
00077     block_manager * bm;
00078     unsigned_type blocks2prefetch;
00079 
00080 public:
00081     //! \brief Constructs empty queue with own write and prefetch block pool
00082 
00083     //! \param D  number of parallel disks, defaulting to the configured number of scratch disks,
00084     //!           memory consumption will be 2 * D + 2 blocks 
00085     //!           (first and last block, D blocks as write cache, D block for prefetching)
00086     explicit queue(int D = -1) :
00087         size_(0),
00088         delete_pool(true),
00089         alloc_count(0),
00090         bm(block_manager::get_instance())
00091     {
00092         if (D < 1)
00093             D = config::get_instance()->disks_number();
00094         STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(D)");
00095         pool = new pool_type(D, D + 2);
00096         init();
00097     }
00098 
00099     //! \brief Constructs empty queue with own write and prefetch block pool
00100 
00101     //! \param w_pool_size  number of blocks in the write pool, must be at least 2, recommended at least 3
00102     //! \param p_pool_size  number of blocks in the prefetch pool, recommended at least 1
00103     //! \param blocks2prefetch_  defines the number of blocks to prefetch (\c front side),
00104     //!                          default is number of block in the prefetch pool
00105     explicit queue(unsigned_type w_pool_size, unsigned_type p_pool_size, int blocks2prefetch_ = -1) :
00106         size_(0),
00107         delete_pool(true),
00108         alloc_count(0),
00109         bm(block_manager::get_instance())
00110     {
00111         STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(sizes)");
00112         pool = new pool_type(p_pool_size, w_pool_size);
00113         init(blocks2prefetch_);
00114     }
00115 
00116     //! \brief Constructs empty queue
00117 
00118     //! \param w_pool write pool
00119     //! \param p_pool prefetch pool
00120     //! \param blocks2prefetch_  defines the number of blocks to prefetch (\c front side),
00121     //!                          default is number of blocks in the prefetch pool
00122     //!  \warning Number of blocks in the write pool must be at least 2, recommended at least 3
00123     //!  \warning Number of blocks in the prefetch pool recommended at least 1
00124     _STXXL_DEPRECATED(
00125     queue(write_pool<block_type> & w_pool, prefetch_pool<block_type> & p_pool, int blocks2prefetch_ = -1)) :
00126         size_(0),
00127         delete_pool(true),
00128         alloc_count(0),
00129         bm(block_manager::get_instance())
00130     {
00131         STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pools)");
00132         pool = new pool_type(w_pool, p_pool);
00133         init(blocks2prefetch_);
00134     }
00135 
00136     //! \brief Constructs empty queue
00137 
00138     //! \param pool_ block write/prefetch pool
00139     //! \param blocks2prefetch_  defines the number of blocks to prefetch (\c front side),
00140     //!                          default is number of blocks in the prefetch pool
00141     //!  \warning Number of blocks in the write pool must be at least 2, recommended at least 3
00142     //!  \warning Number of blocks in the prefetch pool recommended at least 1
00143     queue(pool_type & pool_, int blocks2prefetch_ = -1) :
00144         size_(0),
00145         delete_pool(false),
00146         pool(&pool_),
00147         alloc_count(0),
00148         bm(block_manager::get_instance())
00149     {
00150         STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pool)");
00151         init(blocks2prefetch_);
00152     }
00153 
00154     void swap(queue& obj)
00155     {
00156         std::swap(size_, obj.size_);
00157         std::swap(delete_pool, obj.delete_pool);
00158         std::swap(pool, obj.pool);
00159         std::swap(front_block, obj.front_block);
00160         std::swap(back_block, obj.back_block);
00161         std::swap(front_element, obj.front_element);
00162         std::swap(back_element, obj.back_element);
00163         std::swap(alloc_strategy, obj.alloc_strategy);
00164         std::swap(alloc_count, obj.alloc_count);
00165         std::swap(bids, obj.bids);
00166         std::swap(bm, obj.bm);
00167         std::swap(blocks2prefetch, obj.blocks2prefetch);
00168     }
00169 
00170 private:
00171     void init(int blocks2prefetch_ = -1)
00172     {
00173         if (pool->size_write() < 2) {
00174             STXXL_ERRMSG("queue: invalid configuration, not enough blocks (" << pool->size_write() <<
00175                          ") in write pool, at least 2 are needed, resizing to 3");
00176             pool->resize_write(3);
00177         }
00178 
00179         if (pool->size_write() < 3) {
00180             STXXL_MSG("queue: inefficient configuration, no blocks for buffered writing available");
00181         }
00182 
00183         if (pool->size_prefetch() < 1) {
00184             STXXL_MSG("queue: inefficient configuration, no blocks for prefetching available");
00185         }
00186 
00187         front_block = back_block = pool->steal();
00188         back_element = back_block->begin() - 1;
00189         front_element = back_block->begin();
00190         set_prefetch_aggr(blocks2prefetch_);
00191     }
00192 
00193 public:
00194     //! \brief Defines the number of blocks to prefetch (\c front side)
00195     //!        This method should be called whenever the prefetch pool is resized
00196     //! \param blocks2prefetch_  defines the number of blocks to prefetch (\c front side),
00197     //!                          a negative value means to use the number of blocks in the prefetch pool
00198     void set_prefetch_aggr(int_type blocks2prefetch_)
00199     {
00200         if (blocks2prefetch_ < 0)
00201             blocks2prefetch = pool->size_prefetch();
00202         else
00203             blocks2prefetch = blocks2prefetch_;
00204     }
00205 
00206     //! \brief Returns the number of blocks prefetched from the \c front side
00207     unsigned_type get_prefetch_aggr() const
00208     {
00209         return blocks2prefetch;
00210     }
00211 
00212     //! \brief Adds an element in the queue
00213     void push(const value_type & val)
00214     {
00215         if (UNLIKELY(back_element == back_block->begin() + (block_type::size - 1)))
00216         {
00217             // back block is filled
00218             if (front_block == back_block)
00219             {             // can not write the back block because it
00220                 // is the same as the front block, must keep it memory
00221                 STXXL_VERBOSE1("queue::push Case 1");
00222             }
00223             else if (size() < 2 * block_type::size)
00224             {
00225                 STXXL_VERBOSE1("queue::push Case 1.5");
00226                 // only two blocks with a gap in the beginning, move elements within memory
00227                 assert(bids.empty());
00228                 size_t gap = front_element - front_block->begin();
00229                 assert(gap > 0);
00230                 std::copy(front_element, front_block->end(), front_block->begin());
00231                 std::copy(back_block->begin(), back_block->begin() + gap, front_block->begin() + (block_type::size - gap));
00232                 std::copy(back_block->begin() + gap, back_block->end(), back_block->begin());
00233                 front_element -= gap;
00234                 back_element -= gap;
00235 
00236                 ++back_element;
00237                 *back_element = val;
00238                 ++size_;
00239                 return;
00240             }
00241             else
00242             {
00243                 STXXL_VERBOSE1("queue::push Case 2");
00244                 // write the back block
00245                 // need to allocate new block
00246                 bid_type newbid;
00247 
00248                 bm->new_block(alloc_strategy, newbid, alloc_count++);
00249 
00250                 STXXL_VERBOSE_QUEUE("queue[" << this << "]: push block " << back_block << " @ " << FMT_BID(newbid));
00251                 bids.push_back(newbid);
00252                 pool->write(back_block, newbid);
00253                 if (bids.size() <= blocks2prefetch) {
00254                     STXXL_VERBOSE1("queue::push Case Hints");
00255                     pool->hint(newbid);
00256                 }
00257             }
00258             back_block = pool->steal();
00259 
00260             back_element = back_block->begin();
00261             *back_element = val;
00262             ++size_;
00263             return;
00264         }
00265         ++back_element;
00266         *back_element = val;
00267         ++size_;
00268     }
00269 
00270     //! \brief Removes element from the queue
00271     void pop()
00272     {
00273         assert(!empty());
00274 
00275         if (UNLIKELY(front_element == front_block->begin() + (block_type::size - 1)))
00276         {
00277             // if there is only one block, it implies ...
00278             if (back_block == front_block)
00279             {
00280                 STXXL_VERBOSE1("queue::pop Case 3");
00281                 assert(size() == 1);
00282                 assert(back_element == front_element);
00283                 assert(bids.empty());
00284                 // reset everything
00285                 back_element = back_block->begin() - 1;
00286                 front_element = back_block->begin();
00287                 size_ = 0;
00288                 return;
00289             }
00290 
00291             --size_;
00292             if (size_ <= block_type::size)
00293             {
00294                 STXXL_VERBOSE1("queue::pop Case 4");
00295                 assert(bids.empty());
00296                 // the back_block is the next block
00297                 pool->add(front_block);
00298                 front_block = back_block;
00299                 front_element = back_block->begin();
00300                 return;
00301             }
00302             STXXL_VERBOSE1("queue::pop Case 5");
00303 
00304             assert(!bids.empty());
00305             request_ptr req = pool->read(front_block, bids.front());
00306             STXXL_VERBOSE_QUEUE("queue[" << this << "]: pop block  " << front_block << " @ " << FMT_BID(bids.front()));
00307 
00308             // give prefetching hints
00309             for (unsigned_type i = 0; i < blocks2prefetch && i < bids.size() - 1; ++i)
00310             {
00311                 STXXL_VERBOSE1("queue::pop Case Hints");
00312                 pool->hint(bids[i + 1]);
00313             }
00314 
00315             front_element = front_block->begin();
00316             req->wait();
00317 
00318             bm->delete_block(bids.front());
00319             bids.pop_front();
00320             return;
00321         }
00322 
00323         ++front_element;
00324         --size_;
00325     }
00326 
00327     //! \brief Returns the size of the queue
00328     size_type size() const
00329     {
00330         return size_;
00331     }
00332 
00333     //! \brief Returns \c true if queue is empty
00334     bool empty() const
00335     {
00336         return (size_ == 0);
00337     }
00338 
00339     //! \brief Returns a mutable reference at the back of the queue
00340     value_type & back()
00341     {
00342         assert(!empty());
00343         return *back_element;
00344     }
00345 
00346     //! \brief Returns a const reference at the back of the queue
00347     const value_type & back() const
00348     {
00349         assert(!empty());
00350         return *back_element;
00351     }
00352 
00353     //! \brief Returns a mutable reference at the front of the queue
00354     value_type & front()
00355     {
00356         assert(!empty());
00357         return *front_element;
00358     }
00359 
00360     //! \brief Returns a const reference at the front of the queue
00361     const value_type & front() const
00362     {
00363         assert(!empty());
00364         return *front_element;
00365     }
00366 
00367     ~queue()
00368     {
00369         if (front_block != back_block)
00370             pool->add(back_block);
00371         pool->add(front_block);
00372 
00373         if (delete_pool)
00374         {
00375             delete pool;
00376         }
00377 
00378         if (!bids.empty())
00379             bm->delete_blocks(bids.begin(), bids.end());
00380     }
00381 };
00382 
00383 //! \}
00384 
00385 __STXXL_END_NAMESPACE
00386 
00387 #endif // !STXXL_QUEUE_HEADER
00388 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines