Stxxl  1.4.0
include/stxxl/bits/mng/block_prefetcher.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/mng/block_prefetcher.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2009, 2010 Johannes Singler <singler@kit.edu>
00008  *  Copyright (C) 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
00016 #define STXXL_BLOCK_PREFETCHER_HEADER
00017 
00018 #include <vector>
00019 #include <queue>
00020 
00021 #include <stxxl/bits/common/switch.h>
00022 #include <stxxl/bits/io/request_ptr.h>
00023 #include <stxxl/bits/io/iostats.h>
00024 
00025 
00026 __STXXL_BEGIN_NAMESPACE
00027 
00028 //! \addtogroup schedlayer
00029 //! \{
00030 
00031 
00032 class set_switch_handler
00033 {
00034     onoff_switch & switch_;
00035     completion_handler on_compl;
00036 
00037 public:
00038     set_switch_handler(onoff_switch & switch__, const completion_handler & on_compl)
00039         : switch_(switch__), on_compl(on_compl)
00040     { }
00041 
00042     void operator () (request * req)
00043     {
00044         on_compl(req);  //call before setting switch to on, otherwise, user has no way to wait for the completion handler to be executed
00045         switch_.on();
00046     }
00047 };
00048 
00049 //! \brief Encapsulates asynchronous prefetching engine
00050 //!
00051 //! \c block_prefetcher overlaps I/Os with consumption of read data.
00052 //! Utilizes optimal asynchronous prefetch scheduling (by Peter Sanders et.al.)
00053 template <typename block_type, typename bid_iterator_type>
00054 class block_prefetcher
00055 {
00056     block_prefetcher() { }
00057     typedef typename block_type::bid_type bid_type;
00058 
00059 protected:
00060     bid_iterator_type consume_seq_begin;
00061     bid_iterator_type consume_seq_end;
00062     unsigned_type seq_length;
00063 
00064     int_type * prefetch_seq;
00065 
00066     unsigned_type nextread;
00067     unsigned_type nextconsume;
00068 
00069     const int_type nreadblocks;
00070 
00071     block_type * read_buffers;
00072     request_ptr * read_reqs;
00073     bid_type * read_bids;
00074 
00075     onoff_switch * completed;
00076     int_type * pref_buffer;
00077 
00078     completion_handler do_after_fetch;
00079 
00080     block_type * wait(int_type iblock)
00081     {
00082         STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
00083         {
00084             stats::scoped_wait_timer wait_timer(stats::WAIT_OP_READ);
00085 
00086             completed[iblock].wait_for_on();
00087         }
00088         STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
00089         int_type ibuffer = pref_buffer[iblock];
00090         STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
00091         assert(ibuffer >= 0 && ibuffer < nreadblocks);
00092         return (read_buffers + ibuffer);
00093     }
00094 
00095 public:
00096     //! \brief Constructs an object and immediately starts prefetching
00097     //! \param _cons_begin \c bid_iterator pointing to the \c bid of the first block to be consumed
00098     //! \param _cons_end \c bid_iterator pointing to the \c bid of the ( \b last + 1 ) block of consumption sequence
00099     //! \param _pref_seq gives the prefetch order, is a pointer to the integer array that contains
00100     //!        the indices of the blocks in the consumption sequence
00101     //! \param _prefetch_buf_size amount of prefetch buffers to use
00102     block_prefetcher(
00103         bid_iterator_type _cons_begin,
00104         bid_iterator_type _cons_end,
00105         int_type * _pref_seq,
00106         int_type _prefetch_buf_size,
00107         completion_handler do_after_fetch = default_completion_handler()
00108         ) :
00109         consume_seq_begin(_cons_begin),
00110         consume_seq_end(_cons_end),
00111         seq_length(_cons_end - _cons_begin),
00112         prefetch_seq(_pref_seq),
00113         nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
00114         nextconsume(0),
00115         nreadblocks(nextread),
00116         do_after_fetch(do_after_fetch)
00117     {
00118         STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
00119         STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
00120         assert(seq_length > 0);
00121         assert(_prefetch_buf_size > 0);
00122         int_type i;
00123         read_buffers = new block_type[nreadblocks];
00124         read_reqs = new request_ptr[nreadblocks];
00125         read_bids = new bid_type[nreadblocks];
00126         pref_buffer = new int_type[seq_length];
00127 
00128         std::fill(pref_buffer, pref_buffer + seq_length, -1);
00129 
00130         completed = new onoff_switch[seq_length];
00131 
00132         for (i = 0; i < nreadblocks; ++i)
00133         {
00134             assert(prefetch_seq[i] < int_type(seq_length));
00135             assert(prefetch_seq[i] >= 0);
00136             read_bids[i] = *(consume_seq_begin + prefetch_seq[i]);
00137             STXXL_VERBOSE1("block_prefetcher: reading block " << i <<
00138                            " prefetch_seq[" << i << "]=" << prefetch_seq[i] <<
00139                            " @ " << &read_buffers[i] <<
00140                            " @ " << read_bids[i]);
00141             read_reqs[i] = read_buffers[i].read(
00142                 read_bids[i],
00143                 set_switch_handler(*(completed + prefetch_seq[i]), do_after_fetch));
00144             pref_buffer[prefetch_seq[i]] = i;
00145         }
00146     }
00147     //! \brief Pulls next unconsumed block from the consumption sequence
00148     //! \return Pointer to the already prefetched block from the internal buffer pool
00149     block_type * pull_block()
00150     {
00151         STXXL_VERBOSE1("block_prefetcher: pulling a block");
00152         return wait(nextconsume++);
00153     }
00154     //! \brief Exchanges buffers between prefetcher and application
00155     //! \param buffer pointer to the consumed buffer. After call if return value is true \c buffer
00156     //!        contains valid pointer to the next unconsumed prefetched buffer.
00157     //! \remark parameter \c buffer must be value returned by \c pull_block() or \c block_consumed() methods
00158     //! \return \c false if there are no blocks to prefetch left, \c true if consumption sequence is not emptied
00159     bool block_consumed(block_type * & buffer)
00160     {
00161         int_type ibuffer = buffer - read_buffers;
00162         STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
00163         if (read_reqs[ibuffer].valid())
00164             read_reqs[ibuffer]->wait();
00165 
00166         read_reqs[ibuffer] = NULL;
00167 
00168         if (nextread < seq_length)
00169         {
00170             assert(ibuffer >= 0 && ibuffer < nreadblocks);
00171             int_type next_2_prefetch = prefetch_seq[nextread++];
00172             STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
00173 
00174             assert((next_2_prefetch < int_type(seq_length)) && (next_2_prefetch >= 0));
00175             assert(!completed[next_2_prefetch].is_on());
00176 
00177             pref_buffer[next_2_prefetch] = ibuffer;
00178             read_bids[ibuffer] = *(consume_seq_begin + next_2_prefetch);
00179             read_reqs[ibuffer] = read_buffers[ibuffer].read(
00180                 read_bids[ibuffer],
00181                 set_switch_handler(*(completed + next_2_prefetch), do_after_fetch)
00182                 );
00183         }
00184 
00185         if (nextconsume >= seq_length)
00186             return false;
00187 
00188 
00189         buffer = wait(nextconsume++);
00190 
00191         return true;
00192     }
00193 
00194     // no more consumable blocks available, but can't delete the prefetcher,
00195     // because not all blocks may have been returned, yet
00196     bool empty() const
00197     {
00198         return nextconsume >= seq_length;
00199     }
00200 
00201     // index of the next element in the consume sequence
00202     unsigned_type pos() const
00203     {
00204         return nextconsume;
00205     }
00206 
00207     //! \brief Frees used memory
00208     ~block_prefetcher()
00209     {
00210         for (int_type i = 0; i < nreadblocks; ++i)
00211             if (read_reqs[i].valid())
00212                 read_reqs[i]->wait();
00213 
00214 
00215         delete[] read_reqs;
00216         delete[] read_bids;
00217         delete[] completed;
00218         delete[] pref_buffer;
00219         delete[] read_buffers;
00220     }
00221 };
00222 
00223 //! \}
00224 
00225 __STXXL_END_NAMESPACE
00226 
00227 #endif // !STXXL_BLOCK_PREFETCHER_HEADER
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines