Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stateful allocator support for concurrent_queue and concurrent_bounde… #1520

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -137,25 +137,27 @@ class concurrent_queue {
}

concurrent_queue& operator=( const concurrent_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
if (my_queue_representation != other.my_queue_representation) {
if (this != &other) {
clear();
my_allocator = other.my_allocator;
tbb::detail::copy_assign_allocators(my_allocator, other.my_allocator);
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
return *this;
}

concurrent_queue& operator=( concurrent_queue&& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
if (my_queue_representation != other.my_queue_representation) {
if (this != &other) {
clear();
if (my_allocator == other.my_allocator) {
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
tbb::detail::move_assign_allocators(my_allocator, other.my_allocator);
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
if (my_allocator == other.my_allocator) {
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, my_allocator, move_construct_item);
other.clear();
}
}
}
return *this;
Expand All @@ -178,8 +180,7 @@ class concurrent_queue {
}

void swap ( concurrent_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_swap
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
tbb::detail::swap_allocators(my_allocator, other.my_allocator);
internal_swap(other);
}

Expand Down Expand Up @@ -236,6 +237,8 @@ class concurrent_queue {

private:
void internal_swap(concurrent_queue& src) {
if (!queue_allocator_traits::propagate_on_container_swap::value)
__TBB_ASSERT(my_allocator == src.my_allocator, "Swapping with unequal allocators is not allowed");
using std::swap;
swap(my_queue_representation, src.my_queue_representation);
}
Expand All @@ -253,15 +256,13 @@ class concurrent_queue {
template <typename Container, typename Value, typename A>
friend class concurrent_queue_iterator;

static void copy_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for copy construction
new (location) value_type(*static_cast<const value_type*>(src));
// queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));

static void copy_construct_item(queue_allocator_type& allocator, T* location, const void* src) {
queue_allocator_traits::construct(allocator, location, *static_cast<const T*>(src));
}

static void move_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for move construction
new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
static void move_construct_item(queue_allocator_type& allocator, T* location, const void* src) {
queue_allocator_traits::construct(allocator, location, std::move(*static_cast<value_type*>(const_cast<void*>(src))));
}

queue_allocator_type my_allocator;
Expand Down Expand Up @@ -416,25 +417,27 @@ class concurrent_bounded_queue {
}

concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
if (my_queue_representation != other.my_queue_representation) {
if (this != &other) {
clear();
my_allocator = other.my_allocator;
tbb::detail::copy_assign_allocators(my_allocator, other.my_allocator);
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
return *this;
}

concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
if (my_queue_representation != other.my_queue_representation) {
if (this != &other) {
clear();
if (my_allocator == other.my_allocator) {
if (queue_allocator_traits::propagate_on_container_move_assignment::value) {
tbb::detail::move_assign_allocators(my_allocator, other.my_allocator);
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
if (my_allocator == other.my_allocator) {
internal_swap(other);
} else {
my_queue_representation->assign(*other.my_queue_representation, my_allocator, move_construct_item);
other.clear();
}
}
}
return *this;
Expand All @@ -457,8 +460,7 @@ class concurrent_bounded_queue {
}

void swap ( concurrent_bounded_queue& other ) {
//TODO: implement support for std::allocator_traits::propagate_on_container_swap
__TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
tbb::detail::swap_allocators(my_allocator, other.my_allocator);
internal_swap(other);
}

Expand Down Expand Up @@ -641,14 +643,12 @@ class concurrent_bounded_queue {
r1::abort_bounded_queue_monitors(my_monitors);
}

static void copy_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for copy construction
new (location) value_type(*static_cast<const value_type*>(src));
static void copy_construct_item(queue_allocator_type& ator, T* location, const void* src) {
queue_allocator_traits::construct(ator, location, *static_cast<const T*>(src));
}

static void move_construct_item(T* location, const void* src) {
// TODO: use allocator_traits for move construction
new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
static void move_construct_item(queue_allocator_type& ator, T* location, const void* src) {
queue_allocator_traits::construct(ator, location, std::move(*static_cast<value_type*>(const_cast<void*>(src))));
}

template <typename Container, typename Value, typename A>
Expand Down
11 changes: 9 additions & 2 deletions include/oneapi/tbb/detail/_allocator_traits.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2021 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,7 +91,14 @@ void swap_allocators_impl( Allocator& lhs, Allocator& rhs, /*pocs = */ std::true
}

template <typename Allocator>
void swap_allocators_impl( Allocator&, Allocator&, /*pocs = */ std::false_type ) {}
void swap_allocators_impl( Allocator& lhs, Allocator& rhs, /*pocs = */ std::false_type ) {
// If the lhs and rhs are not equal, the behavior is undefined
if (!allocator_traits<Allocator>::is_always_equal::value) {
__TBB_ASSERT(lhs == rhs, "Swapping with unequal allocators is not allowed");
}
tbb::detail::suppress_unused_warning(lhs);
tbb::detail::suppress_unused_warning(rhs);
}

// Swaps allocators only if propagate_on_container_swap is true
template <typename Allocator>
Expand Down
10 changes: 5 additions & 5 deletions include/oneapi/tbb/detail/_concurrent_queue_base.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2022 Intel Corporation
Copyright (c) 2005-2024 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,7 +103,7 @@ class micro_queue {
using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
using item_constructor_type = void (*)(value_type* location, const void* src);
using item_constructor_type = void (*)(queue_allocator_type&, value_type* location, const void* src);
micro_queue() = default;
micro_queue( const micro_queue& ) = delete;
micro_queue& operator=( const micro_queue& ) = delete;
Expand Down Expand Up @@ -254,7 +254,7 @@ class micro_queue {
new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
copy_item(allocator, *new_page, begin_in_page, *src_page, begin_in_page, construct_item);
}
}
return new_page;
Expand Down Expand Up @@ -324,11 +324,11 @@ class micro_queue {
~destroyer() {my_value.~T();}
}; // class destroyer

void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
void copy_item( queue_allocator_type& allocator, padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
item_constructor_type construct_item )
{
auto& src_item = src[sindex];
construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
construct_item( allocator, &dst[dindex], static_cast<const void*>(&src_item) );
}

void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
Expand Down
Loading