Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ACE/ace/Asynch_IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1060,9 +1060,7 @@ ACE_Handler::ACE_Handler (ACE_Proactor *d)

ACE_Handler::~ACE_Handler ()
{
ACE_Handler::Proxy *p = this->proxy_.get ();
if (p)
p->reset ();
deregister_callback ();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move this to a new public method which is only used here?

Copy link
Author

@thildenbrand thildenbrand Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be able to use it from derived classes. It could therefore be made protected.
The order of operations in destructors would only execute this after the destructor of the derived class had already been executed, enabling callbacks to a semy-destructed object.

}

void
Expand Down Expand Up @@ -1121,6 +1119,14 @@ ACE_Handler::handle_wakeup ()
{
}

void
ACE_Handler::deregister_callback ()
{
ACE_Handler::Proxy* p = this->proxy_.get ();
if (p)
p->reset ();
}

ACE_Proactor *
ACE_Handler::proactor ()
{
Expand Down
13 changes: 12 additions & 1 deletion ACE/ace/Asynch_IO.h
Original file line number Diff line number Diff line change
Expand Up @@ -1620,6 +1620,9 @@ class ACE_Export ACE_Handler
*/
virtual void handle_wakeup ();

/// Call before destruction to ensure no more callbacks can happen.
void deregister_callback ();

/// Get the proactor associated with this handler.
ACE_Proactor *proactor ();

Expand Down Expand Up @@ -1649,10 +1652,18 @@ class ACE_Export ACE_Handler
{
public:
Proxy (ACE_Handler *handler) : handler_ (handler) {}
void reset () { this->handler_ = 0; }
void reset ()
{
acquire ();
this->handler_ = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use a guard for the acquire/release, no need for public acquire/release, use nullptr,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use std::atomic instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered using a guard here, but I ran into problems, when doing the actual callback to the ACE_Handler for the timeout. A possible solution was to allow the code that actually calls the ACE_Handler to lock the mutex.
This results in only two possibilities. Either I get the timeout callback, or I got to set back the handler pointer to the nullptr.

release ();
}
ACE_Handler *handler () { return this->handler_; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is reading itself thread safe?

Copy link
Author

@thildenbrand thildenbrand Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I added the acquire/release methods. Reading the pointer does not completely solve the problem, since I just end up with another raw pointer and the other core could again be in the process of destructing the object.

The timeout code looks something like this:

proxy->acquire();
if (proxy->handler() != nullptr)
   proxy->handler()->callback();
proxy->release();

I'm very open to a better solution.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is risky, keeping a lock during a callback. In taox11 we use shared/weak pointers

Copy link
Author

@thildenbrand thildenbrand Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess replacing the ACE_Handler* completely with some kind of shared_pointer could also solve the problem. I'd have to test it out with our code base.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably be the bigger change for users, since it requires them to use shared_pointers to manage the life time of their ACE_Handlers, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the public API is pretty impossible given the amount of users there. Your current solution doesn't seem to be complete, so not something to merge.

A long time ago to ACE_Event_Handler reference counting was added in such a way that the user had to enable it, maybe something for ACE_Handler? It has been some time ago that I worked on the proactor and I haven't dived into your issue in detail, it was just some high level comments, so not sure whether that is the correct path forward. Any possibility you could solve this race condition in application code?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding reference counting similar to ACE_Event_Handler sounds like a much better idea than mine.

int acquire () { return mutex_.acquire (); }
int release () { return mutex_.release (); }
private:
ACE_Handler *handler_;
ACE_SYNCH_MUTEX mutex_;
};
typedef ACE_Refcounted_Auto_Ptr<Proxy, ACE_SYNCH_MUTEX> Proxy_Ptr;

Expand Down
24 changes: 12 additions & 12 deletions ACE/ace/Proactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,16 @@ ACE_Proactor_Handle_Timeout_Upcall::ACE_Proactor_Handle_Timeout_Upcall ()

int
ACE_Proactor_Handle_Timeout_Upcall::registration (ACE_Proactor_Timer_Queue &,
ACE_Handler * handler,
const ACE_Handler::Proxy_Ptr& proxy,
const void *)
{
handler->proactor(proactor_);
proxy.get()->handler()->proactor(proactor_);
return 0;
}

int
ACE_Proactor_Handle_Timeout_Upcall::preinvoke (ACE_Proactor_Timer_Queue &,
ACE_Handler *,
ACE_Handler::Proxy_Ptr&,
const void *,
int,
const ACE_Time_Value &,
Expand All @@ -202,7 +202,7 @@ ACE_Proactor_Handle_Timeout_Upcall::preinvoke (ACE_Proactor_Timer_Queue &,

int
ACE_Proactor_Handle_Timeout_Upcall::postinvoke (ACE_Proactor_Timer_Queue &,
ACE_Handler *,
ACE_Handler::Proxy_Ptr&,
const void *,
int,
const ACE_Time_Value &,
Expand All @@ -213,7 +213,7 @@ ACE_Proactor_Handle_Timeout_Upcall::postinvoke (ACE_Proactor_Timer_Queue &,

int
ACE_Proactor_Handle_Timeout_Upcall::timeout (ACE_Proactor_Timer_Queue &,
ACE_Handler *handler,
ACE_Handler::Proxy_Ptr& proxy,
const void *act,
int,
const ACE_Time_Value &time)
Expand All @@ -226,7 +226,7 @@ ACE_Proactor_Handle_Timeout_Upcall::timeout (ACE_Proactor_Timer_Queue &,

// Create the Asynch_Timer.
ACE_Asynch_Result_Impl *asynch_timer =
this->proactor_->create_asynch_timer (handler->proxy (),
this->proactor_->create_asynch_timer (proxy,
act,
time,
ACE_INVALID_HANDLE,
Expand Down Expand Up @@ -259,7 +259,7 @@ ACE_Proactor_Handle_Timeout_Upcall::timeout (ACE_Proactor_Timer_Queue &,

int
ACE_Proactor_Handle_Timeout_Upcall::cancel_type (ACE_Proactor_Timer_Queue &,
ACE_Handler *,
const ACE_Handler::Proxy_Ptr&,
int,
int &)
{
Expand All @@ -269,7 +269,7 @@ ACE_Proactor_Handle_Timeout_Upcall::cancel_type (ACE_Proactor_Timer_Queue &,

int
ACE_Proactor_Handle_Timeout_Upcall::cancel_timer (ACE_Proactor_Timer_Queue &,
ACE_Handler *,
const ACE_Handler::Proxy_Ptr&,
int,
int)
{
Expand All @@ -279,7 +279,7 @@ ACE_Proactor_Handle_Timeout_Upcall::cancel_timer (ACE_Proactor_Timer_Queue &,

int
ACE_Proactor_Handle_Timeout_Upcall::deletion (ACE_Proactor_Timer_Queue &,
ACE_Handler *,
ACE_Handler::Proxy_Ptr&,
const void *)
{
// Do nothing
Expand Down Expand Up @@ -681,7 +681,7 @@ ACE_Proactor::schedule_timer (ACE_Handler &handler,
// absolute time.
ACE_Time_Value absolute_time =
this->timer_queue_->gettimeofday () + time;
long result = this->timer_queue_->schedule (&handler,
long result = this->timer_queue_->schedule (handler.proxy(),
act,
absolute_time,
interval);
Expand Down Expand Up @@ -713,7 +713,7 @@ ACE_Proactor::cancel_timer (ACE_Handler &handler,
{
// No need to signal timer event here. Even if the cancel timer was
// the earliest, we will have an extra wakeup.
return this->timer_queue_->cancel (&handler,
return this->timer_queue_->cancel (handler.proxy(),
dont_call_handle_close);
}

Expand Down Expand Up @@ -787,7 +787,7 @@ ACE_Proactor::timer_queue (ACE_Proactor_Timer_Queue *tq)
}

// Set the proactor in the timer queue's functor
using TQ_Base = ACE_Timer_Queue_Upcall_Base<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall>;
using TQ_Base = ACE_Timer_Queue_Upcall_Base<ACE_Handler::Proxy_Ptr, ACE_Proactor_Handle_Timeout_Upcall>;

TQ_Base * tqb = dynamic_cast<TQ_Base*> (this->timer_queue_);

Expand Down
30 changes: 15 additions & 15 deletions ACE/ace/Proactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ACE_Proactor_Impl;
class ACE_Proactor_Timer_Handler;

/// Type def for the timer queue.
typedef ACE_Abstract_Timer_Queue<ACE_Handler *> ACE_Proactor_Timer_Queue;
typedef ACE_Abstract_Timer_Queue<ACE_Handler::Proxy_Ptr> ACE_Proactor_Timer_Queue;

/**
* @class ACE_Proactor_Handle_Timeout_Upcall
Expand All @@ -63,48 +63,48 @@ class ACE_Export ACE_Proactor_Handle_Timeout_Upcall

/// This method is called when a timer is registered.
int registration (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
const ACE_Handler::Proxy_Ptr& proxy,
const void *arg);

/// This method is called before the timer expires.
int preinvoke (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
ACE_Handler::Proxy_Ptr& proxy,
const void *arg,
int recurring_timer,
const ACE_Time_Value &cur_time,
const void *&upcall_act);

/// This method is called when the timer expires.
int timeout (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
ACE_Handler::Proxy_Ptr& proxy,
const void *arg,
int recurring_timer,
const ACE_Time_Value &cur_time);

/// This method is called after the timer expires.
int postinvoke (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
ACE_Handler::Proxy_Ptr& proxy,
const void *arg,
int recurring_timer,
const ACE_Time_Value &cur_time,
const void *upcall_act);

/// This method is called when a handler is canceled.
int cancel_type (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
const ACE_Handler::Proxy_Ptr& proxy,
int dont_call_handle_close,
int &requires_reference_counting);

/// This method is called when a timer is canceled.
int cancel_timer (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
const ACE_Handler::Proxy_Ptr& proxy,
int dont_call_handle_close,
int requires_reference_counting);

/// This method is called when the timer queue is destroyed and the
/// timer is still contained in it.
int deletion (ACE_Proactor_Timer_Queue &timer_queue,
ACE_Handler *handler,
ACE_Handler::Proxy_Ptr& proxy,
const void *arg);

protected:
Expand All @@ -129,29 +129,29 @@ class ACE_Export ACE_Proactor
{
// = Here are the private typedefs that the ACE_Proactor uses.

typedef ACE_Timer_Queue_Iterator_T<ACE_Handler *>
typedef ACE_Timer_Queue_Iterator_T<ACE_Handler::Proxy_Ptr>
TIMER_QUEUE_ITERATOR;
typedef ACE_Timer_List_T<ACE_Handler *,
typedef ACE_Timer_List_T<ACE_Handler::Proxy_Ptr,
ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX>
TIMER_LIST;
typedef ACE_Timer_List_Iterator_T<ACE_Handler *,
typedef ACE_Timer_List_Iterator_T<ACE_Handler::Proxy_Ptr,
ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX>
TIMER_LIST_ITERATOR;
typedef ACE_Timer_Heap_T<ACE_Handler *,
typedef ACE_Timer_Heap_T<ACE_Handler::Proxy_Ptr,
ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX>
TIMER_HEAP;
typedef ACE_Timer_Heap_Iterator_T<ACE_Handler *,
typedef ACE_Timer_Heap_Iterator_T<ACE_Handler::Proxy_Ptr,
ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX>
TIMER_HEAP_ITERATOR;
typedef ACE_Timer_Wheel_T<ACE_Handler *,
typedef ACE_Timer_Wheel_T<ACE_Handler::Proxy_Ptr,
ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX>
TIMER_WHEEL;
typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler *,
typedef ACE_Timer_Wheel_Iterator_T<ACE_Handler::Proxy_Ptr,
ACE_Proactor_Handle_Timeout_Upcall,
ACE_SYNCH_RECURSIVE_MUTEX>
TIMER_WHEEL_ITERATOR;
Expand Down
2 changes: 1 addition & 1 deletion ACE/tests/Proactor_Timer_Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ run_main (int argc, ACE_TCHAR *[])
// to do it right in at least one test. Notice the lack of
// ACE_NEW_RETURN, that monstrosity has no business in proper C++
// code ...
using Timer_Queue = ACE_Timer_Heap_T<ACE_Handler *, ACE_Proactor_Handle_Timeout_Upcall, ACE_MT_SYNCH::RECURSIVE_MUTEX, ACE_FPointer_Time_Policy>;
using Timer_Queue = ACE_Timer_Heap_T<ACE_Handler::Proxy_Ptr, ACE_Proactor_Handle_Timeout_Upcall, ACE_MT_SYNCH::RECURSIVE_MUTEX, ACE_FPointer_Time_Policy>;

std::unique_ptr<Timer_Queue> tq(new Timer_Queue);
// ... notice how the policy is in the derived timer queue type.
Expand Down
Loading