/* Copyright 2005-2012 Intel Corporation. All Rights Reserved. This file is part of Threading Building Blocks. Threading Building Blocks is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 2 as published by the Free Software Foundation. Threading Building Blocks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Threading Building Blocks; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA As a special exception, you may use this file as part of a free software library without restriction. Specifically, if other files instantiate templates or use macros or inline functions from this file, or you compile this file and link it with other files to produce an executable, this file does not by itself cause the resulting executable to be covered by the GNU General Public License. This exception does not however invalidate any other reasons why the executable file might be covered by the GNU General Public License. */ #ifndef __TBB__concurrent_queue_impl_H #define __TBB__concurrent_queue_impl_H #ifndef __TBB_concurrent_queue_H #error Do not #include this internal file directly; use public TBB headers instead. #endif #include "../tbb_stddef.h" #include "../tbb_machine.h" #include "../atomic.h" #include "../spin_mutex.h" #include "../cache_aligned_allocator.h" #include "../tbb_exception.h" #include "../tbb_profiling.h" #include #if !TBB_USE_EXCEPTIONS && _MSC_VER // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers #pragma warning (push) #pragma warning (disable: 4530) #endif #include #if !TBB_USE_EXCEPTIONS && _MSC_VER #pragma warning (pop) #endif namespace tbb { #if !__TBB_TEMPLATE_FRIENDS_BROKEN // forward declaration namespace strict_ppl { template class concurrent_queue; } template class concurrent_bounded_queue; namespace deprecated { template class concurrent_queue; } #endif //! For internal use only. namespace strict_ppl { //! @cond INTERNAL namespace internal { using namespace tbb::internal; typedef size_t ticket; template class micro_queue ; template class micro_queue_pop_finalizer ; template class concurrent_queue_base_v3; //! parts of concurrent_queue_rep that do not have references to micro_queue /** * For internal use only. */ struct concurrent_queue_rep_base : no_copy { template friend class micro_queue; template friend class concurrent_queue_base_v3; protected: //! Approximately n_queue/golden ratio static const size_t phi = 3; public: // must be power of 2 static const size_t n_queue = 8; //! Prefix on a page struct page { page* next; uintptr_t mask; }; atomic head_counter; char pad1[NFS_MaxLineSize-sizeof(atomic)]; atomic tail_counter; char pad2[NFS_MaxLineSize-sizeof(atomic)]; //! Always a power of 2 size_t items_per_page; //! Size of an item size_t item_size; //! number of invalid entries in the queue atomic n_invalid_entries; char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic)]; } ; inline bool is_valid_page(const concurrent_queue_rep_base::page* p) { return uintptr_t(p)>1; } //! Abstract class to define interface for page allocation/deallocation /** * For internal use only. */ class concurrent_queue_page_allocator { template friend class micro_queue ; template friend class micro_queue_pop_finalizer ; protected: virtual ~concurrent_queue_page_allocator() {} private: virtual concurrent_queue_rep_base::page* allocate_page() = 0; virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0; } ; #if _MSC_VER && !defined(__INTEL_COMPILER) // unary minus operator applied to unsigned type, result still unsigned #pragma warning( push ) #pragma warning( disable: 4146 ) #endif //! A queue using simple locking. /** For efficiency, this class has no constructor. The caller is expected to zero-initialize it. */ template class micro_queue : no_copy { typedef concurrent_queue_rep_base::page page; //! Class used to ensure exception-safety of method "pop" class destroyer: no_copy { T& my_value; public: destroyer( T& value ) : my_value(value) {} ~destroyer() {my_value.~T();} }; void copy_item( page& dst, size_t index, const void* src ) { new( &get_ref(dst,index) ) T(*static_cast(src)); } void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) { new( &get_ref(dst,dindex) ) T( get_ref(const_cast(src),sindex) ); } void assign_and_destroy_item( void* dst, page& src, size_t index ) { T& from = get_ref(src,index); destroyer d(from); *static_cast(dst) = from; } void spin_wait_until_my_turn( atomic& counter, ticket k, concurrent_queue_rep_base& rb ) const ; public: friend class micro_queue_pop_finalizer; struct padded_page: page { //! Not defined anywhere - exists to quiet warnings. padded_page(); //! Not defined anywhere - exists to quiet warnings. void operator=( const padded_page& ); //! Must be last field. T last; }; static T& get_ref( page& p, size_t index ) { return (&static_cast(static_cast(&p))->last)[index]; } atomic head_page; atomic head_counter; atomic tail_page; atomic tail_counter; spin_mutex page_mutex; void push( const void* item, ticket k, concurrent_queue_base_v3& base ) ; bool pop( void* dst, ticket k, concurrent_queue_base_v3& base ) ; micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3& base ) ; page* make_copy( concurrent_queue_base_v3& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ; void invalidate_page_and_rethrow( ticket k ) ; }; template void micro_queue::spin_wait_until_my_turn( atomic& counter, ticket k, concurrent_queue_rep_base& rb ) const { atomic_backoff backoff; do { backoff.pause(); if( counter&1 ) { ++rb.n_invalid_entries; throw_exception( eid_bad_last_alloc ); } } while( counter!=k ) ; } template void micro_queue::push( const void* item, ticket k, concurrent_queue_base_v3& base ) { k &= -concurrent_queue_rep_base::n_queue; page* p = NULL; size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); if( !index ) { __TBB_TRY { concurrent_queue_page_allocator& pa = base; p = pa.allocate_page(); } __TBB_CATCH (...) { ++base.my_rep->n_invalid_entries; invalidate_page_and_rethrow( k ); } p->mask = 0; p->next = NULL; } if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep ); call_itt_notify(acquired, &tail_counter); if( p ) { spin_mutex::scoped_lock lock( page_mutex ); page* q = tail_page; if( is_valid_page(q) ) q->next = p; else head_page = p; tail_page = p; } else { p = tail_page; } __TBB_TRY { copy_item( *p, index, item ); // If no exception was thrown, mark item as present. itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<n_invalid_entries; call_itt_notify(releasing, &tail_counter); tail_counter += concurrent_queue_rep_base::n_queue; __TBB_RETHROW(); } } template bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base_v3& base ) { k &= -concurrent_queue_rep_base::n_queue; if( head_counter!=k ) spin_wait_until_eq( head_counter, k ); call_itt_notify(acquired, &head_counter); if( tail_counter==k ) spin_wait_while_eq( tail_counter, k ); call_itt_notify(acquired, &tail_counter); page& p = *head_page; __TBB_ASSERT( &p, NULL ); size_t index = k/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); bool success = false; { micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); if( p.mask & uintptr_t(1)<n_invalid_entries; } } return success; } template micro_queue& micro_queue::assign( const micro_queue& src, concurrent_queue_base_v3& base ) { head_counter = src.head_counter; tail_counter = src.tail_counter; page_mutex = src.page_mutex; const page* srcp = src.head_page; if( is_valid_page(srcp) ) { ticket g_index = head_counter; __TBB_TRY { size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue; size_t index = head_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); size_t end_in_first_page = (index+n_itemsitems_per_page)?(index+n_items):base.my_rep->items_per_page; head_page = make_copy( base, srcp, index, end_in_first_page, g_index ); page* cur_page = head_page; if( srcp != src.tail_page ) { for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) { cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index ); cur_page = cur_page->next; } __TBB_ASSERT( srcp==src.tail_page, NULL ); size_t last_index = tail_counter/concurrent_queue_rep_base::n_queue & (base.my_rep->items_per_page-1); if( last_index==0 ) last_index = base.my_rep->items_per_page; cur_page->next = make_copy( base, srcp, 0, last_index, g_index ); cur_page = cur_page->next; } tail_page = cur_page; } __TBB_CATCH (...) { invalidate_page_and_rethrow( g_index ); } } else { head_page = tail_page = NULL; } return *this; } template void micro_queue::invalidate_page_and_rethrow( ticket k ) { // Append an invalid page at address 1 so that no more pushes are allowed. page* invalid_page = (page*)uintptr_t(1); { spin_mutex::scoped_lock lock( page_mutex ); itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1); page* q = tail_page; if( is_valid_page(q) ) q->next = invalid_page; else head_page = invalid_page; tail_page = invalid_page; } __TBB_RETHROW(); } template concurrent_queue_rep_base::page* micro_queue::make_copy( concurrent_queue_base_v3& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) { concurrent_queue_page_allocator& pa = base; page* new_page = pa.allocate_page(); new_page->next = NULL; new_page->mask = src_page->mask; for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index ) if( new_page->mask & uintptr_t(1)< class micro_queue_pop_finalizer: no_copy { typedef concurrent_queue_rep_base::page page; ticket my_ticket; micro_queue& my_queue; page* my_page; concurrent_queue_page_allocator& allocator; public: micro_queue_pop_finalizer( micro_queue& queue, concurrent_queue_base_v3& b, ticket k, page* p ) : my_ticket(k), my_queue(queue), my_page(p), allocator(b) {} ~micro_queue_pop_finalizer() ; }; template micro_queue_pop_finalizer::~micro_queue_pop_finalizer() { page* p = my_page; if( is_valid_page(p) ) { spin_mutex::scoped_lock lock( my_queue.page_mutex ); page* q = p->next; my_queue.head_page = q; if( !is_valid_page(q) ) { my_queue.tail_page = NULL; } } itt_store_word_with_release(my_queue.head_counter, my_ticket); if( is_valid_page(p) ) { allocator.deallocate_page( p ); } } #if _MSC_VER && !defined(__INTEL_COMPILER) #pragma warning( pop ) #endif // warning 4146 is back template class concurrent_queue_iterator_rep ; template class concurrent_queue_iterator_base_v3; //! representation of concurrent_queue_base /** * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue's */ template struct concurrent_queue_rep : public concurrent_queue_rep_base { micro_queue array[n_queue]; //! Map ticket to an array index static size_t index( ticket k ) { return k*phi%n_queue; } micro_queue& choose( ticket k ) { // The formula here approximates LRU in a cache-oblivious way. return array[index(k)]; } }; //! base class of concurrent_queue /** * The class implements the interface defined by concurrent_queue_page_allocator * and has a pointer to an instance of concurrent_queue_rep. */ template class concurrent_queue_base_v3: public concurrent_queue_page_allocator { //! Internal representation concurrent_queue_rep* my_rep; friend struct concurrent_queue_rep; friend class micro_queue; friend class concurrent_queue_iterator_rep; friend class concurrent_queue_iterator_base_v3; protected: typedef typename concurrent_queue_rep::page page; private: typedef typename micro_queue::padded_page padded_page; /* override */ virtual page *allocate_page() { concurrent_queue_rep& r = *my_rep; size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T); return reinterpret_cast(allocate_block ( n )); } /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) { concurrent_queue_rep& r = *my_rep; size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T); deallocate_block( reinterpret_cast(p), n ); } //! custom allocator virtual void *allocate_block( size_t n ) = 0; //! custom de-allocator virtual void deallocate_block( void *p, size_t n ) = 0; protected: concurrent_queue_base_v3(); /* override */ virtual ~concurrent_queue_base_v3() { #if TBB_USE_ASSERT size_t nq = my_rep->n_queue; for( size_t i=0; iarray[i].tail_page==NULL, "pages were not freed properly" ); #endif /* TBB_USE_ASSERT */ cache_aligned_allocator >().deallocate(my_rep,1); } //! Enqueue item at tail of queue void internal_push( const void* src ) { concurrent_queue_rep& r = *my_rep; ticket k = r.tail_counter++; r.choose(k).push( src, k, *this ); } //! Attempt to dequeue item from queue. /** NULL if there was no item to dequeue. */ bool internal_try_pop( void* dst ) ; //! Get size of queue; result may be invalid if queue is modified concurrently size_t internal_size() const ; //! check if the queue is empty; thread safe bool internal_empty() const ; //! free any remaining pages /* note that the name may be misleading, but it remains so due to a historical accident. */ void internal_finish_clear() ; //! Obsolete void internal_throw_exception() const { throw_exception( eid_bad_alloc ); } //! copy internal representation void assign( const concurrent_queue_base_v3& src ) ; }; template concurrent_queue_base_v3::concurrent_queue_base_v3() { const size_t item_size = sizeof(T); my_rep = cache_aligned_allocator >().allocate(1); __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" ); __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" ); __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" ); __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" ); memset(my_rep,0,sizeof(concurrent_queue_rep)); my_rep->item_size = item_size; my_rep->items_per_page = item_size<= 8 ? 32 : item_size<= 16 ? 16 : item_size<= 32 ? 8 : item_size<= 64 ? 4 : item_size<=128 ? 2 : 1; } template bool concurrent_queue_base_v3::internal_try_pop( void* dst ) { concurrent_queue_rep& r = *my_rep; ticket k; do { k = r.head_counter; for(;;) { if( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // Queue is empty return false; } // Queue had item with ticket k when we looked. Attempt to get that item. ticket tk=k; #if defined(_MSC_VER) && defined(_Wp64) #pragma warning (push) #pragma warning (disable: 4267) #endif k = r.head_counter.compare_and_swap( tk+1, tk ); #if defined(_MSC_VER) && defined(_Wp64) #pragma warning (pop) #endif if( k==tk ) break; // Another thread snatched the item, retry. } } while( !r.choose( k ).pop( dst, k, *this ) ); return true; } template size_t concurrent_queue_base_v3::internal_size() const { concurrent_queue_rep& r = *my_rep; __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL ); ticket hc = r.head_counter; size_t nie = r.n_invalid_entries; ticket tc = r.tail_counter; __TBB_ASSERT( hc!=tc || !nie, NULL ); ptrdiff_t sz = tc-hc-nie; return sz<0 ? 0 : size_t(sz); } template bool concurrent_queue_base_v3::internal_empty() const { concurrent_queue_rep& r = *my_rep; ticket tc = r.tail_counter; ticket hc = r.head_counter; // if tc!=r.tail_counter, the queue was not empty at some point between the two reads. return tc==r.tail_counter && tc==hc+r.n_invalid_entries ; } template void concurrent_queue_base_v3::internal_finish_clear() { concurrent_queue_rep& r = *my_rep; size_t nq = r.n_queue; for( size_t i=0; i void concurrent_queue_base_v3::assign( const concurrent_queue_base_v3& src ) { concurrent_queue_rep& r = *my_rep; r.items_per_page = src.my_rep->items_per_page; // copy concurrent_queue_rep. r.head_counter = src.my_rep->head_counter; r.tail_counter = src.my_rep->tail_counter; r.n_invalid_entries = src.my_rep->n_invalid_entries; // copy micro_queues for( size_t i = 0; iarray[i], *this); __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, "the source concurrent queue should not be concurrently modified." ); } template class concurrent_queue_iterator; template class concurrent_queue_iterator_rep: no_assign { typedef typename micro_queue::padded_page padded_page; public: ticket head_counter; const concurrent_queue_base_v3& my_queue; typename concurrent_queue_base_v3::page* array[concurrent_queue_rep::n_queue]; concurrent_queue_iterator_rep( const concurrent_queue_base_v3& queue ) : head_counter(queue.my_rep->head_counter), my_queue(queue) { for( size_t k=0; k::n_queue; ++k ) array[k] = queue.my_rep->array[k].head_page; } //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise. bool get_item( T*& item, size_t k ) ; }; template bool concurrent_queue_iterator_rep::get_item( T*& item, size_t k ) { if( k==my_queue.my_rep->tail_counter ) { item = NULL; return true; } else { typename concurrent_queue_base_v3::page* p = array[concurrent_queue_rep::index(k)]; __TBB_ASSERT(p,NULL); size_t i = k/concurrent_queue_rep::n_queue & (my_queue.my_rep->items_per_page-1); item = µ_queue::get_ref(*p,i); return (p->mask & uintptr_t(1)< class concurrent_queue_iterator_base_v3 : no_assign { //! Represents concurrent_queue over which we are iterating. /** NULL if one past last element in queue. */ concurrent_queue_iterator_rep* my_rep; template friend bool operator==( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ); template friend bool operator!=( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ); protected: //! Pointer to current item Value* my_item; //! Default constructor concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) { #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN __TBB_compiler_fence(); #endif } //! Copy constructor concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : no_assign(), my_rep(NULL), my_item(NULL) { assign(i); } //! Construct iterator pointing to head of queue. concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue ) ; //! Assignment void assign( const concurrent_queue_iterator_base_v3& other ) ; //! Advance iterator one step towards tail of queue. void advance() ; //! Destructor ~concurrent_queue_iterator_base_v3() { cache_aligned_allocator >().deallocate(my_rep, 1); my_rep = NULL; } }; template concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue ) { my_rep = cache_aligned_allocator >().allocate(1); new( my_rep ) concurrent_queue_iterator_rep(queue); size_t k = my_rep->head_counter; if( !my_rep->get_item(my_item, k) ) advance(); } template void concurrent_queue_iterator_base_v3::assign( const concurrent_queue_iterator_base_v3& other ) { if( my_rep!=other.my_rep ) { if( my_rep ) { cache_aligned_allocator >().deallocate(my_rep, 1); my_rep = NULL; } if( other.my_rep ) { my_rep = cache_aligned_allocator >().allocate(1); new( my_rep ) concurrent_queue_iterator_rep( *other.my_rep ); } } my_item = other.my_item; } template void concurrent_queue_iterator_base_v3::advance() { __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" ); size_t k = my_rep->head_counter; const concurrent_queue_base_v3& queue = my_rep->my_queue; #if TBB_USE_ASSERT Value* tmp; my_rep->get_item(tmp,k); __TBB_ASSERT( my_item==tmp, NULL ); #endif /* TBB_USE_ASSERT */ size_t i = k/concurrent_queue_rep::n_queue & (queue.my_rep->items_per_page-1); if( i==queue.my_rep->items_per_page-1 ) { typename concurrent_queue_base_v3::page*& root = my_rep->array[concurrent_queue_rep::index(k)]; root = root->next; } // advance k my_rep->head_counter = ++k; if( !my_rep->get_item(my_item, k) ) advance(); } //! Similar to C++0x std::remove_cv /** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */ template struct tbb_remove_cv {typedef T type;}; template struct tbb_remove_cv {typedef T type;}; template struct tbb_remove_cv {typedef T type;}; template struct tbb_remove_cv {typedef T type;}; //! Meets requirements of a forward iterator for STL. /** Value is either the T or const T type of the container. @ingroup containers */ template class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3::type>, public std::iterator { #if !__TBB_TEMPLATE_FRIENDS_BROKEN template friend class ::tbb::strict_ppl::concurrent_queue; #else public: // workaround for MSVC #endif //! Construct iterator pointing to head of queue. concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) : concurrent_queue_iterator_base_v3::type>(queue) { } public: concurrent_queue_iterator() {} concurrent_queue_iterator( const concurrent_queue_iterator& other ) : concurrent_queue_iterator_base_v3::type>(other) {} //! Iterator assignment concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) { this->assign(other); return *this; } //! Reference to current item Value& operator*() const { return *static_cast(this->my_item); } Value* operator->() const {return &operator*();} //! Advance to next item in queue concurrent_queue_iterator& operator++() { this->advance(); return *this; } //! Post increment Value* operator++(int) { Value* result = &operator*(); operator++(); return result; } }; // concurrent_queue_iterator template bool operator==( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ) { return i.my_item==j.my_item; } template bool operator!=( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ) { return i.my_item!=j.my_item; } } // namespace internal //! @endcond } // namespace strict_ppl //! @cond INTERNAL namespace internal { class concurrent_queue_rep; class concurrent_queue_iterator_rep; class concurrent_queue_iterator_base_v3; template class concurrent_queue_iterator; //! For internal use only. /** Type-independent portion of concurrent_queue. @ingroup containers */ class concurrent_queue_base_v3: no_copy { //! Internal representation concurrent_queue_rep* my_rep; friend class concurrent_queue_rep; friend struct micro_queue; friend class micro_queue_pop_finalizer; friend class concurrent_queue_iterator_rep; friend class concurrent_queue_iterator_base_v3; protected: //! Prefix on a page struct page { page* next; uintptr_t mask; }; //! Capacity of the queue ptrdiff_t my_capacity; //! Always a power of 2 size_t items_per_page; //! Size of an item size_t item_size; #if __TBB_PROTECTED_NESTED_CLASS_BROKEN public: #endif template struct padded_page: page { //! Not defined anywhere - exists to quiet warnings. padded_page(); //! Not defined anywhere - exists to quiet warnings. void operator=( const padded_page& ); //! Must be last field. T last; }; private: virtual void copy_item( page& dst, size_t index, const void* src ) = 0; virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0; protected: __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size ); virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3(); //! Enqueue item at tail of queue void __TBB_EXPORTED_METHOD internal_push( const void* src ); //! Dequeue item from head of queue void __TBB_EXPORTED_METHOD internal_pop( void* dst ); //! Abort all pending queue operations void __TBB_EXPORTED_METHOD internal_abort(); //! Attempt to enqueue item onto queue. bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src ); //! Attempt to dequeue item from queue. /** NULL if there was no item to dequeue. */ bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst ); //! Get size of queue ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const; //! Check if the queue is emtpy bool __TBB_EXPORTED_METHOD internal_empty() const; //! Set the queue capacity void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size ); //! custom allocator virtual page *allocate_page() = 0; //! custom de-allocator virtual void deallocate_page( page *p ) = 0; //! free any remaining pages /* note that the name may be misleading, but it remains so due to a historical accident. */ void __TBB_EXPORTED_METHOD internal_finish_clear() ; //! throw an exception void __TBB_EXPORTED_METHOD internal_throw_exception() const; //! copy internal representation void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ; private: virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0; }; //! Type-independent portion of concurrent_queue_iterator. /** @ingroup containers */ class concurrent_queue_iterator_base_v3 { //! concurrent_queue over which we are iterating. /** NULL if one past last element in queue. */ concurrent_queue_iterator_rep* my_rep; template friend bool operator==( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ); template friend bool operator!=( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ); void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data ); protected: //! Pointer to current item void* my_item; //! Default constructor concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {} //! Copy constructor concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) { assign(i); } //! Obsolete entry point for constructing iterator pointing to head of queue. /** Does not work correctly for SSE types. */ __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue ); //! Construct iterator pointing to head of queue. __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data ); //! Assignment void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i ); //! Advance iterator one step towards tail of queue. void __TBB_EXPORTED_METHOD advance(); //! Destructor __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3(); }; typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base; //! Meets requirements of a forward iterator for STL. /** Value is either the T or const T type of the container. @ingroup containers */ template class concurrent_queue_iterator: public concurrent_queue_iterator_base, public std::iterator { #if !defined(_MSC_VER) || defined(__INTEL_COMPILER) template friend class ::tbb::concurrent_bounded_queue; template friend class ::tbb::deprecated::concurrent_queue; #else public: // workaround for MSVC #endif //! Construct iterator pointing to head of queue. concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) : concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page,last)) { } public: concurrent_queue_iterator() {} /** If Value==Container::value_type, then this routine is the copy constructor. If Value==const Container::value_type, then this routine is a conversion constructor. */ concurrent_queue_iterator( const concurrent_queue_iterator& other ) : concurrent_queue_iterator_base_v3(other) {} //! Iterator assignment concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) { assign(other); return *this; } //! Reference to current item Value& operator*() const { return *static_cast(my_item); } Value* operator->() const {return &operator*();} //! Advance to next item in queue concurrent_queue_iterator& operator++() { advance(); return *this; } //! Post increment Value* operator++(int) { Value* result = &operator*(); operator++(); return result; } }; // concurrent_queue_iterator template bool operator==( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ) { return i.my_item==j.my_item; } template bool operator!=( const concurrent_queue_iterator& i, const concurrent_queue_iterator& j ) { return i.my_item!=j.my_item; } } // namespace internal; //! @endcond } // namespace tbb #endif /* __TBB__concurrent_queue_impl_H */