diff --git a/.clang-format b/.clang-format new file mode 100644 index 0000000..cb40b50 --- /dev/null +++ b/.clang-format @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +BasedOnStyle: Google +IndentWidth: 4 +ColumnLimit: 110 +SortIncludes: false +BreakBeforeBraces: Custom +BraceWrapping: + AfterEnum: true diff --git a/.gitignore b/.gitignore index 5cb909f..1a4bc13 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,10 @@ MANIFEST build dist *.egg-info +.idea +CMakeCache.txt +CMakeFiles +Makefile +_pulsar.so +cmake_install.cmake +__pycache__ diff --git a/CMakeLists.txt b/CMakeLists.txt index 63cf163..6c994b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,81 @@ # under the License. # -INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" "${PYTHON_INCLUDE_DIRS}") +project (pulsar-client-python) +cmake_minimum_required(VERSION 3.12) +set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake_modules") + +MESSAGE(STATUS "CMAKE_BUILD_TYPE: " ${CMAKE_BUILD_TYPE}) +set(THREADS_PREFER_PTHREAD_FLAG TRUE) +find_package(Threads REQUIRED) +MESSAGE(STATUS "Threads library: " ${CMAKE_THREAD_LIBS_INIT}) + + +find_library(PULSAR_LIBRARY NAMES libpulsar.a) +message(STATUS "PULSAR_LIBRARY: ${PULSAR_LIBRARY}") + +find_path(PULSAR_INCLUDE pulsar/Client.h) +message(STATUS "PULSAR_INCLUDE: ${PULSAR_INCLUDE}") + +SET(Boost_NO_BOOST_CMAKE ON) +SET(Boost_USE_STATIC_LIBS ON) + +SET(CMAKE_CXX_STANDARD 11) + +find_package(Boost) + +find_package (Python3 COMPONENTS Development) +MESSAGE(STATUS "PYTHON: " ${Python3_VERSION} " - " ${Python3_INCLUDE_DIRS}) + +string(REPLACE "." ";" PYTHONLIBS_VERSION_NO_LIST ${Python3_VERSION}) + +set(BOOST_PYTHON_NAME_POSTFIX ${Python3_VERSION_MAJOR}${Python3_VERSION_MINOR}) +# For python3 the lib name is boost_python3 +set(BOOST_PYTHON_NAME_LIST python${BOOST_PYTHON_NAME_POSTFIX};python37;python38;python39;python310;python3;python3-mt;python-py${BOOST_PYTHON_NAME_POSTFIX};python${BOOST_PYTHON_NAME_POSTFIX}-mt) + +foreach (BOOST_PYTHON_NAME IN LISTS BOOST_PYTHON_NAME_LIST) + find_package(Boost QUIET COMPONENTS ${BOOST_PYTHON_NAME}) + if (${Boost_FOUND}) + set(BOOST_PYTHON_NAME_FOUND ${BOOST_PYTHON_NAME}) + break() + endif() +endforeach() + +if (NOT ${Boost_FOUND}) + MESSAGE(FATAL_ERROR "Could not find Boost Python library") +endif () + +MESSAGE(STATUS "BOOST_PYTHON_NAME_FOUND: " ${BOOST_PYTHON_NAME_FOUND}) + +set(OPENSSL_ROOT_DIR ${OPENSSL_ROOT_DIR} /usr/lib64/) + +### This part is to find and keep SSL dynamic libs in RECORD_OPENSSL_SSL_LIBRARY and RECORD_OPENSSL_CRYPTO_LIBRARY +### After find the libs, will unset related cache, and will not affect another same call to find_package. +if (APPLE) + set(OPENSSL_INCLUDE_DIR /usr/local/opt/openssl/include/ /opt/homebrew/opt/openssl/include) + set(OPENSSL_ROOT_DIR ${OPENSSL_ROOT_DIR} /usr/local/opt/openssl/ /opt/homebrew/opt/openssl) +endif () + +set(OPENSSL_USE_STATIC_LIBS TRUE) +find_package(OpenSSL REQUIRED) + +find_library(ZLIB_LIBRARIES REQUIRED NAMES libz.a z zlib) +message(STATUS "ZLIB_LIBRARIES: ${ZLIB_LIBRARIES}") + +find_library(CURL_LIBRARIES NAMES libcurl.a curl curl_a libcurl_a) +message(STATUS "CURL_LIBRARIES: ${CURL_LIBRARIES}") +find_library(Protobuf_LIBRARIES NAMES libprotobuf.a libprotobuf) +message(STATUS "Protobuf: ${Protobuf_LIBRARIES}") +find_library(CURL_LIBRARIES NAMES libcurl.a curl curl_a libcurl_a) +message(STATUS "CURL_LIBRARIES: ${CURL_LIBRARIES}") +find_library(LIB_ZSTD NAMES libzstd.a) +message(STATUS "ZStd: ${LIB_ZSTD}") +find_library(LIB_SNAPPY NAMES libsnappy.a) +message(STATUS "LIB_SNAPPY: ${LIB_SNAPPY}") + +######################################################################################################################## + +INCLUDE_DIRECTORIES(${PULSAR_INCLUDE} "${Boost_INCLUDE_DIRS}" "${Python3_INCLUDE_DIRS}") ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc @@ -63,7 +137,14 @@ if (NOT DEFINED ${Boost_PYTHON310-MT_LIBRARY}) endif() # Try all possible boost-python variable namings -set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY} +set(PYTHON_WRAPPER_LIBS ${PULSAR_LIBRARY} + ${OPENSSL_LIBRARIES} + ${ZLIB_LIBRARIES} + ${CURL_LIBRARIES} + ${Protobuf_LIBRARIES} + ${LIB_ZSTD} + ${LIB_SNAPPY} + ${Boost_PYTHON_LIBRARY} ${Boost_PYTHON3_LIBRARY} ${Boost_PYTHON37-MT_LIBRARY} ${Boost_PYTHON38_LIBRARY} @@ -86,18 +167,33 @@ if (APPLE) endif () endif() -message(STATUS "Using Boost Python libs: ${PYTHON_WRAPPER_LIBS}") - if (NOT PYTHON_WRAPPER_LIBS) MESSAGE(FATAL_ERROR "Could not find Boost Python library") endif () +message(STATUS "All libraries: ${PYTHON_WRAPPER_LIBS}") + if (APPLE) set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -undefined dynamic_lookup") - target_link_libraries(_pulsar -Wl,-all_load pulsarStatic ${PYTHON_WRAPPER_LIBS} ${COMMON_LIBS} ${ICU_LIBS}) + target_link_libraries(_pulsar -Wl,-all_load ${PYTHON_WRAPPER_LIBS}) else () if (NOT MSVC) set (CMAKE_SHARED_LINKER_FLAGS " -static-libgcc -static-libstdc++") endif() - target_link_libraries(_pulsar pulsarStatic ${PYTHON_WRAPPER_LIBS} ${COMMON_LIBS}) + target_link_libraries(_pulsar ${PYTHON_WRAPPER_LIBS}) endif () + +find_package(ClangTools) +set(BUILD_SUPPORT_DIR "${CMAKE_SOURCE_DIR}/build-support") +add_custom_target(format ${BUILD_SUPPORT_DIR}/run_clang_format.py + ${CLANG_FORMAT_BIN} + 0 + ${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt + ${CMAKE_SOURCE_DIR}/src) + +# `make check-format` option (for CI test) +add_custom_target(check-format ${BUILD_SUPPORT_DIR}/run_clang_format.py + ${CLANG_FORMAT_BIN} + 1 + ${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt + ${CMAKE_SOURCE_DIR}/src) diff --git a/build-mac-wheels.sh b/build-mac-wheels.sh index 6a4dae7..38072f6 100755 --- a/build-mac-wheels.sh +++ b/build-mac-wheels.sh @@ -39,8 +39,9 @@ SNAPPY_VERSION=1.1.3 CURL_VERSION=7.61.0 ROOT_DIR=$(git rev-parse --show-toplevel) -cd "${ROOT_DIR}/pulsar-client-cpp" +cd "${ROOT_DIR}" +PULSAR_VERSION=$(cat version.txt | grep pulsar-client-cpp | awk '{print $2}') # Compile and cache dependencies CACHE_DIR=~/.pulsar-mac-wheels-cache @@ -246,6 +247,43 @@ else echo "Using cached LibCurl" fi +############################################################################### +if [ ! -f apache-pulsar-${PULSAR_VERSION}-src/.done ]; then + echo "Building Pulsar C++ client - ${PULSAR_VERSION}" + curl -O -L https://archive.apache.org/dist/pulsar/pulsar-${PULSAR_VERSION}/apache-pulsar-${PULSAR_VERSION}-src.tar.gz + rm -rf apache-pulsar-${PULSAR_VERSION}-src/pulsar-client-cpp + tar xfz apache-pulsar-${PULSAR_VERSION}-src.tar.gz + pushd apache-pulsar-${PULSAR_VERSION}-src + pushd pulsar-client-cpp + ARCHS='arm64;x86_64' + + chmod +x build-support/merge_archives.sh + set -x + cmake . \ + -DCMAKE_OSX_ARCHITECTURES=${ARCHS} \ + -DCMAKE_OSX_DEPLOYMENT_TARGET=${MACOSX_DEPLOYMENT_TARGET} \ + -DCMAKE_INSTALL_PREFIX=$PREFIX \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_PREFIX_PATH=$PREFIX \ + -DCMAKE_CXX_FLAGS=-I$PREFIX/include \ + -DBoost_INCLUDE_DIR=$CACHE_DIR/boost-py-$PYTHON_VERSION/include \ + -DBoost_LIBRARY_DIR=$CACHE_DIR/boost-py-$PYTHON_VERSION/lib \ + -DLINK_STATIC=OFF \ + -DBUILD_TESTS=OFF \ + -DBUILD_PYTHON_WRAPPER=OFF \ + -DBUILD_WIRESHARK=OFF \ + -DBUILD_DYNAMIC_LIB=OFF \ + -DBUILD_STATIC_LIB=ON \ + -DPROTOC_PATH=$PREFIX/bin/protoc + + make -j16 install + popd + touch .done + popd +else + echo "Using cached Pulsar C++ client" +fi + ############################################################################### ############################################################################### ############################################################################### @@ -260,7 +298,7 @@ for line in "${PYTHON_VERSIONS[@]}"; do echo '----------------------------------------------------------------------------' echo "Build wheel for Python $PYTHON_VERSION" - cd "${ROOT_DIR}/pulsar-client-cpp" + cd "${ROOT_DIR}" find . -name CMakeCache.txt | xargs -r rm find . -name CMakeFiles | xargs -r rm -rf @@ -285,16 +323,12 @@ for line in "${PYTHON_VERSIONS[@]}"; do -DCMAKE_CXX_FLAGS=-I$PREFIX/include \ -DBoost_INCLUDE_DIR=$CACHE_DIR/boost-py-$PYTHON_VERSION/include \ -DBoost_LIBRARY_DIR=$CACHE_DIR/boost-py-$PYTHON_VERSION/lib \ - -DPYTHON_INCLUDE_DIR=$PY_INCLUDE_DIR \ - -DPYTHON_LIBRARY=$PY_PREFIX/lib/libpython${PYTHON_VERSION}.dylib \ - -DLINK_STATIC=ON \ - -DBUILD_TESTS=OFF \ - -DBUILD_WIRESHARK=OFF \ - -DPROTOC_PATH=$PREFIX/bin/protoc + -DPython3_INCLUDE_DIR=$PY_INCLUDE_DIR \ + -DPython3_LIBRARY=$PY_PREFIX/lib/libpython${PYTHON_VERSION}.dylib \ + -DPULSAR_INCLUDE=${PREFIX}/include make clean - make _pulsar -j16 + make -j16 - cd python $PY_EXE setup.py bdist_wheel done diff --git a/build-support/clang_format_exclusions.txt b/build-support/clang_format_exclusions.txt new file mode 100644 index 0000000..fe95886 --- /dev/null +++ b/build-support/clang_format_exclusions.txt @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/build-support/run_clang_format.py b/build-support/run_clang_format.py new file mode 100755 index 0000000..3c99494 --- /dev/null +++ b/build-support/run_clang_format.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Original: https://github.com/apache/arrow/blob/4dbce607d50031a405af39d36e08cd03c5ffc764/cpp/build-support/run_clang_format.py +# ChangeLog: +# 2018-01-08: Accept multiple source directories (@Licht-T) + +import fnmatch +import os +import subprocess +import sys + +if len(sys.argv) < 5: + sys.stderr.write("Usage: %s $CLANG_FORMAT $CHECK_FORMAT exclude_globs.txt " + "$source_dir1 $source_dir2\n" % + sys.argv[0]) + sys.exit(1) + +CLANG_FORMAT = sys.argv[1] +CHECK_FORMAT = int(sys.argv[2]) == 1 +EXCLUDE_GLOBS_FILENAME = sys.argv[3] +SOURCE_DIRS = sys.argv[4:] + +exclude_globs = [line.strip() for line in open(EXCLUDE_GLOBS_FILENAME, "r")] + +files_to_format = [] +matches = [] +for source_dir in SOURCE_DIRS: + for directory, subdirs, files in os.walk(source_dir): + for name in files: + name = os.path.join(directory, name) + if not (name.endswith('.h') or name.endswith('.cc')): + continue + + excluded = False + for g in exclude_globs: + if fnmatch.fnmatch(name, g): + excluded = True + break + if not excluded: + files_to_format.append(name) + +if CHECK_FORMAT: + output = subprocess.check_output([CLANG_FORMAT, '-output-replacements-xml'] + + files_to_format, + stderr=subprocess.STDOUT).decode('utf8') + + to_fix = [] + for line in output.split('\n'): + if 'offset' in line: + to_fix.append(line) + + if len(to_fix) > 0: + print("clang-format checks failed, run 'make format' to fix") + sys.exit(-1) +else: + try: + cmd = [CLANG_FORMAT, '-i'] + files_to_format + subprocess.check_output(cmd, stderr=subprocess.STDOUT) + except Exception as e: + print(e) + print(' '.join(cmd)) + raise \ No newline at end of file diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake new file mode 100644 index 0000000..4b8fc18 --- /dev/null +++ b/cmake_modules/FindClangTools.cmake @@ -0,0 +1,100 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Tries to find the clang-tidy and clang-format modules +# +# Usage of this module as follows: +# +# find_package(ClangTools) +# +# Variables used by this module, they can change the default behaviour and need +# to be set before calling find_package: +# +# ClangToolsBin_HOME - +# When set, this path is inspected instead of standard library binary locations +# to find clang-tidy and clang-format +# +# This module defines +# CLANG_TIDY_BIN, The path to the clang tidy binary +# CLANG_TIDY_FOUND, Whether clang tidy was found +# CLANG_FORMAT_BIN, The path to the clang format binary +# CLANG_TIDY_FOUND, Whether clang format was found + +list(APPEND CLANG_SEARCH_PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin /opt/homebrew/bin) +if (WIN32) + list(APPEND CLANG_SEARCH_PATHS "C:/Program Files/LLVM/bin" "C:/Program Files (x86)/LLVM/bin") +endif() + +find_program(CLANG_TIDY_BIN + NAMES clang-tidy-4.0 + clang-tidy-3.9 + clang-tidy-3.8 + clang-tidy-3.7 + clang-tidy-3.6 + clang-tidy + PATHS ${CLANG_SEARCH_PATHS} + NO_DEFAULT_PATH + ) + +if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" ) + set(CLANG_TIDY_FOUND 0) + message("clang-tidy not found") +else() + set(CLANG_TIDY_FOUND 1) + message("clang-tidy found at ${CLANG_TIDY_BIN}") +endif() + +if (CLANG_FORMAT_VERSION) + find_program(CLANG_FORMAT_BIN + NAMES clang-format-${CLANG_FORMAT_VERSION} + PATHS ${CLANG_SEARCH_PATHS} + NO_DEFAULT_PATH + ) + + # If not found yet, search alternative locations + if (("${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND") AND APPLE) + # Homebrew ships older LLVM versions in /usr/local/opt/llvm@version/ + STRING(REGEX REPLACE "^([0-9]+)\\.[0-9]+" "\\1" CLANG_FORMAT_MAJOR_VERSION "${CLANG_FORMAT_VERSION}") + STRING(REGEX REPLACE "^[0-9]+\\.([0-9]+)" "\\1" CLANG_FORMAT_MINOR_VERSION "${CLANG_FORMAT_VERSION}") + if ("${CLANG_FORMAT_MINOR_VERSION}" STREQUAL "0") + find_program(CLANG_FORMAT_BIN + NAMES clang-format + PATHS /usr/local/opt/llvm@${CLANG_FORMAT_MAJOR_VERSION}/bin + NO_DEFAULT_PATH + ) + else() + find_program(CLANG_FORMAT_BIN + NAMES clang-format + PATHS /usr/local/opt/llvm@${CLANG_FORMAT_VERSION}/bin + NO_DEFAULT_PATH + ) + endif() + endif() +else() + find_program(CLANG_FORMAT_BIN + NAMES clang-format-5 + clang-format-5.0 + clang-format + PATHS ${CLANG_SEARCH_PATHS} + NO_DEFAULT_PATH + ) +endif() + +if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" ) + set(CLANG_FORMAT_FOUND 0) + message("clang-format not found") +else() + set(CLANG_FORMAT_FOUND 1) + message("clang-format found at ${CLANG_FORMAT_BIN}") +endif() + diff --git a/setup.py b/setup.py index 684d809..1263df4 100644 --- a/setup.py +++ b/setup.py @@ -19,29 +19,19 @@ from setuptools import setup from distutils.core import Extension -from distutils.util import strtobool -from os import environ +from os import environ, path from distutils.command import build_ext -import xml.etree.ElementTree as ET -from os.path import dirname, realpath, join def get_version(): - use_full_pom_name = strtobool(environ.get('USE_FULL_POM_NAME', 'False')) - - # Get the pulsar version from pom.xml - TOP_LEVEL_PATH = dirname(dirname(dirname(realpath(__file__)))) - POM_PATH = join(TOP_LEVEL_PATH, 'pom.xml') - root = ET.XML(open(POM_PATH).read()) - version = root.find('{http://maven.apache.org/POM/4.0.0}version').text.strip() - - if use_full_pom_name: - return version - else: - # Strip the '-incubating' suffix, since it prevents the packages - # from being uploaded into PyPI - return version.split('-')[0] + # Get the pulsar version from version.txt + root = path.dirname(path.realpath(__file__)) + version_file = path.join(root, 'version.txt') + with open(version_file) as f: + for line in f.readlines(): + if 'pulsar-client-python: ' in line: + return line.split()[-1].strip() def get_name(): @@ -53,8 +43,8 @@ def get_name(): VERSION = get_version() NAME = get_name() -print(VERSION) -print(NAME) +print('NAME: %s' % NAME) +print('VERSION: %s' % VERSION) # This is a workaround to have setuptools to include diff --git a/src/config.cc b/src/config.cc index fed9c28..c312648 100644 --- a/src/config.cc +++ b/src/config.cc @@ -18,7 +18,6 @@ */ #include "utils.h" #include -#include "lib/Utils.h" #include template diff --git a/src/future.h b/src/future.h new file mode 100644 index 0000000..6754c89 --- /dev/null +++ b/src/future.h @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef LIB_FUTURE_H_ +#define LIB_FUTURE_H_ + +#include +#include +#include +#include + +#include + +typedef std::unique_lock Lock; + +namespace pulsar { + +template +struct InternalState { + std::mutex mutex; + std::condition_variable condition; + Result result; + Type value; + bool complete; + + std::list > listeners; +}; + +template +class Future { + public: + typedef std::function ListenerCallback; + + Future& addListener(ListenerCallback callback) { + InternalState* state = state_.get(); + Lock lock(state->mutex); + + if (state->complete) { + lock.unlock(); + callback(state->result, state->value); + } else { + state->listeners.push_back(callback); + } + + return *this; + } + + Result get(Type& result) { + InternalState* state = state_.get(); + Lock lock(state->mutex); + + if (!state->complete) { + // Wait for result + while (!state->complete) { + state->condition.wait(lock); + } + } + + result = state->value; + return state->result; + } + + template + bool get(Result& res, Type& value, Duration d) { + InternalState* state = state_.get(); + Lock lock(state->mutex); + + if (!state->complete) { + // Wait for result + while (!state->complete) { + if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) { + // Timeout while waiting for the future to complete + return false; + } + } + } + + value = state->value; + res = state->result; + return true; + } + + private: + typedef std::shared_ptr > InternalStatePtr; + Future(InternalStatePtr state) : state_(state) {} + + std::shared_ptr > state_; + + template + friend class Promise; +}; + +template +class Promise { + public: + Promise() : state_(std::make_shared >()) {} + + bool setValue(const Type& value) const { + static Result DEFAULT_RESULT; + InternalState* state = state_.get(); + Lock lock(state->mutex); + + if (state->complete) { + return false; + } + + state->value = value; + state->result = DEFAULT_RESULT; + state->complete = true; + + decltype(state->listeners) listeners; + listeners.swap(state->listeners); + + lock.unlock(); + + for (auto& callback : listeners) { + callback(DEFAULT_RESULT, value); + } + + state->condition.notify_all(); + return true; + } + + bool setFailed(Result result) const { + static Type DEFAULT_VALUE; + InternalState* state = state_.get(); + Lock lock(state->mutex); + + if (state->complete) { + return false; + } + + state->result = result; + state->complete = true; + + decltype(state->listeners) listeners; + listeners.swap(state->listeners); + + lock.unlock(); + + for (auto& callback : listeners) { + callback(result, DEFAULT_VALUE); + } + + state->condition.notify_all(); + return true; + } + + bool isComplete() const { + InternalState* state = state_.get(); + Lock lock(state->mutex); + return state->complete; + } + + Future getFuture() const { return Future(state_); } + + private: + typedef std::function ListenerCallback; + std::shared_ptr > state_; +}; + +class Void {}; + +} /* namespace pulsar */ + +#endif /* LIB_FUTURE_H_ */ diff --git a/src/utils.h b/src/utils.h index 4b69ff8..3cbf98a 100644 --- a/src/utils.h +++ b/src/utils.h @@ -23,7 +23,7 @@ #include #include -#include +#include "future.h" using namespace pulsar; @@ -40,6 +40,29 @@ inline void CHECK_RESULT(Result res) { } } +struct WaitForCallback { + Promise m_promise; + + WaitForCallback(Promise promise) : m_promise(promise) {} + + void operator()(Result result) { m_promise.setValue(result); } +}; + +template +struct WaitForCallbackValue { + Promise& m_promise; + + WaitForCallbackValue(Promise& promise) : m_promise(promise) {} + + void operator()(Result result, const T& value) { + if (result == ResultOk) { + m_promise.setValue(value); + } else { + m_promise.setFailed(result); + } + } +}; + void waitForAsyncResult(std::function func); template diff --git a/version.txt b/version.txt new file mode 100644 index 0000000..05f2b11 --- /dev/null +++ b/version.txt @@ -0,0 +1,5 @@ + +pulsar-client-python: 3.0.0a1 + +# Dependency +pulsar-client-cpp: 2.10.1