Stxxl  1.4.0
io/request_queue_impl_1q.cpp
Go to the documentation of this file.
00001 /***************************************************************************
00002  *  io/request_queue_impl_1q.cpp
00003  *
00004  *  Part of the STXXL. See http://stxxl.sourceforge.net
00005  *
00006  *  Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
00007  *  Copyright (C) 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
00008  *  Copyright (C) 2009 Johannes Singler <singler@ira.uka.de>
00009  *
00010  *  Distributed under the Boost Software License, Version 1.0.
00011  *  (See accompanying file LICENSE_1_0.txt or copy at
00012  *  http://www.boost.org/LICENSE_1_0.txt)
00013  **************************************************************************/
00014 
00015 #include <algorithm>
00016 
00017 #include <stxxl/bits/io/request_queue_impl_1q.h>
00018 #include <stxxl/bits/io/request_with_state.h>
00019 #include <stxxl/bits/parallel.h>
00020 
00021 
00022 #ifndef STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
00023 #define STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
00024 #endif
00025 
00026 __STXXL_BEGIN_NAMESPACE
00027 
00028 struct file_offset_match : public std::binary_function<request_ptr, request_ptr, bool>
00029 {
00030     bool operator () (
00031         const request_ptr & a,
00032         const request_ptr & b) const
00033     {
00034         // matching file and offset are enough to cause problems
00035         return (a->get_offset() == b->get_offset()) &&
00036                (a->get_file() == b->get_file());
00037     }
00038 };
00039 
00040 request_queue_impl_1q::request_queue_impl_1q(int n) : _thread_state(NOT_RUNNING), sem(0)
00041 {
00042     STXXL_UNUSED(n);
00043     start_thread(worker, static_cast<void *>(this), thread, _thread_state);
00044 }
00045 
00046 void request_queue_impl_1q::add_request(request_ptr & req)
00047 {
00048     if (req.empty())
00049         STXXL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
00050     if (_thread_state() != RUNNING)
00051         STXXL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
00052 
00053 #if STXXL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
00054     {
00055         scoped_mutex_lock Lock(queue_mutex);
00056         if (std::find_if(queue.begin(), queue.end(),
00057                          bind2nd(file_offset_match(), req) _STXXL_FORCE_SEQUENTIAL)
00058             != queue.end())
00059         {
00060             STXXL_ERRMSG("request submitted for a BID with a pending request");
00061         }
00062     }
00063 #endif
00064     scoped_mutex_lock Lock(queue_mutex);
00065     queue.push_back(req);
00066 
00067     sem++;
00068 }
00069 
00070 bool request_queue_impl_1q::cancel_request(request_ptr & req)
00071 {
00072     if (req.empty())
00073         STXXL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
00074     if (_thread_state() != RUNNING)
00075         STXXL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
00076 
00077     bool was_still_in_queue = false;
00078     {
00079         scoped_mutex_lock Lock(queue_mutex);
00080         queue_type::iterator pos;
00081         if ((pos = std::find(queue.begin(), queue.end(), req _STXXL_FORCE_SEQUENTIAL)) != queue.end())
00082         {
00083             queue.erase(pos);
00084             was_still_in_queue = true;
00085             sem--;
00086         }
00087     }
00088 
00089     return was_still_in_queue;
00090 }
00091 
00092 request_queue_impl_1q::~request_queue_impl_1q()
00093 {
00094     stop_thread(thread, _thread_state, sem);
00095 }
00096 
00097 void * request_queue_impl_1q::worker(void * arg)
00098 {
00099     self * pthis = static_cast<self *>(arg);
00100     request_ptr req;
00101 
00102     for ( ; ; )
00103     {
00104         pthis->sem--;
00105 
00106         {
00107             scoped_mutex_lock Lock(pthis->queue_mutex);
00108             if (!pthis->queue.empty())
00109             {
00110                 req = pthis->queue.front();
00111                 pthis->queue.pop_front();
00112 
00113                 Lock.unlock();
00114 
00115                 //assert(req->nref() > 1);
00116                 req->serve();
00117             }
00118             else
00119             {
00120                 Lock.unlock();
00121 
00122                 pthis->sem++;
00123             }
00124         }
00125 
00126         // terminate if it has been requested and queues are empty
00127         if (pthis->_thread_state() == TERMINATE) {
00128             if ((pthis->sem--) == 0)
00129                 break;
00130             else
00131                 pthis->sem++;
00132         }
00133     }
00134 
00135     return NULL;
00136 }
00137 
00138 __STXXL_END_NAMESPACE
00139 // vim: et:ts=4:sw=4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines