Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/mng/block_prefetcher.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2009, 2010 Johannes Singler <singler@kit.edu> 00008 * Copyright (C) 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #ifndef STXXL_BLOCK_PREFETCHER_HEADER 00016 #define STXXL_BLOCK_PREFETCHER_HEADER 00017 00018 #include <vector> 00019 #include <queue> 00020 00021 #include <stxxl/bits/common/switch.h> 00022 #include <stxxl/bits/io/request_ptr.h> 00023 #include <stxxl/bits/io/iostats.h> 00024 00025 00026 __STXXL_BEGIN_NAMESPACE 00027 00028 //! \addtogroup schedlayer 00029 //! \{ 00030 00031 00032 class set_switch_handler 00033 { 00034 onoff_switch & switch_; 00035 completion_handler on_compl; 00036 00037 public: 00038 set_switch_handler(onoff_switch & switch__, const completion_handler & on_compl) 00039 : switch_(switch__), on_compl(on_compl) 00040 { } 00041 00042 void operator () (request * req) 00043 { 00044 on_compl(req); //call before setting switch to on, otherwise, user has no way to wait for the completion handler to be executed 00045 switch_.on(); 00046 } 00047 }; 00048 00049 //! \brief Encapsulates asynchronous prefetching engine 00050 //! 00051 //! \c block_prefetcher overlaps I/Os with consumption of read data. 00052 //! Utilizes optimal asynchronous prefetch scheduling (by Peter Sanders et.al.) 00053 template <typename block_type, typename bid_iterator_type> 00054 class block_prefetcher 00055 { 00056 block_prefetcher() { } 00057 typedef typename block_type::bid_type bid_type; 00058 00059 protected: 00060 bid_iterator_type consume_seq_begin; 00061 bid_iterator_type consume_seq_end; 00062 unsigned_type seq_length; 00063 00064 int_type * prefetch_seq; 00065 00066 unsigned_type nextread; 00067 unsigned_type nextconsume; 00068 00069 const int_type nreadblocks; 00070 00071 block_type * read_buffers; 00072 request_ptr * read_reqs; 00073 bid_type * read_bids; 00074 00075 onoff_switch * completed; 00076 int_type * pref_buffer; 00077 00078 completion_handler do_after_fetch; 00079 00080 block_type * wait(int_type iblock) 00081 { 00082 STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock); 00083 { 00084 stats::scoped_wait_timer wait_timer(stats::WAIT_OP_READ); 00085 00086 completed[iblock].wait_for_on(); 00087 } 00088 STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock); 00089 int_type ibuffer = pref_buffer[iblock]; 00090 STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer); 00091 assert(ibuffer >= 0 && ibuffer < nreadblocks); 00092 return (read_buffers + ibuffer); 00093 } 00094 00095 public: 00096 //! \brief Constructs an object and immediately starts prefetching 00097 //! \param _cons_begin \c bid_iterator pointing to the \c bid of the first block to be consumed 00098 //! \param _cons_end \c bid_iterator pointing to the \c bid of the ( \b last + 1 ) block of consumption sequence 00099 //! \param _pref_seq gives the prefetch order, is a pointer to the integer array that contains 00100 //! the indices of the blocks in the consumption sequence 00101 //! \param _prefetch_buf_size amount of prefetch buffers to use 00102 block_prefetcher( 00103 bid_iterator_type _cons_begin, 00104 bid_iterator_type _cons_end, 00105 int_type * _pref_seq, 00106 int_type _prefetch_buf_size, 00107 completion_handler do_after_fetch = default_completion_handler() 00108 ) : 00109 consume_seq_begin(_cons_begin), 00110 consume_seq_end(_cons_end), 00111 seq_length(_cons_end - _cons_begin), 00112 prefetch_seq(_pref_seq), 00113 nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)), 00114 nextconsume(0), 00115 nreadblocks(nextread), 00116 do_after_fetch(do_after_fetch) 00117 { 00118 STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length); 00119 STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size); 00120 assert(seq_length > 0); 00121 assert(_prefetch_buf_size > 0); 00122 int_type i; 00123 read_buffers = new block_type[nreadblocks]; 00124 read_reqs = new request_ptr[nreadblocks]; 00125 read_bids = new bid_type[nreadblocks]; 00126 pref_buffer = new int_type[seq_length]; 00127 00128 std::fill(pref_buffer, pref_buffer + seq_length, -1); 00129 00130 completed = new onoff_switch[seq_length]; 00131 00132 for (i = 0; i < nreadblocks; ++i) 00133 { 00134 assert(prefetch_seq[i] < int_type(seq_length)); 00135 assert(prefetch_seq[i] >= 0); 00136 read_bids[i] = *(consume_seq_begin + prefetch_seq[i]); 00137 STXXL_VERBOSE1("block_prefetcher: reading block " << i << 00138 " prefetch_seq[" << i << "]=" << prefetch_seq[i] << 00139 " @ " << &read_buffers[i] << 00140 " @ " << read_bids[i]); 00141 read_reqs[i] = read_buffers[i].read( 00142 read_bids[i], 00143 set_switch_handler(*(completed + prefetch_seq[i]), do_after_fetch)); 00144 pref_buffer[prefetch_seq[i]] = i; 00145 } 00146 } 00147 //! \brief Pulls next unconsumed block from the consumption sequence 00148 //! \return Pointer to the already prefetched block from the internal buffer pool 00149 block_type * pull_block() 00150 { 00151 STXXL_VERBOSE1("block_prefetcher: pulling a block"); 00152 return wait(nextconsume++); 00153 } 00154 //! \brief Exchanges buffers between prefetcher and application 00155 //! \param buffer pointer to the consumed buffer. After call if return value is true \c buffer 00156 //! contains valid pointer to the next unconsumed prefetched buffer. 00157 //! \remark parameter \c buffer must be value returned by \c pull_block() or \c block_consumed() methods 00158 //! \return \c false if there are no blocks to prefetch left, \c true if consumption sequence is not emptied 00159 bool block_consumed(block_type * & buffer) 00160 { 00161 int_type ibuffer = buffer - read_buffers; 00162 STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed"); 00163 if (read_reqs[ibuffer].valid()) 00164 read_reqs[ibuffer]->wait(); 00165 00166 read_reqs[ibuffer] = NULL; 00167 00168 if (nextread < seq_length) 00169 { 00170 assert(ibuffer >= 0 && ibuffer < nreadblocks); 00171 int_type next_2_prefetch = prefetch_seq[nextread++]; 00172 STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch); 00173 00174 assert((next_2_prefetch < int_type(seq_length)) && (next_2_prefetch >= 0)); 00175 assert(!completed[next_2_prefetch].is_on()); 00176 00177 pref_buffer[next_2_prefetch] = ibuffer; 00178 read_bids[ibuffer] = *(consume_seq_begin + next_2_prefetch); 00179 read_reqs[ibuffer] = read_buffers[ibuffer].read( 00180 read_bids[ibuffer], 00181 set_switch_handler(*(completed + next_2_prefetch), do_after_fetch) 00182 ); 00183 } 00184 00185 if (nextconsume >= seq_length) 00186 return false; 00187 00188 00189 buffer = wait(nextconsume++); 00190 00191 return true; 00192 } 00193 00194 // no more consumable blocks available, but can't delete the prefetcher, 00195 // because not all blocks may have been returned, yet 00196 bool empty() const 00197 { 00198 return nextconsume >= seq_length; 00199 } 00200 00201 // index of the next element in the consume sequence 00202 unsigned_type pos() const 00203 { 00204 return nextconsume; 00205 } 00206 00207 //! \brief Frees used memory 00208 ~block_prefetcher() 00209 { 00210 for (int_type i = 0; i < nreadblocks; ++i) 00211 if (read_reqs[i].valid()) 00212 read_reqs[i]->wait(); 00213 00214 00215 delete[] read_reqs; 00216 delete[] read_bids; 00217 delete[] completed; 00218 delete[] pref_buffer; 00219 delete[] read_buffers; 00220 } 00221 }; 00222 00223 //! \} 00224 00225 __STXXL_END_NAMESPACE 00226 00227 #endif // !STXXL_BLOCK_PREFETCHER_HEADER