Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/mng/buf_writer.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * 00008 * Distributed under the Boost Software License, Version 1.0. 00009 * (See accompanying file LICENSE_1_0.txt or copy at 00010 * http://www.boost.org/LICENSE_1_0.txt) 00011 **************************************************************************/ 00012 00013 #ifndef STXXL_BUFFERED_WRITER_HEADER 00014 #define STXXL_BUFFERED_WRITER_HEADER 00015 00016 #include <vector> 00017 #include <queue> 00018 00019 #include <stxxl/bits/io/request_operations.h> 00020 #include <stxxl/bits/io/disk_queues.h> 00021 00022 00023 __STXXL_BEGIN_NAMESPACE 00024 00025 //! \weakgroup schedlayer Block scheduling sublayer 00026 //! \ingroup mnglayer 00027 //! Group of classes which help in scheduling 00028 //! sequences of read and write requests 00029 //! via prefetching and buffered writing 00030 //! \{ 00031 00032 00033 //! \brief Encapsulates asynchronous buffered block writing engine 00034 //! 00035 //! \c buffered_writer overlaps I/Os with filling of output buffer. 00036 template <typename block_type> 00037 class buffered_writer 00038 { 00039 buffered_writer() { } 00040 00041 protected: 00042 typedef typename block_type::bid_type bid_type; 00043 00044 const unsigned_type nwriteblocks; 00045 block_type * write_buffers; 00046 bid_type * write_bids; 00047 request_ptr * write_reqs; 00048 const unsigned_type writebatchsize; 00049 00050 std::vector<int_type> free_write_blocks; // contains free write blocks 00051 std::vector<int_type> busy_write_blocks; // blocks that are in writing, notice that if block is not in free_ 00052 // an not in busy then block is not yet filled 00053 00054 struct batch_entry 00055 { 00056 stxxl::int64 offset; 00057 int_type ibuffer; 00058 batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { } 00059 }; 00060 struct batch_entry_cmp 00061 { 00062 bool operator () (const batch_entry & a, const batch_entry & b) const 00063 { 00064 return (a.offset > b.offset); 00065 } 00066 }; 00067 00068 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type; 00069 batch_type batch_write_blocks; // sorted sequence of blocks to write 00070 00071 public: 00072 //! \brief Constructs an object 00073 //! \param write_buf_size number of write buffers to use 00074 //! \param write_batch_size number of blocks to accumulate in 00075 //! order to flush write requests (bulk buffered writing) 00076 buffered_writer(unsigned_type write_buf_size, unsigned_type write_batch_size) : 00077 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2), 00078 writebatchsize(write_batch_size ? write_batch_size : 1) 00079 { 00080 write_buffers = new block_type[nwriteblocks]; 00081 write_reqs = new request_ptr[nwriteblocks]; 00082 write_bids = new bid_type[nwriteblocks]; 00083 00084 for (unsigned_type i = 0; i < nwriteblocks; i++) 00085 free_write_blocks.push_back(i); 00086 00087 disk_queues::get_instance()->set_priority_op(request_queue::WRITE); 00088 } 00089 //! \brief Returns free block from the internal buffer pool 00090 //! \return pointer to the block from the internal buffer pool 00091 block_type * get_free_block() 00092 { 00093 int_type ibuffer; 00094 for (std::vector<int_type>::iterator it = busy_write_blocks.begin(); 00095 it != busy_write_blocks.end(); ++it) 00096 { 00097 if (write_reqs[ibuffer = (*it)]->poll()) 00098 { 00099 busy_write_blocks.erase(it); 00100 free_write_blocks.push_back(ibuffer); 00101 00102 break; 00103 } 00104 } 00105 if (UNLIKELY(free_write_blocks.empty())) 00106 { 00107 int_type size = busy_write_blocks.size(); 00108 request_ptr * reqs = new request_ptr[size]; 00109 int_type i = 0; 00110 for ( ; i < size; ++i) 00111 { 00112 reqs[i] = write_reqs[busy_write_blocks[i]]; 00113 } 00114 int_type completed = wait_any(reqs, size); 00115 int_type completed_global = busy_write_blocks[completed]; 00116 delete[] reqs; 00117 busy_write_blocks.erase(busy_write_blocks.begin() + completed); 00118 00119 return (write_buffers + completed_global); 00120 } 00121 ibuffer = free_write_blocks.back(); 00122 free_write_blocks.pop_back(); 00123 00124 return (write_buffers + ibuffer); 00125 } 00126 //! \brief Submits block for writing 00127 //! \param filled_block pointer to the block 00128 //! \remark parameter \c filled_block must be value returned by \c get_free_block() or \c write() methods 00129 //! \param bid block identifier, a place to write data of the \c filled_block 00130 //! \return pointer to the new free block from the pool 00131 block_type * write(block_type * filled_block, const bid_type & bid) // writes filled_block and returns a new block 00132 { 00133 if (batch_write_blocks.size() >= writebatchsize) 00134 { 00135 // flush batch 00136 while (!batch_write_blocks.empty()) 00137 { 00138 int_type ibuffer = batch_write_blocks.top().ibuffer; 00139 batch_write_blocks.pop(); 00140 00141 if (write_reqs[ibuffer].valid()) 00142 write_reqs[ibuffer]->wait(); 00143 00144 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]); 00145 00146 busy_write_blocks.push_back(ibuffer); 00147 } 00148 } 00149 // STXXL_MSG("Adding write request to batch"); 00150 00151 int_type ibuffer = filled_block - write_buffers; 00152 write_bids[ibuffer] = bid; 00153 batch_write_blocks.push(batch_entry(bid.offset, ibuffer)); 00154 00155 return get_free_block(); 00156 } 00157 //! \brief Flushes not yet written buffers 00158 void flush() 00159 { 00160 int_type ibuffer; 00161 while (!batch_write_blocks.empty()) 00162 { 00163 ibuffer = batch_write_blocks.top().ibuffer; 00164 batch_write_blocks.pop(); 00165 00166 if (write_reqs[ibuffer].valid()) 00167 write_reqs[ibuffer]->wait(); 00168 00169 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]); 00170 00171 busy_write_blocks.push_back(ibuffer); 00172 } 00173 for (std::vector<int_type>::const_iterator it = 00174 busy_write_blocks.begin(); 00175 it != busy_write_blocks.end(); it++) 00176 { 00177 ibuffer = *it; 00178 write_reqs[ibuffer]->wait(); 00179 } 00180 00181 assert(batch_write_blocks.empty()); 00182 free_write_blocks.clear(); 00183 busy_write_blocks.clear(); 00184 00185 for (unsigned_type i = 0; i < nwriteblocks; i++) 00186 free_write_blocks.push_back(i); 00187 } 00188 00189 //! \brief Flushes not yet written buffers and frees used memory 00190 virtual ~buffered_writer() 00191 { 00192 int_type ibuffer; 00193 while (!batch_write_blocks.empty()) 00194 { 00195 ibuffer = batch_write_blocks.top().ibuffer; 00196 batch_write_blocks.pop(); 00197 00198 if (write_reqs[ibuffer].valid()) 00199 write_reqs[ibuffer]->wait(); 00200 00201 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]); 00202 00203 busy_write_blocks.push_back(ibuffer); 00204 } 00205 for (std::vector<int_type>::const_iterator it = 00206 busy_write_blocks.begin(); 00207 it != busy_write_blocks.end(); it++) 00208 { 00209 ibuffer = *it; 00210 write_reqs[ibuffer]->wait(); 00211 } 00212 00213 delete[] write_reqs; 00214 delete[] write_buffers; 00215 delete[] write_bids; 00216 } 00217 }; 00218 00219 //! \} 00220 00221 __STXXL_END_NAMESPACE 00222 00223 #endif // !STXXL_BUFFERED_WRITER_HEADER