Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <boost/optional.hpp>
#include <functional>
#include <list>
#include <memory>
#include <utility>

Expand Down
94 changes: 45 additions & 49 deletions lib/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
#define LIB_FUTURE_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <forward_list>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

namespace pulsar {

Expand All @@ -38,71 +35,70 @@ class InternalState {
using Pair = std::pair<Result, Type>;
using Lock = std::unique_lock<std::mutex>;

enum Status : uint8_t
{
INITIAL,
COMPLETING,
COMPLETED
};

// NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
InternalState() {}

void addListener(Listener listener) {
Lock lock{mutex_};
listeners_.emplace_back(listener);
lock.unlock();

if (completed()) {
Type value;
Result result = get(value);
triggerListeners(result, value);
auto result = result_;
auto value = value_;
lock.unlock();
listener(result, value);
} else {
tailListener_ = listeners_.emplace_after(tailListener_, std::move(listener));
}
}

bool complete(Result result, const Type &value) {
bool expected = false;
if (!completed_.compare_exchange_strong(expected, true)) {
Status expected = Status::INITIAL;
if (!status_.compare_exchange_strong(expected, Status::COMPLETING)) {
return false;
}
triggerListeners(result, value);
promise_.set_value(std::make_pair(result, value));
return true;
}

bool completed() const noexcept { return completed_; }

Result get(Type &result) {
const auto &pair = future_.get();
result = pair.second;
return pair.first;
}
// Ensure if another thread calls `addListener` at the same time, that thread can get the value by
// `get` before the existing listeners are executed
Lock lock{mutex_};
result_ = result;
value_ = value;
status_ = COMPLETED;
cond_.notify_all();

// Only public for test
void triggerListeners(Result result, const Type &value) {
while (true) {
Lock lock{mutex_};
if (listeners_.empty()) {
return;
if (!listeners_.empty()) {
auto listeners = std::move(listeners_);
lock.unlock();
for (auto &&listener : listeners) {
listener(result, value);
}
}

bool expected = false;
if (!listenerRunning_.compare_exchange_strong(expected, true)) {
// There is another thread that polled a listener that is running, skip polling and release
// the lock. Here we wait for some time to avoid busy waiting.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
auto listener = std::move(listeners_.front());
listeners_.pop_front();
lock.unlock();
return true;
}

listener(result, value);
listenerRunning_ = false;
}
bool completed() const noexcept { return status_.load() == COMPLETED; }

Result get(Type &value) const {
Lock lock{mutex_};
cond_.wait(lock, [this] { return completed(); });
value = value_;
return result_;
}

private:
std::atomic_bool completed_{false};
std::promise<Pair> promise_;
std::shared_future<Pair> future_{promise_.get_future()};

std::list<Listener> listeners_;
mutable std::mutex mutex_;
std::atomic_bool listenerRunning_{false};
mutable std::condition_variable cond_;
std::forward_list<Listener> listeners_;
decltype(listeners_.before_begin()) tailListener_{listeners_.before_begin()};
Result result_;
Type value_;
std::atomic<Status> status_{INITIAL};
};

template <typename Result, typename Type>
Expand Down
1 change: 1 addition & 0 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define LIB_PRODUCERIMPL_H_

#include <boost/optional.hpp>
#include <list>
#include <memory>

#include "Future.h"
Expand Down
53 changes: 34 additions & 19 deletions tests/PromiseTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "WaitUtils.h"
#include "lib/Future.h"
#include "lib/LogUtils.h"

Expand Down Expand Up @@ -88,26 +91,38 @@ TEST(PromiseTest, testListeners) {
ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
}

TEST(PromiseTest, testTriggerListeners) {
InternalState<int, int> state;
state.addListener([](int, const int&) {
LOG_INFO("Start task 1...");
std::this_thread::sleep_for(std::chrono::seconds(1));
LOG_INFO("Finish task 1...");
TEST(PromiseTest, testListenerDeadlock) {
Promise<int, int> promise;
auto future = promise.getFuture();
auto mutex = std::make_shared<std::mutex>();
auto done = std::make_shared<std::atomic_bool>(false);

future.addListener([mutex, done](int, int) {
LOG_INFO("Listener-1 before acquiring the lock");
std::lock_guard<std::mutex> lock{*mutex};
LOG_INFO("Listener-1 after acquiring the lock");
done->store(true);
});
state.addListener([](int, const int&) {
LOG_INFO("Start task 2...");

std::thread t1{[mutex, &future] {
std::lock_guard<std::mutex> lock{*mutex};
// Make it a great chance that `t2` executes `promise.setValue` first
std::this_thread::sleep_for(std::chrono::seconds(2));

// Since the future is completed, `Future::get` will be called in `addListener` to get the result
LOG_INFO("Before adding Listener-2 (acquired the mutex)")
future.addListener([](int, int) { LOG_INFO("Listener-2 is triggered"); });
LOG_INFO("After adding Listener-2 (releasing the mutex)");
}};
t1.detach();
std::thread t2{[mutex, promise] {
// Make there a great chance that `t1` acquires `mutex` first
std::this_thread::sleep_for(std::chrono::seconds(1));
LOG_INFO("Finish task 2...");
});
LOG_INFO("Before setting value");
promise.setValue(0); // the 1st listener is called, which is blocked at acquiring `mutex`
LOG_INFO("After setting value");
}};
t2.detach();

auto start = std::chrono::high_resolution_clock::now();
auto future1 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
auto future2 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
future1.wait();
future2.wait();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start)
.count();
ASSERT_TRUE(elapsed > 2000) << "elapsed: " << elapsed << "ms";
ASSERT_TRUE(waitUntil(std::chrono::seconds(5000), [done] { return done->load(); }));
}
5 changes: 3 additions & 2 deletions tests/WaitUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@
namespace pulsar {

template <typename Rep, typename Period>
inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
inline bool waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
long durationMs = 10) {
auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
while (timeoutMs > 0) {
auto now = std::chrono::high_resolution_clock::now();
if (condition()) {
break;
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - now)
.count();
timeoutMs -= elapsed;
}
return false;
}

} // namespace pulsar