BewareMyPower
Contributor

Motivation

A Future can deadlock in the following case. Assume there is a Promise and its Future.

  1. Call Future::addListener to add a listener that tries to acquire a user-provided mutex (lock).
  2. Thread 2: Acquire lock first.
  3. Thread 1: Call Promise::setValue. The listeners are triggered before the future is completed. Since lock is held by Thread 2, the listener is blocked.
  4. Thread 2: Call Future::addListener. Since it detects that InternalState::completed_ is true, it calls get to retrieve the result and value.

Then a deadlock happens:

  • Thread 1 waits for lock to be released before it can complete InternalState::future_.
  • Thread 2 holds lock but waits for InternalState::future_ to be completed.

In a real-world case, suppose we acquire a lock before calling ProducerImpl::closeAsync. If another thread then calls setValue in ClientConnection::handleSuccess and the callback of createProducerAsync tries to acquire the same lock, handleSuccess will be blocked. Then, in closeAsync, the current thread will be blocked in:

    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });

The stacks:

Thread 1:
#11 0x00007fab80da2173 in pulsar::InternalState<...>::complete (this=0x3d53e7a10, result=..., value=...) at lib/Future.h:61
#13 pulsar::ClientConnection::handleSuccess (this=this@entry=0x2214bc000, success=...) at lib/ClientConnection.cc:1552

Thread 2:
#8  get (result=..., this=0x3d53e7a10) at lib/Future.h:69
#9  pulsar::InternalState<...>::addListener (this=this@entry=0x3d53e7a10, listener=...) at lib/Future.h:51
#11 0x00007fab80e8dc4e in pulsar::ProducerImpl::closeAsync at lib/ProducerImpl.cc:794

Two things cause the deadlock:

  1. We use completed_ to represent whether the future is completed. However, after it becomes true, the future might not actually be completed, because the value is not set yet and the listeners have not finished.
  2. If addListener is called after the future is completed, we still push the listener to listeners_ so that previous listeners are executed before the new listener. This guarantee is unnecessarily strong.
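
To make the first point concrete, here is a simplified reconstruction of the problematic ordering, based only on the description above and not on the actual lib/Future.h code. `LegacyState` and `stallsWhileLockHeld` are hypothetical names, the value type is reduced to `int`, and a timeout replaces the unbounded wait so that the sketch terminates instead of hanging.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reconstruction of the pre-fix ordering (not the real implementation).
class LegacyState {
   public:
    using Listener = std::function<void(int)>;

    void complete(int value) {
        std::vector<Listener> pending;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            completed_ = true;  // the flag flips before the value is published
            pending.swap(listeners_);
        }
        for (auto& listener : pending) listener(value);  // may block on a user lock
        promise_.set_value(value);                       // the value is visible only now
    }

    // Returns false when the value did not arrive within `timeout`, which is
    // the point where an unbounded get() would block.
    bool addListener(Listener listener, std::chrono::milliseconds timeout) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (!completed_) {
                listeners_.push_back(std::move(listener));
                return true;
            }
        }
        if (future_.wait_for(timeout) != std::future_status::ready) return false;
        listener(future_.get());
        return true;
    }

    bool completed() const { return completed_.load(); }

   private:
    std::atomic<bool> completed_{false};
    std::promise<int> promise_;
    std::shared_future<int> future_{promise_.get_future().share()};
    std::mutex mutex_;
    std::vector<Listener> listeners_;
};

// Returns true if a late addListener stalls while the user lock is held.
inline bool stallsWhileLockHeld() {
    LegacyState state;
    std::mutex userLock;
    state.addListener([&](int) { std::lock_guard<std::mutex> guard(userLock); },
                      std::chrono::milliseconds(0));
    std::unique_lock<std::mutex> held(userLock);         // this thread owns the user lock
    std::thread completer([&] { state.complete(42); });  // its listener blocks on userLock
    while (!state.completed()) std::this_thread::yield();
    bool stalled = !state.addListener([](int) {}, std::chrono::milliseconds(100));
    held.unlock();  // let the first listener finish so the value gets published
    completer.join();
    return stalled;
}
```

With an unbounded wait in place of the timeout, the thread holding `userLock` would never reach `held.unlock()`, which is the deadlock described above.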

Modifications

First, complete the future before calling the listeners.

Then, use an enum to represent the status:

  • INITIAL: complete has not been called.
  • COMPLETING: the first time complete is called, the status changes from INITIAL to COMPLETING.
  • COMPLETED: the future is completed.
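
The two modifications above can be sketched roughly as follows. This is a minimal illustration and not the code merged in this PR: `SketchState`, its members, and `replayScenario` are hypothetical names, and the sketch assumes that a listener added after completion may run immediately on the calling thread.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

enum class Status { INITIAL, COMPLETING, COMPLETED };

// Hypothetical sketch: publish the value first, then run the listeners.
template <typename Result, typename Value>
class SketchState {
   public:
    using Listener = std::function<void(Result, const Value&)>;

    bool complete(Result result, const Value& value) {
        Status expected = Status::INITIAL;
        if (!status_.compare_exchange_strong(expected, Status::COMPLETING)) {
            return false;  // only the first call to complete wins
        }
        std::vector<Listener> pending;
        {
            std::lock_guard<std::mutex> guard(mutex_);
            result_ = result;
            value_ = value;
            status_.store(Status::COMPLETED);  // modified while owning the mutex
            cond_.notify_all();
            pending.swap(listeners_);
        }
        for (auto& listener : pending) listener(result, value);  // run outside the mutex
        return true;
    }

    void addListener(Listener listener) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (status_.load() != Status::COMPLETED) {
            listeners_.push_back(std::move(listener));
            return;
        }
        Result result = result_;
        Value value = value_;
        lock.unlock();
        listener(result, value);  // already completed: no wait for earlier listeners
    }

    Result get(Value& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return status_.load() == Status::COMPLETED; });
        value = value_;
        return result_;
    }

   private:
    std::atomic<Status> status_{Status::INITIAL};
    std::mutex mutex_;
    std::condition_variable cond_;
    Result result_{};
    Value value_{};
    std::vector<Listener> listeners_;
};

// Replays the scenario from the Motivation section; returns how many listeners ran.
inline int replayScenario() {
    SketchState<int, std::string> state;
    std::mutex userLock;
    std::atomic<int> calls{0};
    state.addListener([&](int, const std::string&) {
        std::lock_guard<std::mutex> guard(userLock);
        ++calls;
    });
    std::unique_lock<std::mutex> held(userLock);              // this thread owns the user lock
    std::thread completer([&] { state.complete(0, "ok"); });  // its listener blocks on userLock
    std::string value;
    state.get(value);  // returns once COMPLETED, before the first listener finishes
    state.addListener([&](int, const std::string&) { ++calls; });  // runs immediately
    held.unlock();
    completer.join();
    return calls.load();
}
```

Because the status reaches COMPLETED before any listener runs, the thread holding the user lock can still retrieve the value and add a listener while the first listener is blocked.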

In addition, the implementation of Future is simplified. #299 fixed a possible mutex crash by introducing std::future. However, the root cause was that the condition variable was not used correctly:

Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread.

See https://en.cppreference.com/w/cpp/thread/condition_variable

The simplest way to fix #298 is to add lock.lock() before state->condition.notify_all();.
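
As an illustration of the quoted rule, the sketch below modifies the shared flag and notifies while owning the mutex, mirroring the "lock before notify_all" ordering suggested above. `OneShot` and `signalAndWait` are hypothetical names that do not appear in the library.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot signal showing the condition-variable discipline.
struct OneShot {
    std::mutex mutex;
    std::condition_variable condition;
    std::atomic<bool> done{false};

    void signal() {
        std::lock_guard<std::mutex> lock(mutex);
        done.store(true);        // modified while owning the mutex, even though it is atomic
        condition.notify_all();  // the waiter cannot wake up before the lock is released
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this] { return done.load(); });
    }
};

// Signals from a second thread and waits on the calling thread.
inline bool signalAndWait() {
    OneShot shot;
    std::thread signaller([&] { shot.signal(); });
    shot.wait();
    signaller.join();
    return shot.done.load();
}
```

Without the lock in `signal`, the store could land between the waiter's predicate check and its going to sleep, so the notification could be missed.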

@BewareMyPower BewareMyPower added this to the 3.4.0 milestone Oct 26, 2023
@BewareMyPower BewareMyPower self-assigned this Oct 26, 2023
@BewareMyPower BewareMyPower marked this pull request as draft October 26, 2023 06:13
@BewareMyPower
Contributor Author

The Oauth2test seems to be stuck while connecting:

2023-10-26 07:13:58.425 DEBUG [140675173545536] AuthOauth2:378 | access_token: *** expires_in: 10
2023-10-26 07:13:58.447 DEBUG [140675173545536] ClientConnection:882 | [[::1]:36014 -> [::1]:6650] Handling incoming command: CONNECTED
2023-10-26 07:13:58.447 DEBUG [140675173545536] ClientConnection:279 | Connection has max message size setting: 5242880
2023-10-26 07:13:58.447 DEBUG [140675173545536] ClientConnection:281 | Current max message size is: 5242880
2023-10-26 07:14:28.118 DEBUG [140675173545536] ClientConnection:882 | [[::1]:36014 -> [::1]:6650] Handling incoming command: PING
2023-10-26 07:14:28.118 DEBUG [140675173545536] ClientConnection:955 | [[::1]:36014 -> [::1]:6650] Replying to ping command
2023-10-26 07:14:28.447 DEBUG [140675173545536] ClientConnection:1232 | [[::1]:36014 -> [::1]:6650] Sending ping message
2023-10-26 07:14:28.450 DEBUG [140675173545536] ClientConnection:882 | [[::1]:36014 -> [::1]:6650] Handling incoming command: PONG

However, I cannot reproduce it locally. I'm marking this PR as a draft and will investigate.

BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this pull request Oct 26, 2023
@BewareMyPower BewareMyPower force-pushed the bewaremypower/future-deadlock branch from 5443c29 to d1b6b7b Compare October 26, 2023 08:27
@BewareMyPower BewareMyPower force-pushed the bewaremypower/future-deadlock branch from d1b6b7b to a2b37c3 Compare October 26, 2023 09:46
@BewareMyPower BewareMyPower marked this pull request as ready for review October 26, 2023 10:51
@merlimat merlimat merged commit 77e2d63 into apache:main Oct 30, 2023
@BewareMyPower BewareMyPower deleted the bewaremypower/future-deadlock branch October 31, 2023 01:32