Stxxl  1.4.0
io/request_queue_impl_qwqr.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  io/request_queue_impl_qwqr.cpp
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00008  *  Copyright (C) 2009 Johannes Singler <singler@ira.uka.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #include <algorithm>
00016 
00017 #include <stxxl/bits/io/request_queue_impl_qwqr.h>
00018 #include <stxxl/bits/io/request_with_state.h>
00019 #include <stxxl/bits/parallel.h>
00020 
00021 
00022 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
00023 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
00024 #endif
00025 
00026 __STXXL_BEGIN_NAMESPACE
00027 
00028 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool>
00029 {
00030     bool operator () (
00031         const request_ptr & a,
00032         const request_ptr & b) const
00033     {
00034         // matching file and offset are enough to cause problems
00035         return (a->get_offset() == b->get_offset()) &&
00036                (a->get_file() == b->get_file());
00037     }
00038 };
00039 
00040 request_queue_impl_qwqr::request_queue_impl_qwqr(int n) : _thread_state(NOT_RUNNING), sem(0)
00041 {
00042     STXXL_UNUSED(n);
00043     start_thread(worker, static_cast<void *>(this), thread, _thread_state);
00044 }
00045 
00046 void request_queue_impl_qwqr::add_request(request_ptr & req)
00047 {
00048     if (req.empty())
00049         STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
00050     if (_thread_state() != RUNNING)
00051         STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
00052 
00053     if (req.get()->get_type() == request::READ)
00054     {
00055 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
00056         {
00057             scoped_mutex_lock Lock(write_mutex);
00058             if (std::find_if(write_queue.begin(), write_queue.end(),
00059                              bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
00060                 != write_queue.end())
00061             {
00062                 STXXL_ERRMSG("READ request submitted for a BID with a pending WRITE request");
00063             }
00064         }
00065 #endif
00066         scoped_mutex_lock Lock(read_mutex);
00067         read_queue.push_back(req);
00068     }
00069     else
00070     {
00071 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
00072         {
00073             scoped_mutex_lock Lock(read_mutex);
00074             if (std::find_if(read_queue.begin(), read_queue.end(),
00075                              bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
00076                 != read_queue.end())
00077             {
00078                 STXXL_ERRMSG("WRITE request submitted for a BID with a pending READ request");
00079             }
00080         }
00081 #endif
00082         scoped_mutex_lock Lock(write_mutex);
00083         write_queue.push_back(req);
00084     }
00085 
00086     sem++;
00087 }
00088 
00089 bool request_queue_impl_qwqr::cancel_request(request_ptr & req)
00090 {
00091     if (req.empty())
00092         STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
00093     if (_thread_state() != RUNNING)
00094         STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
00095 
00096     bool was_still_in_queue = false;
00097     if (req.get()->get_type() == request::READ)
00098     {
00099         scoped_mutex_lock Lock(read_mutex);
00100         queue_type::iterator pos;
00101         if ((pos = std::find(read_queue.begin(), read_queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != read_queue.end())
00102         {
00103             read_queue.erase(pos);
00104             was_still_in_queue = true;
00105             sem--;
00106         }
00107     }
00108     else
00109     {
00110         scoped_mutex_lock Lock(write_mutex);
00111         queue_type::iterator pos;
00112         if ((pos = std::find(write_queue.begin(), write_queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != write_queue.end())
00113         {
00114             write_queue.erase(pos);
00115             was_still_in_queue = true;
00116             sem--;
00117         }
00118     }
00119 
00120     return was_still_in_queue;
00121 }
00122 
00123 request_queue_impl_qwqr::~request_queue_impl_qwqr()
00124 {
00125     stop_thread(thread, _thread_state, sem);
00126 }
00127 
00128 void * request_queue_impl_qwqr::worker(void * arg)
00129 {
00130     self * pthis = static_cast<self *>(arg);
00131     request_ptr req;
00132 
00133     bool write_phase = true;
00134     for ( ; ; )
00135     {
00136         pthis->sem--;
00137 
00138         if (write_phase)
00139         {
00140             scoped_mutex_lock WriteLock(pthis->write_mutex);
00141             if (!pthis->write_queue.empty())
00142             {
00143                 req = pthis->write_queue.front();
00144                 pthis->write_queue.pop_front();
00145 
00146                 WriteLock.unlock();
00147 
00148                 //assert(req->nref() > 1);
00149                 req->serve();
00150             }
00151             else
00152             {
00153                 WriteLock.unlock();
00154 
00155                 pthis->sem++;
00156 
00157                 if (pthis->_priority_op == WRITE)
00158                     write_phase = false;
00159             }
00160 
00161             if (pthis->_priority_op == NONE
00162                 || pthis->_priority_op == READ)
00163                 write_phase = false;
00164         }
00165         else
00166         {
00167             scoped_mutex_lock ReadLock(pthis->read_mutex);
00168 
00169             if (!pthis->read_queue.empty())
00170             {
00171                 req = pthis->read_queue.front();
00172                 pthis->read_queue.pop_front();
00173 
00174                 ReadLock.unlock();
00175 
00176                 STXXL_VERBOSE2("queue: before serve request has " << req->nref() << " references ");
00177                 //assert(req->nref() > 1);
00178                 req->serve();
00179                 STXXL_VERBOSE2("queue: after serve request has " << req->nref() << " references ");
00180             }
00181             else
00182             {
00183                 ReadLock.unlock();
00184 
00185                 pthis->sem++;
00186 
00187                 if (pthis->_priority_op == READ)
00188                     write_phase = true;
00189             }
00190 
00191             if (pthis->_priority_op == NONE
00192                 || pthis->_priority_op == WRITE)
00193                 write_phase = true;
00194         }
00195 
00196         // terminate if it has been requested and queues are empty
00197         if (pthis->_thread_state() == TERMINATE) {
00198             if ((pthis->sem--) == 0)
00199                 break;
00200             else
00201                 pthis->sem++;
00202         }
00203     }
00204 
00205     return NULL;
00206 }
00207 
00208 __STXXL_END_NAMESPACE
00209 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines