Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * algo/async_schedule.cpp 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002, 2009 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2009, 2010 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00008 * 00009 * Distributed under the Boost Software License, Version 1.0. 00010 * (See accompanying file LICENSE_1_0.txt or copy at 00011 * http://www.boost.org/LICENSE_1_0.txt) 00012 **************************************************************************/ 00013 00014 // Implements the "prudent prefetching" as described in 00015 // D. Hutchinson, P. Sanders, J. S. Vitter: Duality between prefetching 00016 // and queued writing on parallel disks, 2005 00017 // DOI: 10.1137/S0097539703431573 00018 00019 00020 #include <stxxl/bits/algo/async_schedule.h> 00021 #include <stxxl/bits/io/file.h> 00022 #include <stxxl/bits/verbose.h> 00023 #include <stxxl/bits/parallel.h> 00024 00025 #include <algorithm> 00026 #include <functional> 00027 #include <queue> 00028 #include <cassert> 00029 00030 00031 __STXXL_BEGIN_NAMESPACE 00032 00033 namespace async_schedule_local 00034 { 00035 struct sim_event // only one type of event: WRITE COMPLETED 00036 { 00037 int_type timestamp; 00038 int_type iblock; 00039 inline sim_event(int_type t, int_type b) : timestamp(t), iblock(b) { } 00040 }; 00041 00042 struct sim_event_cmp : public std::binary_function<sim_event, sim_event, bool> 00043 { 00044 inline bool operator () (const sim_event & a, const sim_event & b) const 00045 { 00046 return a.timestamp > b.timestamp; 00047 } 00048 }; 00049 00050 typedef std::pair<int_type, int_type> write_time_pair; 00051 struct write_time_cmp : public std::binary_function<write_time_pair, write_time_pair, bool> 00052 { 00053 inline bool operator () (const write_time_pair & a, const write_time_pair & b) const 00054 { 00055 return a.second > b.second; 00056 } 00057 }; 00058 00059 00060 static inline int_type get_disk(int_type i, const int_type * disks, int_type D) 00061 { 00062 int_type disk = disks[i]; 00063 if (disk == file::NO_ALLOCATOR) 00064 disk = D; // remap to sentinel 00065 assert(0 <= disk && disk <= D); 00066 return disk; 00067 } 00068 00069 int_type simulate_async_write( 00070 const int_type * disks, 00071 const int_type L, 00072 const int_type m_init, 00073 const int_type D, 00074 std::pair<int_type, int_type> * o_time) 00075 { 00076 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type; 00077 typedef std::queue<int_type> disk_queue_type; 00078 assert(L >= D); 00079 disk_queue_type * disk_queues = new disk_queue_type[D + 1]; // + sentinel for remapping NO_ALLOCATOR 00080 event_queue_type event_queue; 00081 00082 int_type m = m_init; 00083 int_type i = L - 1; 00084 int_type oldtime = 0; 00085 bool * disk_busy = new bool[D + 1]; 00086 00087 while (m && (i >= 0)) 00088 { 00089 int_type disk = get_disk(i, disks, D); 00090 disk_queues[disk].push(i); 00091 i--; 00092 m--; 00093 } 00094 00095 for (int_type ii = 0; ii <= D; ii++) 00096 if (!disk_queues[ii].empty()) 00097 { 00098 int_type j = disk_queues[ii].front(); 00099 disk_queues[ii].pop(); 00100 event_queue.push(sim_event(1, j)); 00101 //STXXL_MSG("Block "<<j<<" scheduled"); 00102 } 00103 00104 while (!event_queue.empty()) 00105 { 00106 sim_event cur = event_queue.top(); 00107 event_queue.pop(); 00108 if (oldtime != cur.timestamp) 00109 { 00110 // clear disk_busy 00111 for (int_type i = 0; i <= D; i++) 00112 disk_busy[i] = false; 00113 00114 oldtime = cur.timestamp; 00115 } 00116 00117 00118 STXXL_VERBOSE1("Block " << cur.iblock << " put out, time " << cur.timestamp << " disk: " << disks[cur.iblock]); 00119 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp); 00120 00121 if (i >= 0) 00122 { 00123 int_type disk = get_disk(i, disks, D); 00124 if (disk_busy[disk]) 00125 { 00126 disk_queues[disk].push(i--); 00127 } 00128 else 00129 { 00130 if (!disk_queues[disk].empty()) 00131 { 00132 STXXL_VERBOSE1("c Block " << disk_queues[disk].front() << " scheduled for time " << cur.timestamp + 1); 00133 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front())); 00134 disk_queues[disk].pop(); 00135 } 00136 else 00137 { 00138 STXXL_VERBOSE1("a Block " << i << " scheduled for time " << cur.timestamp + 1); 00139 event_queue.push(sim_event(cur.timestamp + 1, i--)); 00140 } 00141 disk_busy[disk] = true; 00142 } 00143 } 00144 00145 // add next block to write 00146 int_type disk = get_disk(cur.iblock, disks, D); 00147 if (!disk_busy[disk] && !disk_queues[disk].empty()) 00148 { 00149 STXXL_VERBOSE1("b Block " << disk_queues[disk].front() << " scheduled for time " << cur.timestamp + 1); 00150 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front())); 00151 disk_queues[disk].pop(); 00152 disk_busy[disk] = true; 00153 } 00154 } 00155 00156 assert(i == -1); 00157 for (int_type i = 0; i <= D; i++) 00158 assert(disk_queues[i].empty()); 00159 00160 00161 delete[] disk_busy; 00162 delete[] disk_queues; 00163 00164 return (oldtime - 1); 00165 } 00166 } // namespace async_schedule_local 00167 00168 00169 void compute_prefetch_schedule( 00170 const int_type * first, 00171 const int_type * last, 00172 int_type * out_first, 00173 int_type m, 00174 int_type D) 00175 { 00176 typedef std::pair<int_type, int_type> pair_type; 00177 int_type L = last - first; 00178 if (L <= D) 00179 { 00180 for (int_type i = 0; i < L; ++i) 00181 out_first[i] = i; 00182 00183 return; 00184 } 00185 pair_type * write_order = new pair_type[L]; 00186 00187 int_type w_steps = async_schedule_local::simulate_async_write(first, L, m, D, write_order); 00188 00189 STXXL_VERBOSE1("Write steps: " << w_steps); 00190 00191 for (int_type i = 0; i < L; i++) 00192 STXXL_VERBOSE1(first[i] << " " << write_order[i].first << " " << write_order[i].second); 00193 00194 std::stable_sort(write_order, write_order + L, async_schedule_local::write_time_cmp() _STXXL_FORCE_SEQUENTIAL); 00195 00196 for (int_type i = 0; i < L; i++) 00197 { 00198 out_first[i] = write_order[i].first; 00199 //if(out_first[i] != i) 00200 STXXL_VERBOSE1(i << " " << out_first[i]); 00201 } 00202 00203 delete[] write_order; 00204 STXXL_UNUSED(w_steps); 00205 } 00206 00207 __STXXL_END_NAMESPACE 00208 00209 // vim: et:ts=4:sw=4