Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * io/wbtl_file.cpp 00003 * 00004 * a write-buffered-translation-layer pseudo file 00005 * 00006 * Part of the STXXL. See http://stxxl.sourceforge.net 00007 * 00008 * Copyright (C) 2008-2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * Copyright (C) 2009 Johannes Singler <singler@ira.uka.de> 00010 * 00011 * Distributed under the Boost Software License, Version 1.0. 00012 * (See accompanying file LICENSE_1_0.txt or copy at 00013 * http://www.boost.org/LICENSE_1_0.txt) 00014 **************************************************************************/ 00015 00016 #include <stxxl/bits/io/wbtl_file.h> 00017 00018 #if STXXL_HAVE_WBTL_FILE 00019 00020 #include <algorithm> 00021 #include <iomanip> 00022 #include <stxxl/bits/io/io.h> 00023 #include <stxxl/bits/parallel.h> 00024 #include <stxxl/aligned_alloc> 00025 00026 #ifndef STXXL_VERBOSE_WBTL 00027 #define STXXL_VERBOSE_WBTL STXXL_VERBOSE2 00028 #endif 00029 00030 00031 __STXXL_BEGIN_NAMESPACE 00032 00033 00034 wbtl_file::wbtl_file( 00035 file * backend_file, 00036 size_type write_buffer_size, 00037 int write_buffers, 00038 int queue_id, int allocator_id) : 00039 disk_queued_file(queue_id, allocator_id), storage(backend_file), sz(0), write_block_size(write_buffer_size), 00040 free_bytes(0), curbuf(1), curpos(write_block_size) 00041 { 00042 assert(write_buffers == 2); // currently hardcoded 00043 write_buffer[0] = static_cast<char *>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size)); 00044 write_buffer[1] = static_cast<char *>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size)); 00045 buffer_address[0] = offset_type(-1); 00046 buffer_address[1] = offset_type(-1); 00047 } 00048 00049 wbtl_file::~wbtl_file() 00050 { 00051 stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[1]); 00052 stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[0]); 00053 delete storage; 00054 storage = 0; 00055 } 00056 00057 void wbtl_file::serve(const request * req) throw (io_error) 00058 { 00059 assert(req->get_file() == this); 00060 offset_type offset = req->get_offset(); 00061 void * buffer = req->get_buffer(); 00062 size_type bytes = req->get_size(); 00063 request::request_type type = req->get_type(); 00064 00065 if (type == request::READ) 00066 { 00067 //stats::scoped_read_timer read_timer(size()); 00068 sread(buffer, offset, bytes); 00069 } 00070 else 00071 { 00072 //stats::scoped_write_timer write_timer(size()); 00073 swrite(buffer, offset, bytes); 00074 } 00075 } 00076 00077 void wbtl_file::lock() 00078 { 00079 storage->lock(); 00080 } 00081 00082 wbtl_file::offset_type wbtl_file::size() 00083 { 00084 return sz; 00085 } 00086 00087 void wbtl_file::set_size(offset_type newsize) 00088 { 00089 scoped_mutex_lock mapping_lock(mapping_mutex); 00090 assert(sz <= newsize); // may not shrink 00091 if (sz < newsize) { 00092 _add_free_region(sz, newsize - sz); 00093 storage->set_size(newsize); 00094 sz = newsize; 00095 assert(sz == storage->size()); 00096 } 00097 } 00098 00099 #define FMT_A_S(_addr_, _size_) "0x" << std::hex << std::setfill('0') << std::setw(8) << (_addr_) << "/0x" << std::setw(8) << (_size_) 00100 #define FMT_A_C(_addr_, _size_) "0x" << std::setw(8) << (_addr_) << "(" << std::dec << (_size_) << ")" 00101 #define FMT_A(_addr_) "0x" << std::setw(8) << (_addr_) 00102 00103 // logical address 00104 void wbtl_file::discard(offset_type offset, offset_type size) 00105 { 00106 scoped_mutex_lock mapping_lock(mapping_mutex); 00107 sortseq::iterator physical = address_mapping.find(offset); 00108 STXXL_VERBOSE_WBTL("wbtl:discard l" << FMT_A_S(offset, size) << " @ p" << FMT_A(physical != address_mapping.end() ? physical->second : 0xffffffff)); 00109 if (physical == address_mapping.end()) { 00110 // could be OK if the block was never written ... 00111 //STXXL_ERRMSG("discard: mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???"); 00112 } else { 00113 offset_type physical_offset = physical->second; 00114 address_mapping.erase(physical); 00115 _add_free_region(physical_offset, size); 00116 place_map::iterator reverse = reverse_mapping.find(physical_offset); 00117 if (reverse == reverse_mapping.end()) { 00118 STXXL_ERRMSG("discard: reverse mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???"); 00119 } else { 00120 assert(offset == (reverse->second).first); 00121 reverse_mapping.erase(reverse); 00122 } 00123 storage->discard(physical_offset, size); 00124 } 00125 } 00126 00127 // physical address 00128 void wbtl_file::_add_free_region(offset_type offset, offset_type size) 00129 { 00130 // mapping_lock has to be aquired by caller 00131 STXXL_VERBOSE_WBTL("wbtl:addfre p" << FMT_A_S(offset, size) << " F <= f" << FMT_A_C(free_bytes, free_space.size())); 00132 offset_type region_pos = offset; 00133 offset_type region_size = size; 00134 if (!free_space.empty()) 00135 { 00136 sortseq::iterator succ = free_space.upper_bound(region_pos); 00137 sortseq::iterator pred = succ; 00138 pred--; 00139 check_corruption(region_pos, region_size, pred, succ); 00140 if (succ == free_space.end()) 00141 { 00142 if (pred == free_space.end()) 00143 { 00144 //dump(); 00145 assert(pred != free_space.end()); 00146 } 00147 if ((*pred).first + (*pred).second == region_pos) 00148 { 00149 // coalesce with predecessor 00150 region_size += (*pred).second; 00151 region_pos = (*pred).first; 00152 free_space.erase(pred); 00153 } 00154 } 00155 else 00156 { 00157 if (free_space.size() > 1) 00158 { 00159 bool succ_is_not_the_first = (succ != free_space.begin()); 00160 if ((*succ).first == region_pos + region_size) 00161 { 00162 // coalesce with successor 00163 region_size += (*succ).second; 00164 free_space.erase(succ); 00165 } 00166 if (succ_is_not_the_first) 00167 { 00168 if (pred == free_space.end()) 00169 { 00170 //dump(); 00171 assert(pred != free_space.end()); 00172 } 00173 if ((*pred).first + (*pred).second == region_pos) 00174 { 00175 // coalesce with predecessor 00176 region_size += (*pred).second; 00177 region_pos = (*pred).first; 00178 free_space.erase(pred); 00179 } 00180 } 00181 } 00182 else 00183 { 00184 if ((*succ).first == region_pos + region_size) 00185 { 00186 // coalesce with successor 00187 region_size += (*succ).second; 00188 free_space.erase(succ); 00189 } 00190 } 00191 } 00192 } 00193 00194 free_space[region_pos] = region_size; 00195 free_bytes += size; 00196 STXXL_VERBOSE_WBTL("wbtl:free p" << FMT_A_S(region_pos, region_size) << " F => f" << FMT_A_C(free_bytes, free_space.size())); 00197 } 00198 00199 void wbtl_file::sread(void * buffer, offset_type offset, size_type bytes) 00200 { 00201 scoped_mutex_lock buffer_lock(buffer_mutex); 00202 int cached = -1; 00203 offset_type physical_offset; 00204 // map logical to physical address 00205 { 00206 scoped_mutex_lock mapping_lock(mapping_mutex); 00207 sortseq::iterator physical = address_mapping.find(offset); 00208 if (physical == address_mapping.end()) { 00209 STXXL_ERRMSG("wbtl_read: mapping not found: " << FMT_A_S(offset, bytes) << " ==> " << "???"); 00210 //STXXL_THROW2(io_error, "wbtl_read of unmapped memory"); 00211 physical_offset = 0xffffffff; 00212 } else { 00213 physical_offset = physical->second; 00214 } 00215 } 00216 00217 if (buffer_address[curbuf] <= physical_offset && 00218 physical_offset < buffer_address[curbuf] + write_block_size) 00219 { 00220 // block is in current write buffer 00221 assert(physical_offset + bytes <= buffer_address[curbuf] + write_block_size); 00222 memcpy(buffer, write_buffer[curbuf] + (physical_offset - buffer_address[curbuf]), bytes); 00223 stats::get_instance()->read_cached(bytes); 00224 cached = curbuf; 00225 } 00226 else if (buffer_address[1 - curbuf] <= physical_offset && 00227 physical_offset < buffer_address[1 - curbuf] + write_block_size) 00228 { 00229 // block is in previous write buffer 00230 assert(physical_offset + bytes <= buffer_address[1 - curbuf] + write_block_size); 00231 memcpy(buffer, write_buffer[1 - curbuf] + (physical_offset - buffer_address[1 - curbuf]), bytes); 00232 stats::get_instance()->read_cached(bytes); 00233 cached = curbuf; 00234 } 00235 else if (physical_offset == 0xffffffff) { 00236 // block was deleted or never written before 00237 char * uninitialized = (char *)malloc(sizeof(char)); 00238 memset(buffer, *uninitialized, bytes); 00239 free(uninitialized); 00240 } 00241 else 00242 { 00243 // block is not cached 00244 request_ptr req = storage->aread(buffer, physical_offset, bytes, default_completion_handler()); 00245 req->wait(false); 00246 } 00247 STXXL_VERBOSE_WBTL("wbtl:sread l" << FMT_A_S(offset, bytes) << " @ p" << FMT_A(physical_offset) << " " << std::dec << cached); 00248 STXXL_UNUSED(cached); 00249 } 00250 00251 void wbtl_file::swrite(void * buffer, offset_type offset, size_type bytes) 00252 { 00253 scoped_mutex_lock buffer_lock(buffer_mutex); 00254 // is the block already mapped? 00255 { 00256 scoped_mutex_lock mapping_lock(mapping_mutex); 00257 sortseq::iterator physical = address_mapping.find(offset); 00258 STXXL_VERBOSE_WBTL("wbtl:swrite l" << FMT_A_S(offset, bytes) << " @ <= p" << 00259 FMT_A_C(physical != address_mapping.end() ? physical->second : 0xffffffff, address_mapping.size())); 00260 if (physical != address_mapping.end()) { 00261 mapping_lock.unlock(); 00262 // FIXME: special case if we can replace it in the current writing block 00263 discard(offset, bytes); 00264 } 00265 } 00266 00267 if (bytes > write_block_size - curpos) 00268 { 00269 // not enough space in the current write buffer 00270 00271 if (buffer_address[curbuf] != offset_type(-1)) { 00272 STXXL_VERBOSE_WBTL("wbtl:w2disk p" << FMT_A_S(buffer_address[curbuf], write_block_size)); 00273 00274 // mark remaining part as free 00275 if (curpos < write_block_size) 00276 _add_free_region(buffer_address[curbuf] + curpos, write_block_size - curpos); 00277 00278 if (backend_request.get()) { 00279 backend_request->wait(false); 00280 } 00281 00282 backend_request = storage->awrite(write_buffer[curbuf], buffer_address[curbuf], write_block_size, default_completion_handler()); 00283 } 00284 00285 curbuf = 1 - curbuf; 00286 00287 buffer_address[curbuf] = get_next_write_block(); 00288 curpos = 0; 00289 } 00290 assert(bytes <= write_block_size - curpos); 00291 00292 // write block into buffer 00293 memcpy(write_buffer[curbuf] + curpos, buffer, bytes); 00294 stats::get_instance()->write_cached(bytes); 00295 00296 scoped_mutex_lock mapping_lock(mapping_mutex); 00297 address_mapping[offset] = buffer_address[curbuf] + curpos; 00298 reverse_mapping[buffer_address[curbuf] + curpos] = place(offset, bytes); 00299 STXXL_VERBOSE_WBTL("wbtl:swrite l" << FMT_A_S(offset, bytes) << " @ => p" << FMT_A_C(buffer_address[curbuf] + curpos, address_mapping.size())); 00300 curpos += bytes; 00301 } 00302 00303 wbtl_file::offset_type wbtl_file::get_next_write_block() 00304 { 00305 // mapping_lock has to be aquired by caller 00306 sortseq::iterator space = 00307 std::find_if(free_space.begin(), free_space.end(), 00308 bind2nd(FirstFit(), write_block_size) _STXXL_FORCE_SEQUENTIAL); 00309 00310 if (space != free_space.end()) 00311 { 00312 offset_type region_pos = (*space).first; 00313 offset_type region_size = (*space).second; 00314 free_space.erase(space); 00315 if (region_size > write_block_size) 00316 free_space[region_pos + write_block_size] = region_size - write_block_size; 00317 00318 free_bytes -= write_block_size; 00319 00320 STXXL_VERBOSE_WBTL("wbtl:nextwb p" << FMT_A_S(region_pos, write_block_size) << " F f" << FMT_A_C(free_bytes, free_space.size())); 00321 return region_pos; 00322 } 00323 00324 STXXL_THROW2(io_error, "OutOfSpace, probably fragmented"); 00325 00326 return offset_type(-1); 00327 } 00328 00329 void wbtl_file::check_corruption(offset_type region_pos, offset_type region_size, 00330 sortseq::iterator pred, sortseq::iterator succ) 00331 { 00332 if (pred != free_space.end()) 00333 { 00334 if (pred->first <= region_pos && pred->first + pred->second > region_pos) 00335 { 00336 STXXL_THROW(bad_ext_alloc, "wbtl_file::check_corruption", "Error: double deallocation of external memory " << 00337 "System info: P " << pred->first << " " << pred->second << " " << region_pos); 00338 } 00339 } 00340 if (succ != free_space.end()) 00341 { 00342 if (region_pos <= succ->first && region_pos + region_size > succ->first) 00343 { 00344 STXXL_THROW(bad_ext_alloc, "wbtl_file::check_corruption", "Error: double deallocation of external memory " 00345 << "System info: S " << region_pos << " " << region_size << " " << succ->first); 00346 } 00347 } 00348 } 00349 00350 const char * wbtl_file::io_type() const 00351 { 00352 return "wbtl"; 00353 } 00354 00355 __STXXL_END_NAMESPACE 00356 00357 #endif // #if STXXL_HAVE_WBTL_FILE 00358 // vim: et:ts=4:sw=4