Stxxl  1.4.0
include/stxxl/bits/mng/buf_writer.h
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  include/stxxl/bits/mng/buf_writer.h
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *
00008  *  Distributed under the Boost Software License, Version 1.0.
00009  *  (See accompanying file LICENSE_1_0.txt or copy at
00010  *  http://www.boost.org/LICENSE_1_0.txt)
00011  **************************************************************************/
00012 
00013 #ifndef STXXL_BUFFERED_WRITER_HEADER
00014 #define STXXL_BUFFERED_WRITER_HEADER
00015 
00016 #include <vector>
00017 #include <queue>
00018 
00019 #include <stxxl/bits/io/request_operations.h>
00020 #include <stxxl/bits/io/disk_queues.h>
00021 
00022 
00023 __STXXL_BEGIN_NAMESPACE
00024 
00025 //! \weakgroup schedlayer Block scheduling sublayer
00026 //! \ingroup mnglayer
00027 //! Group of classes which help in scheduling
00028 //! sequences of read and write requests
00029 //! via prefetching and buffered writing
00030 //! \{
00031 
00032 
00033 //! \brief Encapsulates asynchronous buffered block writing engine
00034 //!
00035 //! \c buffered_writer overlaps I/Os with filling of output buffer.
00036 template <typename block_type>
00037 class buffered_writer
00038 {
00039     buffered_writer() { }
00040 
00041 protected:
00042     typedef typename block_type::bid_type bid_type;
00043 
00044     const unsigned_type nwriteblocks;
00045     block_type * write_buffers;
00046     bid_type * write_bids;
00047     request_ptr * write_reqs;
00048     const unsigned_type writebatchsize;
00049 
00050     std::vector<int_type> free_write_blocks;            // contains free write blocks
00051     std::vector<int_type> busy_write_blocks;            // blocks that are in writing, notice that if block is not in free_
00052     // an not in busy then block is not yet filled
00053 
00054     struct batch_entry
00055     {
00056         stxxl::int64 offset;
00057         int_type ibuffer;
00058         batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { }
00059     };
00060     struct batch_entry_cmp
00061     {
00062         bool operator () (const batch_entry & a, const batch_entry & b) const
00063         {
00064             return (a.offset > b.offset);
00065         }
00066     };
00067 
00068     typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type;
00069     batch_type batch_write_blocks;      // sorted sequence of blocks to write
00070 
00071 public:
00072     //! \brief Constructs an object
00073     //! \param write_buf_size number of write buffers to use
00074     //! \param write_batch_size number of blocks to accumulate in
00075     //!        order to flush write requests (bulk buffered writing)
00076     buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) :
00077         nwriteblocks((write_buf_size > 2) ? write_buf_size : 2),
00078         writebatchsize(write_batch_size ? write_batch_size : 1)
00079     {
00080         write_buffers = new block_type[nwriteblocks];
00081         write_reqs = new request_ptr[nwriteblocks];
00082         write_bids = new bid_type[nwriteblocks];
00083 
00084         for (unsigned_type i = 0; i < nwriteblocks; i++)
00085             free_write_blocks.push_back(i);
00086 
00087         disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00088     }
00089     //! \brief Returns free block from the internal buffer pool
00090     //! \return pointer to the block from the internal buffer pool
00091     block_type * get_free_block()
00092     {
00093         int_type ibuffer;
00094         for (std::vector<int_type>::iterator it = busy_write_blocks.begin();
00095              it != busy_write_blocks.end(); ++it)
00096         {
00097             if (write_reqs[ibuffer = (*it)]->poll())
00098             {
00099                 busy_write_blocks.erase(it);
00100                 free_write_blocks.push_back(ibuffer);
00101 
00102                 break;
00103             }
00104         }
00105         if (UNLIKELY(free_write_blocks.empty()))
00106         {
00107             int_type size = busy_write_blocks.size();
00108             request_ptr * reqs = new request_ptr[size];
00109             int_type i = 0;
00110             for ( ; i < size; ++i)
00111             {
00112                 reqs[i] = write_reqs[busy_write_blocks[i]];
00113             }
00114             int_type completed = wait_any(reqs, size);
00115             int_type completed_global = busy_write_blocks[completed];
00116             delete[] reqs;
00117             busy_write_blocks.erase(busy_write_blocks.begin() + completed);
00118 
00119             return (write_buffers + completed_global);
00120         }
00121         ibuffer = free_write_blocks.back();
00122         free_write_blocks.pop_back();
00123 
00124         return (write_buffers + ibuffer);
00125     }
00126     //! \brief Submits block for writing
00127     //! \param filled_block pointer to the block
00128     //! \remark parameter \c filled_block must be value returned by \c get_free_block() or \c write() methods
00129     //! \param bid block identifier, a place to write data of the \c filled_block
00130     //! \return pointer to the new free block from the pool
00131     block_type * write(block_type * filled_block, const bid_type & bid)        // writes filled_block and returns a new block
00132     {
00133         if (batch_write_blocks.size() >= writebatchsize)
00134         {
00135             // flush batch
00136             while (!batch_write_blocks.empty())
00137             {
00138                 int_type ibuffer = batch_write_blocks.top().ibuffer;
00139                 batch_write_blocks.pop();
00140 
00141                 if (write_reqs[ibuffer].valid())
00142                     write_reqs[ibuffer]->wait();
00143 
00144                 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00145 
00146                 busy_write_blocks.push_back(ibuffer);
00147             }
00148         }
00149         //    STXXL_MSG("Adding write request to batch");
00150 
00151         int_type ibuffer = filled_block - write_buffers;
00152         write_bids[ibuffer] = bid;
00153         batch_write_blocks.push(batch_entry(bid.offset, ibuffer));
00154 
00155         return get_free_block();
00156     }
00157     //! \brief Flushes not yet written buffers
00158     void flush()
00159     {
00160         int_type ibuffer;
00161         while (!batch_write_blocks.empty())
00162         {
00163             ibuffer = batch_write_blocks.top().ibuffer;
00164             batch_write_blocks.pop();
00165 
00166             if (write_reqs[ibuffer].valid())
00167                 write_reqs[ibuffer]->wait();
00168 
00169             write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00170 
00171             busy_write_blocks.push_back(ibuffer);
00172         }
00173         for (std::vector<int_type>::const_iterator it =
00174                  busy_write_blocks.begin();
00175              it != busy_write_blocks.end(); it++)
00176         {
00177             ibuffer = *it;
00178             write_reqs[ibuffer]->wait();
00179         }
00180 
00181         assert(batch_write_blocks.empty());
00182         free_write_blocks.clear();
00183         busy_write_blocks.clear();
00184 
00185         for (unsigned_type i = 0; i < nwriteblocks; i++)
00186             free_write_blocks.push_back(i);
00187     }
00188 
00189     //! \brief Flushes not yet written buffers and frees used memory
00190     virtual ~buffered_writer()
00191     {
00192         int_type ibuffer;
00193         while (!batch_write_blocks.empty())
00194         {
00195             ibuffer = batch_write_blocks.top().ibuffer;
00196             batch_write_blocks.pop();
00197 
00198             if (write_reqs[ibuffer].valid())
00199                 write_reqs[ibuffer]->wait();
00200 
00201             write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]);
00202 
00203             busy_write_blocks.push_back(ibuffer);
00204         }
00205         for (std::vector<int_type>::const_iterator it =
00206                  busy_write_blocks.begin();
00207              it != busy_write_blocks.end(); it++)
00208         {
00209             ibuffer = *it;
00210             write_reqs[ibuffer]->wait();
00211         }
00212 
00213         delete[] write_reqs;
00214         delete[] write_buffers;
00215         delete[] write_bids;
00216     }
00217 };
00218 
00219 //! \}
00220 
00221 __STXXL_END_NAMESPACE
00222 
00223 #endif // !STXXL_BUFFERED_WRITER_HEADER
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines