// Stxxl 1.4.0
00001 /*************************************************************************** 00002 * include/stxxl/bits/containers/queue.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2005 Roman Dementiev <dementiev@ira.uka.de> 00007 * Copyright (C) 2009, 2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00008 * 00009 * Distributed under the Boost Software License, Version 1.0. 00010 * (See accompanying file LICENSE_1_0.txt or copy at 00011 * http://www.boost.org/LICENSE_1_0.txt) 00012 **************************************************************************/ 00013 00014 #ifndef STXXL_QUEUE_HEADER 00015 #define STXXL_QUEUE_HEADER 00016 00017 #include <vector> 00018 #include <queue> 00019 #include <deque> 00020 00021 #include <stxxl/bits/deprecated.h> 00022 #include <stxxl/bits/mng/mng.h> 00023 #include <stxxl/bits/mng/typed_block.h> 00024 #include <stxxl/bits/common/simple_vector.h> 00025 #include <stxxl/bits/common/tmeta.h> 00026 #include <stxxl/bits/mng/read_write_pool.h> 00027 #include <stxxl/bits/mng/write_pool.h> 00028 #include <stxxl/bits/mng/prefetch_pool.h> 00029 00030 00031 __STXXL_BEGIN_NAMESPACE 00032 00033 #ifndef STXXL_VERBOSE_QUEUE 00034 #define STXXL_VERBOSE_QUEUE STXXL_VERBOSE2 00035 #endif 00036 00037 //! \addtogroup stlcont 00038 //! \{ 00039 00040 00041 //! \brief External FIFO queue container 00042 00043 //! \tparam ValTp type of the contained objects (POD with no references to internal memory) 00044 //! \tparam BlkSz size of the external memory block in bytes, default is \c STXXL_DEFAULT_BLOCK_SIZE(ValTp) 00045 //! \tparam AllocStr parallel disk allocation strategy, default is \c STXXL_DEFAULT_ALLOC_STRATEGY 00046 //! 
\tparam SzTp size data type, default is \c stxxl::uint64 00047 template <class ValTp, 00048 unsigned BlkSz = STXXL_DEFAULT_BLOCK_SIZE(ValTp), 00049 class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY, 00050 class SzTp = stxxl::uint64> 00051 class queue : private noncopyable 00052 { 00053 public: 00054 typedef ValTp value_type; 00055 typedef AllocStr alloc_strategy_type; 00056 typedef SzTp size_type; 00057 enum { 00058 block_size = BlkSz 00059 }; 00060 00061 typedef typed_block<block_size, value_type> block_type; 00062 typedef BID<block_size> bid_type; 00063 00064 private: 00065 typedef read_write_pool<block_type> pool_type; 00066 00067 size_type size_; 00068 bool delete_pool; 00069 pool_type * pool; 00070 block_type * front_block; 00071 block_type * back_block; 00072 value_type * front_element; 00073 value_type * back_element; 00074 alloc_strategy_type alloc_strategy; 00075 unsigned_type alloc_count; 00076 std::deque<bid_type> bids; 00077 block_manager * bm; 00078 unsigned_type blocks2prefetch; 00079 00080 public: 00081 //! \brief Constructs empty queue with own write and prefetch block pool 00082 00083 //! \param D number of parallel disks, defaulting to the configured number of scratch disks, 00084 //! memory consumption will be 2 * D + 2 blocks 00085 //! (first and last block, D blocks as write cache, D block for prefetching) 00086 explicit queue(int D = -1) : 00087 size_(0), 00088 delete_pool(true), 00089 alloc_count(0), 00090 bm(block_manager::get_instance()) 00091 { 00092 if (D < 1) 00093 D = config::get_instance()->disks_number(); 00094 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(D)"); 00095 pool = new pool_type(D, D + 2); 00096 init(); 00097 } 00098 00099 //! \brief Constructs empty queue with own write and prefetch block pool 00100 00101 //! \param w_pool_size number of blocks in the write pool, must be at least 2, recommended at least 3 00102 //! \param p_pool_size number of blocks in the prefetch pool, recommended at least 1 00103 //! 
\param blocks2prefetch_ defines the number of blocks to prefetch (\c front side), 00104 //! default is number of block in the prefetch pool 00105 explicit queue(unsigned_type w_pool_size, unsigned_type p_pool_size, int blocks2prefetch_ = -1) : 00106 size_(0), 00107 delete_pool(true), 00108 alloc_count(0), 00109 bm(block_manager::get_instance()) 00110 { 00111 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(sizes)"); 00112 pool = new pool_type(p_pool_size, w_pool_size); 00113 init(blocks2prefetch_); 00114 } 00115 00116 //! \brief Constructs empty queue 00117 00118 //! \param w_pool write pool 00119 //! \param p_pool prefetch pool 00120 //! \param blocks2prefetch_ defines the number of blocks to prefetch (\c front side), 00121 //! default is number of blocks in the prefetch pool 00122 //! \warning Number of blocks in the write pool must be at least 2, recommended at least 3 00123 //! \warning Number of blocks in the prefetch pool recommended at least 1 00124 _STXXL_DEPRECATED( 00125 queue(write_pool<block_type> & w_pool, prefetch_pool<block_type> & p_pool, int blocks2prefetch_ = -1)) : 00126 size_(0), 00127 delete_pool(true), 00128 alloc_count(0), 00129 bm(block_manager::get_instance()) 00130 { 00131 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pools)"); 00132 pool = new pool_type(w_pool, p_pool); 00133 init(blocks2prefetch_); 00134 } 00135 00136 //! \brief Constructs empty queue 00137 00138 //! \param pool_ block write/prefetch pool 00139 //! \param blocks2prefetch_ defines the number of blocks to prefetch (\c front side), 00140 //! default is number of blocks in the prefetch pool 00141 //! \warning Number of blocks in the write pool must be at least 2, recommended at least 3 00142 //! 
\warning Number of blocks in the prefetch pool recommended at least 1 00143 queue(pool_type & pool_, int blocks2prefetch_ = -1) : 00144 size_(0), 00145 delete_pool(false), 00146 pool(&pool_), 00147 alloc_count(0), 00148 bm(block_manager::get_instance()) 00149 { 00150 STXXL_VERBOSE_QUEUE("queue[" << this << "]::queue(pool)"); 00151 init(blocks2prefetch_); 00152 } 00153 00154 void swap(queue& obj) 00155 { 00156 std::swap(size_, obj.size_); 00157 std::swap(delete_pool, obj.delete_pool); 00158 std::swap(pool, obj.pool); 00159 std::swap(front_block, obj.front_block); 00160 std::swap(back_block, obj.back_block); 00161 std::swap(front_element, obj.front_element); 00162 std::swap(back_element, obj.back_element); 00163 std::swap(alloc_strategy, obj.alloc_strategy); 00164 std::swap(alloc_count, obj.alloc_count); 00165 std::swap(bids, obj.bids); 00166 std::swap(bm, obj.bm); 00167 std::swap(blocks2prefetch, obj.blocks2prefetch); 00168 } 00169 00170 private: 00171 void init(int blocks2prefetch_ = -1) 00172 { 00173 if (pool->size_write() < 2) { 00174 STXXL_ERRMSG("queue: invalid configuration, not enough blocks (" << pool->size_write() << 00175 ") in write pool, at least 2 are needed, resizing to 3"); 00176 pool->resize_write(3); 00177 } 00178 00179 if (pool->size_write() < 3) { 00180 STXXL_MSG("queue: inefficient configuration, no blocks for buffered writing available"); 00181 } 00182 00183 if (pool->size_prefetch() < 1) { 00184 STXXL_MSG("queue: inefficient configuration, no blocks for prefetching available"); 00185 } 00186 00187 front_block = back_block = pool->steal(); 00188 back_element = back_block->begin() - 1; 00189 front_element = back_block->begin(); 00190 set_prefetch_aggr(blocks2prefetch_); 00191 } 00192 00193 public: 00194 //! \brief Defines the number of blocks to prefetch (\c front side) 00195 //! This method should be called whenever the prefetch pool is resized 00196 //! \param blocks2prefetch_ defines the number of blocks to prefetch (\c front side), 00197 //! 
a negative value means to use the number of blocks in the prefetch pool 00198 void set_prefetch_aggr(int_type blocks2prefetch_) 00199 { 00200 if (blocks2prefetch_ < 0) 00201 blocks2prefetch = pool->size_prefetch(); 00202 else 00203 blocks2prefetch = blocks2prefetch_; 00204 } 00205 00206 //! \brief Returns the number of blocks prefetched from the \c front side 00207 unsigned_type get_prefetch_aggr() const 00208 { 00209 return blocks2prefetch; 00210 } 00211 00212 //! \brief Adds an element in the queue 00213 void push(const value_type & val) 00214 { 00215 if (UNLIKELY(back_element == back_block->begin() + (block_type::size - 1))) 00216 { 00217 // back block is filled 00218 if (front_block == back_block) 00219 { // can not write the back block because it 00220 // is the same as the front block, must keep it memory 00221 STXXL_VERBOSE1("queue::push Case 1"); 00222 } 00223 else if (size() < 2 * block_type::size) 00224 { 00225 STXXL_VERBOSE1("queue::push Case 1.5"); 00226 // only two blocks with a gap in the beginning, move elements within memory 00227 assert(bids.empty()); 00228 size_t gap = front_element - front_block->begin(); 00229 assert(gap > 0); 00230 std::copy(front_element, front_block->end(), front_block->begin()); 00231 std::copy(back_block->begin(), back_block->begin() + gap, front_block->begin() + (block_type::size - gap)); 00232 std::copy(back_block->begin() + gap, back_block->end(), back_block->begin()); 00233 front_element -= gap; 00234 back_element -= gap; 00235 00236 ++back_element; 00237 *back_element = val; 00238 ++size_; 00239 return; 00240 } 00241 else 00242 { 00243 STXXL_VERBOSE1("queue::push Case 2"); 00244 // write the back block 00245 // need to allocate new block 00246 bid_type newbid; 00247 00248 bm->new_block(alloc_strategy, newbid, alloc_count++); 00249 00250 STXXL_VERBOSE_QUEUE("queue[" << this << "]: push block " << back_block << " @ " << FMT_BID(newbid)); 00251 bids.push_back(newbid); 00252 pool->write(back_block, newbid); 00253 if 
(bids.size() <= blocks2prefetch) { 00254 STXXL_VERBOSE1("queue::push Case Hints"); 00255 pool->hint(newbid); 00256 } 00257 } 00258 back_block = pool->steal(); 00259 00260 back_element = back_block->begin(); 00261 *back_element = val; 00262 ++size_; 00263 return; 00264 } 00265 ++back_element; 00266 *back_element = val; 00267 ++size_; 00268 } 00269 00270 //! \brief Removes element from the queue 00271 void pop() 00272 { 00273 assert(!empty()); 00274 00275 if (UNLIKELY(front_element == front_block->begin() + (block_type::size - 1))) 00276 { 00277 // if there is only one block, it implies ... 00278 if (back_block == front_block) 00279 { 00280 STXXL_VERBOSE1("queue::pop Case 3"); 00281 assert(size() == 1); 00282 assert(back_element == front_element); 00283 assert(bids.empty()); 00284 // reset everything 00285 back_element = back_block->begin() - 1; 00286 front_element = back_block->begin(); 00287 size_ = 0; 00288 return; 00289 } 00290 00291 --size_; 00292 if (size_ <= block_type::size) 00293 { 00294 STXXL_VERBOSE1("queue::pop Case 4"); 00295 assert(bids.empty()); 00296 // the back_block is the next block 00297 pool->add(front_block); 00298 front_block = back_block; 00299 front_element = back_block->begin(); 00300 return; 00301 } 00302 STXXL_VERBOSE1("queue::pop Case 5"); 00303 00304 assert(!bids.empty()); 00305 request_ptr req = pool->read(front_block, bids.front()); 00306 STXXL_VERBOSE_QUEUE("queue[" << this << "]: pop block " << front_block << " @ " << FMT_BID(bids.front())); 00307 00308 // give prefetching hints 00309 for (unsigned_type i = 0; i < blocks2prefetch && i < bids.size() - 1; ++i) 00310 { 00311 STXXL_VERBOSE1("queue::pop Case Hints"); 00312 pool->hint(bids[i + 1]); 00313 } 00314 00315 front_element = front_block->begin(); 00316 req->wait(); 00317 00318 bm->delete_block(bids.front()); 00319 bids.pop_front(); 00320 return; 00321 } 00322 00323 ++front_element; 00324 --size_; 00325 } 00326 00327 //! 
\brief Returns the size of the queue 00328 size_type size() const 00329 { 00330 return size_; 00331 } 00332 00333 //! \brief Returns \c true if queue is empty 00334 bool empty() const 00335 { 00336 return (size_ == 0); 00337 } 00338 00339 //! \brief Returns a mutable reference at the back of the queue 00340 value_type & back() 00341 { 00342 assert(!empty()); 00343 return *back_element; 00344 } 00345 00346 //! \brief Returns a const reference at the back of the queue 00347 const value_type & back() const 00348 { 00349 assert(!empty()); 00350 return *back_element; 00351 } 00352 00353 //! \brief Returns a mutable reference at the front of the queue 00354 value_type & front() 00355 { 00356 assert(!empty()); 00357 return *front_element; 00358 } 00359 00360 //! \brief Returns a const reference at the front of the queue 00361 const value_type & front() const 00362 { 00363 assert(!empty()); 00364 return *front_element; 00365 } 00366 00367 ~queue() 00368 { 00369 if (front_block != back_block) 00370 pool->add(back_block); 00371 pool->add(front_block); 00372 00373 if (delete_pool) 00374 { 00375 delete pool; 00376 } 00377 00378 if (!bids.empty()) 00379 bm->delete_blocks(bids.begin(), bids.end()); 00380 } 00381 }; 00382 00383 //! \} 00384 00385 __STXXL_END_NAMESPACE 00386 00387 #endif // !STXXL_QUEUE_HEADER 00388 // vim: et:ts=4:sw=4