Stxxl  1.4.0
io/wbtl_file.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  io/wbtl_file.cpp
00003  *
00004  *  a write-buffered-translation-layer pseudo file
00005  *
00006  *  Part of the STXXL. See http://stxxl.sourceforge.net
00007  *
00008  *  Copyright (C) 2008-2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00009  *  Copyright (C) 2009 Johannes Singler <singler@ira.uka.de>
00010  *
00011  *  Distributed under the Boost Software License, Version 1.0.
00012  *  (See accompanying file LICENSE_1_0.txt or copy at
00013  *  http://www.boost.org/LICENSE_1_0.txt)
00014  **************************************************************************/
00015 
00016 #include <stxxl/bits/io/wbtl_file.h>
00017 
00018 #if STXXL_HAVE_WBTL_FILE
00019 
00020 #include <algorithm>
00021 #include <iomanip>
00022 #include <stxxl/bits/io/io.h>
00023 #include <stxxl/bits/parallel.h>
00024 #include <stxxl/aligned_alloc>
00025 
00026 #ifndef STXXL_VERBOSE_WBTL
00027 #define STXXL_VERBOSE_WBTL STXXL_VERBOSE2
00028 #endif
00029 
00030 
00031 __STXXL_BEGIN_NAMESPACE
00032 
00033 
00034 wbtl_file::wbtl_file(
00035     file * backend_file,
00036     size_type write_buffer_size,
00037     int write_buffers,
00038     int queue_id, int allocator_id) :
00039     disk_queued_file(queue_id, allocator_id), storage(backend_file), sz(0), write_block_size(write_buffer_size),
00040     free_bytes(0), curbuf(1), curpos(write_block_size)
00041 {
00042     assert(write_buffers == 2); // currently hardcoded
00043     write_buffer[0] = static_cast<char *>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size));
00044     write_buffer[1] = static_cast<char *>(stxxl::aligned_alloc<BLOCK_ALIGN>(write_block_size));
00045     buffer_address[0] = offset_type(-1);
00046     buffer_address[1] = offset_type(-1);
00047 }
00048 
00049 wbtl_file::~wbtl_file()
00050 {
00051     stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[1]);
00052     stxxl::aligned_dealloc<BLOCK_ALIGN>(write_buffer[0]);
00053     delete storage;
00054     storage = 0;
00055 }
00056 
00057 void wbtl_file::serve(const request * req) throw (io_error)
00058 {
00059     assert(req->get_file() == this);
00060     offset_type offset = req->get_offset();
00061     void * buffer = req->get_buffer();
00062     size_type bytes = req->get_size();
00063     request::request_type type = req->get_type();
00064 
00065     if (type == request::READ)
00066     {
00067         //stats::scoped_read_timer read_timer(size());
00068         sread(buffer, offset, bytes);
00069     }
00070     else
00071     {
00072         //stats::scoped_write_timer write_timer(size());
00073         swrite(buffer, offset, bytes);
00074     }
00075 }
00076 
00077 void wbtl_file::lock()
00078 {
00079     storage->lock();
00080 }
00081 
00082 wbtl_file::offset_type wbtl_file::size()
00083 {
00084     return sz;
00085 }
00086 
00087 void wbtl_file::set_size(offset_type newsize)
00088 {
00089     scoped_mutex_lock mapping_lock(mapping_mutex);
00090     assert(sz <= newsize); // may not shrink
00091     if (sz < newsize) {
00092         _add_free_region(sz, newsize - sz);
00093         storage->set_size(newsize);
00094         sz = newsize;
00095         assert(sz == storage->size());
00096     }
00097 }
00098 
00099 #define FMT_A_S(_addr_, _size_) "0x" << std::hex << std::setfill('0') << std::setw(8) << (_addr_) << "/0x" << std::setw(8) << (_size_)
00100 #define FMT_A_C(_addr_, _size_) "0x" << std::setw(8) << (_addr_) << "(" << std::dec << (_size_) << ")"
00101 #define FMT_A(_addr_) "0x" << std::setw(8) << (_addr_)
00102 
00103 // logical address
00104 void wbtl_file::discard(offset_type offset, offset_type size)
00105 {
00106     scoped_mutex_lock mapping_lock(mapping_mutex);
00107     sortseq::iterator physical = address_mapping.find(offset);
00108     STXXL_VERBOSE_WBTL("wbtl:discard l" << FMT_A_S(offset, size) << " @    p" << FMT_A(physical != address_mapping.end() ? physical->second : 0xffffffff));
00109     if (physical == address_mapping.end()) {
00110         // could be OK if the block was never written ...
00111         //STXXL_ERRMSG("discard: mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???");
00112     } else {
00113         offset_type physical_offset = physical->second;
00114         address_mapping.erase(physical);
00115         _add_free_region(physical_offset, size);
00116         place_map::iterator reverse = reverse_mapping.find(physical_offset);
00117         if (reverse == reverse_mapping.end()) {
00118             STXXL_ERRMSG("discard: reverse mapping not found: " << FMT_A_S(offset, size) << " ==> " << "???");
00119         } else {
00120             assert(offset == (reverse->second).first);
00121             reverse_mapping.erase(reverse);
00122         }
00123         storage->discard(physical_offset, size);
00124     }
00125 }
00126 
00127 // physical address
00128 void wbtl_file::_add_free_region(offset_type offset, offset_type size)
00129 {
00130     // mapping_lock has to be aquired by caller
00131     STXXL_VERBOSE_WBTL("wbtl:addfre  p" << FMT_A_S(offset, size) << " F <= f" << FMT_A_C(free_bytes, free_space.size()));
00132     offset_type region_pos = offset;
00133     offset_type region_size = size;
00134     if (!free_space.empty())
00135     {
00136         sortseq::iterator succ = free_space.upper_bound(region_pos);
00137         sortseq::iterator pred = succ;
00138         pred--;
00139         check_corruption(region_pos, region_size, pred, succ);
00140         if (succ == free_space.end())
00141         {
00142             if (pred == free_space.end())
00143             {
00144                 //dump();
00145                 assert(pred != free_space.end());
00146             }
00147             if ((*pred).first + (*pred).second == region_pos)
00148             {
00149                 // coalesce with predecessor
00150                 region_size += (*pred).second;
00151                 region_pos = (*pred).first;
00152                 free_space.erase(pred);
00153             }
00154         }
00155         else
00156         {
00157             if (free_space.size() > 1)
00158             {
00159                 bool succ_is_not_the_first = (succ != free_space.begin());
00160                 if ((*succ).first == region_pos + region_size)
00161                 {
00162                     // coalesce with successor
00163                     region_size += (*succ).second;
00164                     free_space.erase(succ);
00165                 }
00166                 if (succ_is_not_the_first)
00167                 {
00168                     if (pred == free_space.end())
00169                     {
00170                         //dump();
00171                         assert(pred != free_space.end());
00172                     }
00173                     if ((*pred).first + (*pred).second == region_pos)
00174                     {
00175                         // coalesce with predecessor
00176                         region_size += (*pred).second;
00177                         region_pos = (*pred).first;
00178                         free_space.erase(pred);
00179                     }
00180                 }
00181             }
00182             else
00183             {
00184                 if ((*succ).first == region_pos + region_size)
00185                 {
00186                     // coalesce with successor
00187                     region_size += (*succ).second;
00188                     free_space.erase(succ);
00189                 }
00190             }
00191         }
00192     }
00193 
00194     free_space[region_pos] = region_size;
00195     free_bytes += size;
00196     STXXL_VERBOSE_WBTL("wbtl:free    p" << FMT_A_S(region_pos, region_size) << " F => f" << FMT_A_C(free_bytes, free_space.size()));
00197 }
00198 
00199 void wbtl_file::sread(void * buffer, offset_type offset, size_type bytes)
00200 {
00201     scoped_mutex_lock buffer_lock(buffer_mutex);
00202     int cached = -1;
00203     offset_type physical_offset;
00204     // map logical to physical address
00205     {
00206         scoped_mutex_lock mapping_lock(mapping_mutex);
00207         sortseq::iterator physical = address_mapping.find(offset);
00208         if (physical == address_mapping.end()) {
00209             STXXL_ERRMSG("wbtl_read: mapping not found: " << FMT_A_S(offset, bytes) << " ==> " << "???");
00210             //STXXL_THROW2(io_error, "wbtl_read of unmapped memory");
00211             physical_offset = 0xffffffff;
00212         } else {
00213             physical_offset = physical->second;
00214         }
00215     }
00216 
00217     if (buffer_address[curbuf] <= physical_offset &&
00218         physical_offset < buffer_address[curbuf] + write_block_size)
00219     {
00220         // block is in current write buffer
00221         assert(physical_offset + bytes <= buffer_address[curbuf] + write_block_size);
00222         memcpy(buffer, write_buffer[curbuf] + (physical_offset - buffer_address[curbuf]), bytes);
00223         stats::get_instance()->read_cached(bytes);
00224         cached = curbuf;
00225     }
00226     else if (buffer_address[1 - curbuf] <= physical_offset &&
00227              physical_offset < buffer_address[1 - curbuf] + write_block_size)
00228     {
00229         // block is in previous write buffer
00230         assert(physical_offset + bytes <= buffer_address[1 - curbuf] + write_block_size);
00231         memcpy(buffer, write_buffer[1 - curbuf] + (physical_offset - buffer_address[1 - curbuf]), bytes);
00232         stats::get_instance()->read_cached(bytes);
00233         cached = curbuf;
00234     }
00235     else if (physical_offset == 0xffffffff) {
00236         // block was deleted or never written before
00237         char * uninitialized = (char *)malloc(sizeof(char));
00238         memset(buffer, *uninitialized, bytes);
00239         free(uninitialized);
00240     }
00241     else
00242     {
00243         // block is not cached
00244         request_ptr req = storage->aread(buffer, physical_offset, bytes, default_completion_handler());
00245         req->wait(false);
00246     }
00247     STXXL_VERBOSE_WBTL("wbtl:sread   l" << FMT_A_S(offset, bytes) << " @    p" << FMT_A(physical_offset) << " " << std::dec << cached);
00248     STXXL_UNUSED(cached);
00249 }
00250 
00251 void wbtl_file::swrite(void * buffer, offset_type offset, size_type bytes)
00252 {
00253     scoped_mutex_lock buffer_lock(buffer_mutex);
00254     // is the block already mapped?
00255     {
00256         scoped_mutex_lock mapping_lock(mapping_mutex);
00257         sortseq::iterator physical = address_mapping.find(offset);
00258         STXXL_VERBOSE_WBTL("wbtl:swrite  l" << FMT_A_S(offset, bytes) << " @ <= p" <<
00259                            FMT_A_C(physical != address_mapping.end() ? physical->second : 0xffffffff, address_mapping.size()));
00260         if (physical != address_mapping.end()) {
00261             mapping_lock.unlock();
00262             // FIXME: special case if we can replace it in the current writing block
00263             discard(offset, bytes);
00264         }
00265     }
00266 
00267     if (bytes > write_block_size - curpos)
00268     {
00269         // not enough space in the current write buffer
00270 
00271         if (buffer_address[curbuf] != offset_type(-1)) {
00272             STXXL_VERBOSE_WBTL("wbtl:w2disk  p" << FMT_A_S(buffer_address[curbuf], write_block_size));
00273 
00274             // mark remaining part as free
00275             if (curpos < write_block_size)
00276                 _add_free_region(buffer_address[curbuf] + curpos, write_block_size - curpos);
00277 
00278             if (backend_request.get()) {
00279                 backend_request->wait(false);
00280             }
00281 
00282             backend_request = storage->awrite(write_buffer[curbuf], buffer_address[curbuf], write_block_size, default_completion_handler());
00283         }
00284 
00285         curbuf = 1 - curbuf;
00286 
00287         buffer_address[curbuf] = get_next_write_block();
00288         curpos = 0;
00289     }
00290     assert(bytes <= write_block_size - curpos);
00291 
00292     // write block into buffer
00293     memcpy(write_buffer[curbuf] + curpos, buffer, bytes);
00294     stats::get_instance()->write_cached(bytes);
00295 
00296     scoped_mutex_lock mapping_lock(mapping_mutex);
00297     address_mapping[offset] = buffer_address[curbuf] + curpos;
00298     reverse_mapping[buffer_address[curbuf] + curpos] = place(offset, bytes);
00299     STXXL_VERBOSE_WBTL("wbtl:swrite  l" << FMT_A_S(offset, bytes) << " @ => p" << FMT_A_C(buffer_address[curbuf] + curpos, address_mapping.size()));
00300     curpos += bytes;
00301 }
00302 
00303 wbtl_file::offset_type wbtl_file::get_next_write_block()
00304 {
00305     // mapping_lock has to be aquired by caller
00306     sortseq::iterator space =
00307         std::find_if(free_space.begin(), free_space.end(),
00308                      bind2nd(FirstFit(), write_block_size) _STXXL_FORCE_SEQUENTIAL);
00309 
00310     if (space != free_space.end())
00311     {
00312         offset_type region_pos = (*space).first;
00313         offset_type region_size = (*space).second;
00314         free_space.erase(space);
00315         if (region_size > write_block_size)
00316             free_space[region_pos + write_block_size] = region_size - write_block_size;
00317 
00318         free_bytes -= write_block_size;
00319 
00320         STXXL_VERBOSE_WBTL("wbtl:nextwb  p" << FMT_A_S(region_pos, write_block_size) << " F    f" << FMT_A_C(free_bytes, free_space.size()));
00321         return region_pos;
00322     }
00323 
00324     STXXL_THROW2(io_error, "OutOfSpace, probably fragmented");
00325 
00326     return offset_type(-1);
00327 }
00328 
00329 void wbtl_file::check_corruption(offset_type region_pos, offset_type region_size,
00330                                  sortseq::iterator pred, sortseq::iterator succ)
00331 {
00332     if (pred != free_space.end())
00333     {
00334         if (pred->first <= region_pos && pred->first + pred->second > region_pos)
00335         {
00336             STXXL_THROW(bad_ext_alloc, "wbtl_file::check_corruption", "Error: double deallocation of external memory " <<
00337                         "System info: P " << pred->first << " " << pred->second << " " << region_pos);
00338         }
00339     }
00340     if (succ != free_space.end())
00341     {
00342         if (region_pos <= succ->first && region_pos + region_size > succ->first)
00343         {
00344             STXXL_THROW(bad_ext_alloc, "wbtl_file::check_corruption", "Error: double deallocation of external memory "
00345                         << "System info: S " << region_pos << " " << region_size << " " << succ->first);
00346         }
00347     }
00348 }
00349 
00350 const char * wbtl_file::io_type() const
00351 {
00352     return "wbtl";
00353 }
00354 
00355 __STXXL_END_NAMESPACE
00356 
00357 #endif  // #if STXXL_HAVE_WBTL_FILE
00358 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines