/***************************************************************************
 *  include/stxxl/bits/algo/stable_ksort.h
 *
 *  Part of the STXXL. See http://stxxl.sourceforge.net
 *
 *  Copyright (C) 2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
 *  Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
 *
 *  Distributed under the Boost Software License, Version 1.0.
 *  (See accompanying file LICENSE_1_0.txt or copy at
 *  http://www.boost.org/LICENSE_1_0.txt)
 **************************************************************************/

#ifndef STXXL_STABLE_KSORT_HEADER
#define STXXL_STABLE_KSORT_HEADER

// This is a first attempt: a distribution sort without sampling.
// stable_ksort will be reworked when time permits.


#include <stxxl/bits/mng/mng.h>
#include <stxxl/bits/mng/buf_istream.h>
#include <stxxl/bits/mng/buf_ostream.h>
#include <stxxl/bits/common/simple_vector.h>
#include <stxxl/bits/algo/intksort.h>
#include <stxxl/bits/algo/sort_base.h>
#include <stxxl/bits/common/utils.h>


#ifndef STXXL_VERBOSE_STABLE_KSORT
#define STXXL_VERBOSE_STABLE_KSORT STXXL_VERBOSE1
#endif


__STXXL_BEGIN_NAMESPACE

//! \addtogroup stlalgo
//! \{

/*! \internal
 */
namespace stable_ksort_local
{
    template <class type_, class type_key>
    void classify_block(type_ * begin, type_ * end, type_key * & out, int_type * bucket, unsigned_type offset, unsigned shift)
    {
        for (type_ * p = begin; p < end; p++, out++)    // count & create references
        {
            out->ptr = p;
            typename type_::key_type key = p->key();
            int_type ibucket = (key - offset) >> shift;
            out->key = key;
            bucket[ibucket]++;
        }
    }

    template <typename type>
    struct type_key
    {
        typedef typename type::key_type key_type;
        key_type key;
        type * ptr;

        type_key() { }
        type_key(key_type k, type * p) : key(k), ptr(p)
        { }
    };

    template <typename type>
    bool operator < (const type_key<type> & a, const type_key<type> & b)
    {
        return a.key < b.key;
    }

    template <typename type>
    bool operator > (const type_key<type> & a, const type_key<type> & b)
    {
        return a.key > b.key;
    }

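    //! \internal
    //! A growable sequence of block identifiers (BIDs). init() allocates the
    //! initial blocks through the block_manager using the given allocation
    //! strategy; operator[] transparently enlarges the sequence (allocating
    //! further blocks) when an index past the current size is accessed; the
    //! destructor returns all blocks to the block_manager.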
    template <typename BIDType_, typename AllocStrategy_>
    class bid_sequence
    {
    public:
        typedef BIDType_ bid_type;
        typedef bid_type & reference;
        typedef AllocStrategy_ alloc_strategy;
        typedef typename simple_vector<bid_type>::size_type size_type;
        typedef typename simple_vector<bid_type>::iterator iterator;

    protected:
        simple_vector<bid_type> * bids;
        alloc_strategy alloc_strategy_;

    public:
        bid_sequence() { }
        bid_sequence(size_type size_)
        {
            bids = new simple_vector<bid_type>(size_);
            block_manager * mng = block_manager::get_instance();
            mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
        }
        void init(size_type size_)
        {
            bids = new simple_vector<bid_type>(size_);
            block_manager * mng = block_manager::get_instance();
            mng->new_blocks(alloc_strategy_, bids->begin(), bids->end());
        }
        reference operator [] (size_type i)
        {
            size_type size_ = size();    // cache size in a register
            if (i < size_)
                return *(bids->begin() + i);

            block_manager * mng = block_manager::get_instance();
            simple_vector<bid_type> * larger_bids = new simple_vector<bid_type>((i + 1) * 2);
            std::copy(bids->begin(), bids->end(), larger_bids->begin());
            mng->new_blocks(alloc_strategy_, larger_bids->begin() + size_, larger_bids->end());
            delete bids;
            bids = larger_bids;
            return *(larger_bids->begin() + i);
        }
        size_type size() { return bids->size(); }
        iterator begin() { return bids->begin(); }
        ~bid_sequence()
        {
            block_manager::get_instance()->delete_blocks(bids->begin(), bids->end());
            delete bids;
        }
    };

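    //! \internal
    //! Distribution phase: streams the records of [first, last) once and
    //! appends each one to the bucket selected by the top \c lognbuckets bits
    //! of its key. Full blocks are written through a buffered_writer to the
    //! BIDs in \c bucket_bids; on return \c bucket_sizes[i] holds the number
    //! of records placed in bucket \c i.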
    template <typename ExtIterator_>
    void distribute(
        bid_sequence<typename ExtIterator_::vector_type::block_type::bid_type,
                     typename ExtIterator_::vector_type::alloc_strategy_type> * bucket_bids,
        int64 * bucket_sizes,
        const int_type nbuckets,
        const int_type lognbuckets,
        ExtIterator_ first,
        ExtIterator_ last,
        const int_type nread_buffers,
        const int_type nwrite_buffers)
    {
        typedef typename ExtIterator_::vector_type::value_type value_type;
        typedef typename value_type::key_type key_type;
        typedef typename ExtIterator_::block_type block_type;
        typedef typename block_type::bid_type bid_type;
        typedef buf_istream<typename ExtIterator_::block_type,
                            typename ExtIterator_::bids_container_iterator> buf_istream_type;

        int_type i = 0;

        buf_istream_type in(first.bid(), last.bid() + ((first.block_offset()) ? 1 : 0),
                            nread_buffers);

        buffered_writer<block_type> out(
            nbuckets + nwrite_buffers,
            nwrite_buffers);

        unsigned_type * bucket_block_offsets = new unsigned_type[nbuckets];
        unsigned_type * bucket_iblock = new unsigned_type[nbuckets];
        block_type ** bucket_blocks = new block_type *[nbuckets];

        std::fill(bucket_sizes, bucket_sizes + nbuckets, 0);
        std::fill(bucket_iblock, bucket_iblock + nbuckets, 0);
        std::fill(bucket_block_offsets, bucket_block_offsets + nbuckets, 0);

        for (i = 0; i < nbuckets; i++)
            bucket_blocks[i] = out.get_free_block();


        ExtIterator_ cur = first - first.block_offset();

        // skip the part of the first block that lies before 'first'
        for ( ; cur != first; cur++)
            ++in;


        const int_type shift = sizeof(key_type) * 8 - lognbuckets;
        // search in the range [_begin,_end)
        STXXL_VERBOSE_STABLE_KSORT("Shift by: " << shift << " bits, lognbuckets: " << lognbuckets);
        for ( ; cur != last; cur++)
        {
            key_type cur_key = in.current().key();
            int_type ibucket = cur_key >> shift;

            int_type block_offset = bucket_block_offsets[ibucket];
            in >> (bucket_blocks[ibucket]->elem[block_offset++]);
            if (block_offset == block_type::size)
            {
                block_offset = 0;
                int_type iblock = bucket_iblock[ibucket]++;
                bucket_blocks[ibucket] = out.write(bucket_blocks[ibucket], bucket_bids[ibucket][iblock]);
            }
            bucket_block_offsets[ibucket] = block_offset;
        }
        for (i = 0; i < nbuckets; i++)
        {
            if (bucket_block_offsets[i])
            {
                out.write(bucket_blocks[i], bucket_bids[i][bucket_iblock[i]]);
            }
            bucket_sizes[i] = int64(block_type::size) * bucket_iblock[i] +
                              bucket_block_offsets[i];
            STXXL_VERBOSE_STABLE_KSORT("Bucket " << i << " has size " << bucket_sizes[i] <<
                                       ", estimated size: " << ((last - first) / int64(nbuckets)));
        }

        delete[] bucket_blocks;
        delete[] bucket_block_offsets;
        delete[] bucket_iblock;
    }
}

//! \brief Sort records with integer keys
//! \param first object of model of \c ext_random_access_iterator concept
//! \param last object of model of \c ext_random_access_iterator concept
//! \param M amount of memory for internal use (in bytes)
//! \remark Elements must provide a method key() which returns the integer key.
//! \remark Not yet fully implemented, it assumes that the keys are uniformly
//!         distributed between [0,(std::numeric_limits<key_type>::max)()].
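//!
//! A minimal usage sketch; the record type \c my_record and the memory budget
//! below are illustrative assumptions, not part of the library:
//! \code
//! struct my_record
//! {
//!     typedef stxxl::uint64 key_type;
//!     key_type m_key;
//!     char payload[24];
//!     key_type key() const { return m_key; }
//! };
//!
//! stxxl::vector<my_record> v;
//! // ... fill v ...
//! stxxl::stable_ksort(v.begin(), v.end(), 512 * 1024 * 1024);  // 512 MiB internal memory
//! \endcode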
template <typename ExtIterator_>
void stable_ksort(ExtIterator_ first, ExtIterator_ last, unsigned_type M)
{
    STXXL_MSG("Warning: stable_ksort is not yet fully implemented, it assumes that the keys are uniformly distributed between [0,(std::numeric_limits<key_type>::max)()]");
    typedef typename ExtIterator_::vector_type::value_type value_type;
    typedef typename value_type::key_type key_type;
    typedef typename ExtIterator_::block_type block_type;
    typedef typename block_type::bid_type bid_type;
    typedef typename ExtIterator_::vector_type::alloc_strategy_type alloc_strategy;
    typedef stable_ksort_local::bid_sequence<bid_type, alloc_strategy> bucket_bids_type;
    typedef stable_ksort_local::type_key<value_type> type_key_;

    first.flush();     // flush container

    double begin = timestamp();

    unsigned_type i = 0;
    config * cfg = config::get_instance();
    const unsigned_type m = M / block_type::raw_size;
    assert(2 * block_type::raw_size <= M);
    const unsigned_type write_buffers_multiple = 2;
    const unsigned_type read_buffers_multiple = 2;
    const unsigned_type ndisks = cfg->disks_number();
    const unsigned_type min_num_read_write_buffers = (write_buffers_multiple + read_buffers_multiple) * ndisks;
    const unsigned_type nmaxbuckets = m - min_num_read_write_buffers;
    const unsigned_type lognbuckets = log2_floor(nmaxbuckets);
    const unsigned_type nbuckets = 1 << lognbuckets;
    const unsigned_type est_bucket_size = div_ceil((last - first) / nbuckets, block_type::size);    // in blocks

    if (m < min_num_read_write_buffers + 2 || nbuckets < 2) {
        STXXL_ERRMSG("stxxl::stable_ksort: Not enough memory. Blocks available: " << m <<
                     ", required for r/w buffers: " << min_num_read_write_buffers <<
                     ", required for buckets: 2, nbuckets: " << nbuckets);
        throw bad_parameter("stxxl::stable_ksort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
    }
    STXXL_VERBOSE_STABLE_KSORT("Elements to sort: " << (last - first));
    STXXL_VERBOSE_STABLE_KSORT("Number of buckets has to be reduced from " << nmaxbuckets << " to " << nbuckets);
    const unsigned_type nread_buffers = (m - nbuckets) * read_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);
    const unsigned_type nwrite_buffers = (m - nbuckets) * write_buffers_multiple / (read_buffers_multiple + write_buffers_multiple);

    STXXL_VERBOSE_STABLE_KSORT("Read buffers in distribution phase: " << nread_buffers);
    STXXL_VERBOSE_STABLE_KSORT("Write buffers in distribution phase: " << nwrite_buffers);

    bucket_bids_type * bucket_bids = new bucket_bids_type[nbuckets];
    for (i = 0; i < nbuckets; ++i)
        bucket_bids[i].init(est_bucket_size);

    int64 * bucket_sizes = new int64[nbuckets];

    disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

    stable_ksort_local::distribute(
        bucket_bids,
        bucket_sizes,
        nbuckets,
        lognbuckets,
        first,
        last,
        nread_buffers,
        nwrite_buffers);

    double dist_end = timestamp(), end;
    double io_wait_after_d = stats::get_instance()->get_io_wait_time();

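    // Bucket sorting phase: each bucket produced by distribute() is read back
    // into internal memory, sorted by a two-level radix classification
    // (see log_k1 / log_k2 below) and written to the output in bucket order.
    // Two sets of blocks and requests (blocks1/blocks2, reqs1/reqs2) are used
    // so that reading the next bucket overlaps with sorting the current one.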
    {
        // sort buckets
        unsigned_type write_buffers_multiple_bs = 2;
        unsigned_type max_bucket_size_bl = (m - write_buffers_multiple_bs * ndisks) / 2;    // in number of blocks
        int64 max_bucket_size_rec = int64(max_bucket_size_bl) * block_type::size;           // in number of records
        int64 max_bucket_size_act = 0;                                                      // actual max bucket size
        // establish output stream

        for (i = 0; i < nbuckets; i++)
        {
            max_bucket_size_act = STXXL_MAX(bucket_sizes[i], max_bucket_size_act);
            if (bucket_sizes[i] > max_bucket_size_rec)
            {
                STXXL_ERRMSG("Bucket " << i << " is too large: " << bucket_sizes[i] <<
                             " records, maximum: " << max_bucket_size_rec);
                STXXL_ERRMSG("Recursion on buckets is not yet implemented, aborting.");
                abort();
            }
        }
        // here we can increase write_buffers_multiple_bs knowing max(bucket_sizes[i])
        // ... and decrease max_bucket_size_bl
        const int_type max_bucket_size_act_bl = div_ceil(max_bucket_size_act, block_type::size);
        STXXL_VERBOSE_STABLE_KSORT("Reducing required number of blocks per bucket from " <<
                                   max_bucket_size_bl << " to " << max_bucket_size_act_bl);
        max_bucket_size_rec = max_bucket_size_act;
        max_bucket_size_bl = max_bucket_size_act_bl;
        const unsigned_type nwrite_buffers_bs = m - 2 * max_bucket_size_bl;
        STXXL_VERBOSE_STABLE_KSORT("Write buffers in bucket sorting phase: " << nwrite_buffers_bs);

        typedef buf_ostream<block_type, typename ExtIterator_::bids_container_iterator> buf_ostream_type;
        buf_ostream_type out(first.bid(), nwrite_buffers_bs);

        disk_queues::get_instance()->set_priority_op(request_queue::READ);

        if (first.block_offset())
        {
            // has to skip part of the first block
            block_type * block = new block_type;
            request_ptr req;
            req = block->read(*first.bid());
            req->wait();

            for (i = 0; i < first.block_offset(); i++)
            {
                out << block->elem[i];
            }
            delete block;
        }
        block_type * blocks1 = new block_type[max_bucket_size_bl];
        block_type * blocks2 = new block_type[max_bucket_size_bl];
        request_ptr * reqs1 = new request_ptr[max_bucket_size_bl];
        request_ptr * reqs2 = new request_ptr[max_bucket_size_bl];
        type_key_ * refs1 = new type_key_[max_bucket_size_rec];
        type_key_ * refs2 = new type_key_[max_bucket_size_rec];

        // submit reading first 2 buckets (Peter's scheme)
        unsigned_type nbucket_blocks = div_ceil(bucket_sizes[0], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs1[i] = blocks1[i].read(bucket_bids[0][i]);


        nbucket_blocks = div_ceil(bucket_sizes[1], block_type::size);
        for (i = 0; i < nbucket_blocks; i++)
            reqs2[i] = blocks2[i].read(bucket_bids[1][i]);


        key_type offset = 0;
        const unsigned log_k1 = STXXL_MAX<unsigned>(log2_ceil(max_bucket_size_rec * sizeof(type_key_) / STXXL_L2_SIZE), 1);
        unsigned_type k1 = 1 << log_k1;
        int_type * bucket1 = new int_type[k1];

        const unsigned shift = sizeof(key_type) * 8 - lognbuckets;
        const unsigned shift1 = shift - log_k1;

        STXXL_VERBOSE_STABLE_KSORT("Classifying " << nbuckets << " buckets, max size:" << max_bucket_size_rec <<
                                   " block size:" << block_type::size << " log_k1:" << log_k1);

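        // Key layout used by the two classification levels (under the assumed
        // uniform key distribution): the top lognbuckets bits were consumed by
        // the distribution phase, the next log_k1 bits select one of the k1
        // internal buckets, and l1sort() refines each of them using log_k2
        // further bits. For example, with 64-bit keys and lognbuckets = 8,
        // shift = 56, so 'cur_key >> shift' in distribute() used the top
        // 8 bits, and shift1 = 56 - log_k1 extracts the bits that follow.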
        for (unsigned_type k = 0; k < nbuckets; k++)
        {
            nbucket_blocks = div_ceil(bucket_sizes[k], block_type::size);
            const unsigned log_k1_k = STXXL_MAX<unsigned>(log2_ceil(bucket_sizes[k] * sizeof(type_key_) / STXXL_L2_SIZE), 1);
            assert(log_k1_k <= log_k1);
            k1 = 1 << log_k1_k;
            std::fill(bucket1, bucket1 + k1, 0);

            STXXL_VERBOSE_STABLE_KSORT("Classifying bucket " << k << " size:" << bucket_sizes[k] <<
                                       " blocks:" << nbucket_blocks << " log_k1:" << log_k1_k);
            // classify first nbucket_blocks-1 blocks, they are full
            type_key_ * ref_ptr = refs1;
            key_type offset1 = offset + (key_type(1) << key_type(shift)) * key_type(k);
            for (i = 0; i < nbucket_blocks - 1; i++)
            {
                reqs1[i]->wait();
                stable_ksort_local::classify_block(blocks1[i].begin(), blocks1[i].end(), ref_ptr, bucket1, offset1, shift1 /*,k1*/);
            }
            // last block might be non-full
            const unsigned_type last_block_size = bucket_sizes[k] - int64(nbucket_blocks - 1) * block_type::size;
            reqs1[i]->wait();

            //STXXL_MSG("block_type::size: "<<block_type::size<<" last_block_size:"<<last_block_size);

            classify_block(blocks1[i].begin(), blocks1[i].begin() + last_block_size, ref_ptr, bucket1, offset1, shift1);

            exclusive_prefix_sum(bucket1, k1);
            classify(refs1, refs1 + bucket_sizes[k], refs2, bucket1, offset1, shift1);

            type_key_ * c = refs2;
            type_key_ * d = refs1;
            for (i = 0; i < k1; i++)
            {
                type_key_ * cEnd = refs2 + bucket1[i];
                type_key_ * dEnd = refs1 + bucket1[i];

                const unsigned log_k2 = log2_floor(bucket1[i]) - 1;    // adaptive bucket size
                const unsigned_type k2 = 1 << log_k2;
                int_type * bucket2 = new int_type[k2];
                const unsigned shift2 = shift1 - log_k2;

                // STXXL_MSG("Sorting bucket "<<k<<":"<<i);
                l1sort(c, cEnd, d, bucket2, k2,
                       offset1 + (key_type(1) << key_type(shift1)) * key_type(i),
                       shift2);

                // write out all
                for (type_key_ * p = d; p < dEnd; p++)
                    out << (*(p->ptr));


                delete[] bucket2;
                c = cEnd;
                d = dEnd;
            }
            // submit next read
            const unsigned_type bucket2submit = k + 2;
            if (bucket2submit < nbuckets)
            {
                nbucket_blocks = div_ceil(bucket_sizes[bucket2submit], block_type::size);
                for (i = 0; i < nbucket_blocks; i++)
                    reqs1[i] = blocks1[i].read(bucket_bids[bucket2submit][i]);
            }

            std::swap(blocks1, blocks2);
            std::swap(reqs1, reqs2);
        }

        delete[] bucket1;
        delete[] refs1;
        delete[] refs2;
        delete[] blocks1;
        delete[] blocks2;
        delete[] reqs1;
        delete[] reqs2;
        delete[] bucket_bids;
        delete[] bucket_sizes;

        if (last.block_offset())
        {
            // has to skip part of the last block
            block_type * block = new block_type;
            request_ptr req = block->read(*last.bid());
            req->wait();

            for (i = last.block_offset(); i < block_type::size; i++)
            {
                out << block->elem[i];
            }
            delete block;
        }

        end = timestamp();
    }

    STXXL_VERBOSE("Elapsed time        : " << end - begin << " s. Distribution time: " <<
                  dist_end - begin << " s");
    STXXL_VERBOSE("Time in I/O wait(ds): " << io_wait_after_d << " s");
    STXXL_VERBOSE(*stats::get_instance());
    STXXL_UNUSED(begin + dist_end + end + io_wait_after_d);
}

//! \}

__STXXL_END_NAMESPACE

#endif  // !STXXL_STABLE_KSORT_HEADER