Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * io/request_queue_impl_qwqr.cpp 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00008 * Copyright (C) 2009 Johannes Singler <singler@ira.uka.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #include <algorithm> 00016 00017 #include <stxxl/bits/io/request_queue_impl_qwqr.h> 00018 #include <stxxl/bits/io/request_with_state.h> 00019 #include <stxxl/bits/parallel.h> 00020 00021 00022 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 00023 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1 00024 #endif 00025 00026 __STXXL_BEGIN_NAMESPACE 00027 00028 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool> 00029 { 00030 bool operator () ( 00031 const request_ptr & a, 00032 const request_ptr & b) const 00033 { 00034 // matching file and offset are enough to cause problems 00035 return (a->get_offset() == b->get_offset()) && 00036 (a->get_file() == b->get_file()); 00037 } 00038 }; 00039 00040 request_queue_impl_qwqr::request_queue_impl_qwqr(int n) : _thread_state(NOT_RUNNING), sem(0) 00041 { 00042 STXXL_UNUSED(n); 00043 start_thread(worker, static_cast<void *>(this), thread, _thread_state); 00044 } 00045 00046 void request_queue_impl_qwqr::add_request(request_ptr & req) 00047 { 00048 if (req.empty()) 00049 STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue."); 00050 if (_thread_state() != RUNNING) 00051 STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue."); 00052 00053 if (req.get()->get_type() == request::READ) 00054 { 00055 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 00056 { 00057 scoped_mutex_lock Lock(write_mutex); 00058 if (std::find_if(write_queue.begin(), write_queue.end(), 00059 bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL) 00060 != write_queue.end()) 00061 { 00062 STXXL_ERRMSG("READ request submitted for a BID with a pending WRITE request"); 00063 } 00064 } 00065 #endif 00066 scoped_mutex_lock Lock(read_mutex); 00067 read_queue.push_back(req); 00068 } 00069 else 00070 { 00071 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 00072 { 00073 scoped_mutex_lock Lock(read_mutex); 00074 if (std::find_if(read_queue.begin(), read_queue.end(), 00075 bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL) 00076 != read_queue.end()) 00077 { 00078 STXXL_ERRMSG("WRITE request submitted for a BID with a pending READ request"); 00079 } 00080 } 00081 #endif 00082 scoped_mutex_lock Lock(write_mutex); 00083 write_queue.push_back(req); 00084 } 00085 00086 sem++; 00087 } 00088 00089 bool request_queue_impl_qwqr::cancel_request(request_ptr & req) 00090 { 00091 if (req.empty()) 00092 STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue."); 00093 if (_thread_state() != RUNNING) 00094 STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue."); 00095 00096 bool was_still_in_queue = false; 00097 if (req.get()->get_type() == request::READ) 00098 { 00099 scoped_mutex_lock Lock(read_mutex); 00100 queue_type::iterator pos; 00101 if ((pos = std::find(read_queue.begin(), read_queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != read_queue.end()) 00102 { 00103 read_queue.erase(pos); 00104 was_still_in_queue = true; 00105 sem--; 00106 } 00107 } 00108 else 00109 { 00110 scoped_mutex_lock Lock(write_mutex); 00111 queue_type::iterator pos; 00112 if ((pos = std::find(write_queue.begin(), write_queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != write_queue.end()) 00113 { 00114 write_queue.erase(pos); 00115 was_still_in_queue = true; 00116 sem--; 00117 } 00118 } 00119 00120 return was_still_in_queue; 00121 } 00122 00123 request_queue_impl_qwqr::~request_queue_impl_qwqr() 00124 { 00125 stop_thread(thread, _thread_state, sem); 00126 } 00127 00128 void * request_queue_impl_qwqr::worker(void * arg) 00129 { 00130 self * pthis = static_cast<self *>(arg); 00131 request_ptr req; 00132 00133 bool write_phase = true; 00134 for ( ; ; ) 00135 { 00136 pthis->sem--; 00137 00138 if (write_phase) 00139 { 00140 scoped_mutex_lock WriteLock(pthis->write_mutex); 00141 if (!pthis->write_queue.empty()) 00142 { 00143 req = pthis->write_queue.front(); 00144 pthis->write_queue.pop_front(); 00145 00146 WriteLock.unlock(); 00147 00148 //assert(req->nref() > 1); 00149 req->serve(); 00150 } 00151 else 00152 { 00153 WriteLock.unlock(); 00154 00155 pthis->sem++; 00156 00157 if (pthis->_priority_op == WRITE) 00158 write_phase = false; 00159 } 00160 00161 if (pthis->_priority_op == NONE 00162 || pthis->_priority_op == READ) 00163 write_phase = false; 00164 } 00165 else 00166 { 00167 scoped_mutex_lock ReadLock(pthis->read_mutex); 00168 00169 if (!pthis->read_queue.empty()) 00170 { 00171 req = pthis->read_queue.front(); 00172 pthis->read_queue.pop_front(); 00173 00174 ReadLock.unlock(); 00175 00176 STXXL_VERBOSE2("queue: before serve request has " << req->nref() << " references "); 00177 //assert(req->nref() > 1); 00178 req->serve(); 00179 STXXL_VERBOSE2("queue: after serve request has " << req->nref() << " references "); 00180 } 00181 else 00182 { 00183 ReadLock.unlock(); 00184 00185 pthis->sem++; 00186 00187 if (pthis->_priority_op == READ) 00188 write_phase = true; 00189 } 00190 00191 if (pthis->_priority_op == NONE 00192 || pthis->_priority_op == WRITE) 00193 write_phase = true; 00194 } 00195 00196 // terminate if it has been requested and queues are empty 00197 if (pthis->_thread_state() == TERMINATE) { 00198 if ((pthis->sem--) == 0) 00199 break; 00200 else 00201 pthis->sem++; 00202 } 00203 } 00204 00205 return NULL; 00206 } 00207 00208 __STXXL_END_NAMESPACE 00209 // vim: et:ts=4:sw=4