Stxxl
1.4.0
|
00001 /*************************************************************************** 00002 * io/request_queue_impl_1q.cpp 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00008 * Copyright (C) 2009 Johannes Singler <singler@ira.uka.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #include <algorithm> 00016 00017 #include <stxxl/bits/io/request_queue_impl_1q.h> 00018 #include <stxxl/bits/io/request_with_state.h> 00019 #include <stxxl/bits/parallel.h> 00020 00021 00022 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 00023 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1 00024 #endif 00025 00026 __STXXL_BEGIN_NAMESPACE 00027 00028 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool> 00029 { 00030 bool operator () ( 00031 const request_ptr & a, 00032 const request_ptr & b) const 00033 { 00034 // matching file and offset are enough to cause problems 00035 return (a->get_offset() == b->get_offset()) && 00036 (a->get_file() == b->get_file()); 00037 } 00038 }; 00039 00040 request_queue_impl_1q::request_queue_impl_1q(int n) : _thread_state(NOT_RUNNING), sem(0) 00041 { 00042 STXXL_UNUSED(n); 00043 start_thread(worker, static_cast<void *>(this), thread, _thread_state); 00044 } 00045 00046 void request_queue_impl_1q::add_request(request_ptr & req) 00047 { 00048 if (req.empty()) 00049 STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue."); 00050 if (_thread_state() != RUNNING) 00051 STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue."); 00052 00053 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 00054 { 00055 scoped_mutex_lock Lock(queue_mutex); 00056 if (std::find_if(queue.begin(), queue.end(), 00057 bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL) 00058 != queue.end()) 00059 { 00060 STXXL_ERRMSG("request submitted for a BID with a pending request"); 00061 } 00062 } 00063 #endif 00064 scoped_mutex_lock Lock(queue_mutex); 00065 queue.push_back(req); 00066 00067 sem++; 00068 } 00069 00070 bool request_queue_impl_1q::cancel_request(request_ptr & req) 00071 { 00072 if (req.empty()) 00073 STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue."); 00074 if (_thread_state() != RUNNING) 00075 STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue."); 00076 00077 bool was_still_in_queue = false; 00078 { 00079 scoped_mutex_lock Lock(queue_mutex); 00080 queue_type::iterator pos; 00081 if ((pos = std::find(queue.begin(), queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != queue.end()) 00082 { 00083 queue.erase(pos); 00084 was_still_in_queue = true; 00085 sem--; 00086 } 00087 } 00088 00089 return was_still_in_queue; 00090 } 00091 00092 request_queue_impl_1q::~request_queue_impl_1q() 00093 { 00094 stop_thread(thread, _thread_state, sem); 00095 } 00096 00097 void * request_queue_impl_1q::worker(void * arg) 00098 { 00099 self * pthis = static_cast<self *>(arg); 00100 request_ptr req; 00101 00102 for ( ; ; ) 00103 { 00104 pthis->sem--; 00105 00106 { 00107 scoped_mutex_lock Lock(pthis->queue_mutex); 00108 if (!pthis->queue.empty()) 00109 { 00110 req = pthis->queue.front(); 00111 pthis->queue.pop_front(); 00112 00113 Lock.unlock(); 00114 00115 //assert(req->nref() > 1); 00116 req->serve(); 00117 } 00118 else 00119 { 00120 Lock.unlock(); 00121 00122 pthis->sem++; 00123 } 00124 } 00125 00126 // terminate if it has been requested and queues are empty 00127 if (pthis->_thread_state() == TERMINATE) { 00128 if ((pthis->sem--) == 0) 00129 break; 00130 else 00131 pthis->sem++; 00132 } 00133 } 00134 00135 return NULL; 00136 } 00137 00138 __STXXL_END_NAMESPACE 00139 // vim: et:ts=4:sw=4