From 9ce5e8bbb54df3369f11938f54f5d902f6406504 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 29 Apr 2025 18:20:03 +0800 Subject: [PATCH 1/9] (WIP) Import TableView --- pulsar/__init__.py | 1 + pulsar/tableview.py | 27 +++++++++++++++++++++++++++ src/client.cc | 6 ++++++ src/pulsar.cc | 2 ++ src/table_view.cc | 36 ++++++++++++++++++++++++++++++++++++ 5 files changed, 72 insertions(+) create mode 100644 pulsar/tableview.py create mode 100644 src/table_view.cc diff --git a/pulsar/__init__.py b/pulsar/__init__.py index ba89d40a..812c2db8 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -54,6 +54,7 @@ from pulsar.__about__ import __version__ from pulsar.exceptions import * +from pulsar.tableview import TableView from pulsar.functions.function import Function from pulsar.functions.context import Context diff --git a/pulsar/tableview.py b/pulsar/tableview.py new file mode 100644 index 00000000..5d090688 --- /dev/null +++ b/pulsar/tableview.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +The TableView implementation. +""" + +class TableView(): + + def __init__(self) -> None: + pass diff --git a/src/client.cc b/src/client.cc index b25c63ab..72c824ff 100644 --- a/src/client.cc +++ b/src/client.cc @@ -89,6 +89,12 @@ void export_client(py::module_& m) { .def("subscribe_topics", &Client_subscribe_topics) .def("subscribe_pattern", &Client_subscribe_pattern) .def("create_reader", &Client_createReader) + .def("create_table_view", [](Client& client, const std::string& topic, + const TableViewConfiguration& config) { + return waitForAsyncValue([&](TableViewCallback callback) { + client.createTableViewAsync(topic, config, callback); + }); + }) .def("get_topic_partitions", &Client_getTopicPartitions) .def("get_schema_info", &Client_getSchemaInfo) .def("close", &Client_close) diff --git a/src/pulsar.cc b/src/pulsar.cc index 9bfeb597..6c42f8cd 100644 --- a/src/pulsar.cc +++ b/src/pulsar.cc @@ -32,6 +32,7 @@ void export_enums(Module& m); void export_authentication(Module& m); void export_schema(Module& m); void export_exceptions(Module& m); +void export_table_view(Module& m); PYBIND11_MODULE(_pulsar, m) { export_exceptions(m); @@ -44,4 +45,5 @@ PYBIND11_MODULE(_pulsar, m) { export_enums(m); export_authentication(m); export_schema(m); + export_table_view(m); } diff --git a/src/table_view.cc b/src/table_view.cc new file mode 100644 index 00000000..77d283e9 --- /dev/null +++ b/src/table_view.cc @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include + +namespace py = pybind11; +using namespace pulsar; + +void export_table_view(py::module_& m) { + py::class_(m, "TableViewConfiguration") + .def(py::init<>()) + .def("subscription_name", + [](TableViewConfiguration& config, const std::string& name) { config.subscriptionName = name; }) + .def("schema", + [](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; }); + + py::class_(m, "TableView").def(py::init<>()); +} From cfd5c6760b34db2c759f5603e11986b171dae370 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 2 May 2025 17:40:00 +0800 Subject: [PATCH 2/9] Add simple TableView API --- pulsar/__init__.py | 36 +++++++++++++++++++++ pulsar/tableview.py | 57 ++++++++++++++++++++++++++++++++-- src/table_view.cc | 22 ++++++++++++- tests/run-unit-tests.sh | 1 + tests/table_view_test.py | 67 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 tests/table_view_test.py diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 812c2db8..acb2b8b8 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1126,6 +1126,42 @@ def my_listener(reader, message): self._consumers.append(c) return c + def create_table_view(self, topic: str, + subscription_name: Optional[str] = None, + schema: schema.Schema = schema.BytesSchema()) -> TableView: + """ + Create a table view on a particular topic + + Parameters + ---------- + + topic: str + The name of the topic. + subscription_name: str, optional + The name of the subscription. If it's not specified, a random subscription name + will be used. + schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema + Define the schema of this table view. If the schema is incompatible with the topic's + schema, this method will throw an exception. This schema is also used to deserialize + the value of messages in the table view. + + Returns + ------- + TableView + A table view instance. + """ + _check_type(str, topic, 'topic') + _check_type_or_none(str, subscription_name, 'subscription_name') + _check_type(_schema.Schema, schema, 'schema') + + tv_conf = _pulsar.TableViewConfiguration() + if subscription_name is not None: + tv_conf.subscription_name(subscription_name) + tv_conf.schema(schema.schema_info()) + tv = self._client.create_table_view(topic, tv_conf) + self._table_view = TableView(tv, topic, subscription_name, schema) + return self._table_view + def get_topic_partitions(self, topic): """ Get the list of partitions for a given topic. diff --git a/pulsar/tableview.py b/pulsar/tableview.py index 5d090688..2e3c8dbb 100644 --- a/pulsar/tableview.py +++ b/pulsar/tableview.py @@ -21,7 +21,60 @@ The TableView implementation. """ +from typing import Any, Optional +from pulsar.schema.schema import Schema +import _pulsar + class TableView(): - def __init__(self) -> None: - pass + def __init__(self, table_view: _pulsar.TableView, topic: str, + subscription: Optional[str], schema: Schema) -> None: + self._table_view = table_view + self._topic = topic + self._subscription = subscription + self._schema = schema + + def get(self, key: str) -> Optional[Any]: + """ + Return the value associated with the given key in the table view. + + Parameters + ---------- + key: str + The message key + + Returns + ------- + Optional[Any] + The value associated with the key, or None if the key does not exist. + """ + pair = self._table_view.get(key) + if pair[0]: + return self._schema.decode(pair[1]) + else: + return None + #value = self._table_view.get(key) + #if value is None: + # return None + #return self._schema.decode(value) + + def close(self) -> None: + """ + Close the table view. + """ + self._table_view.close() + + def __len__(self) -> int: + """ + Return the number of entries in the table view. + """ + return self._table_view.size() + + def __str__(self) -> str: + if self._subscription is None: + return f"TableView(topic={self._topic})" + else: + return f"TableView(topic={self._topic}, subscription={self._subscription})" + + def __repr__(self) -> str: + return self.__str__() diff --git a/src/table_view.cc b/src/table_view.cc index 77d283e9..e568fbeb 100644 --- a/src/table_view.cc +++ b/src/table_view.cc @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include "utils.h" namespace py = pybind11; using namespace pulsar; @@ -32,5 +35,22 @@ void export_table_view(py::module_& m) { .def("schema", [](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; }); - py::class_(m, "TableView").def(py::init<>()); + py::class_(m, "TableView") + .def(py::init<>()) + .def("get", + [](const TableView& view, const std::string& key) -> std::pair { + py::gil_scoped_release release; + std::string value; + bool available = view.getValue(key, value); + py::gil_scoped_acquire acquire; + if (available) { + return std::make_pair(true, py::bytes(std::move(value))); + } else { + return std::make_pair(false, py::bytes()); + } + }) + .def("size", &TableView::size, py::call_guard()) + .def("close", [](TableView& view) { + waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); }); + }); } diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh index 0d6fabf6..8d7600d3 100755 --- a/tests/run-unit-tests.sh +++ b/tests/run-unit-tests.sh @@ -28,5 +28,6 @@ python3 debug_logger_test.py python3 interrupted_test.py python3 pulsar_test.py python3 schema_test.py +python3 table_view_test.py python3 reader_test.py python3 asyncio_test.py diff --git a/tests/table_view_test.py b/tests/table_view_test.py new file mode 100644 index 00000000..3e9b2a5b --- /dev/null +++ b/tests/table_view_test.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from typing import Callable +from unittest import TestCase, main +import time + +from pulsar import Client + +class TableViewTest(TestCase): + + def setUp(self): + self._client: Client = Client('pulsar://localhost:6650') + + def tearDown(self) -> None: + self._client.close() + + def test_get(self): + topic = f'table_view_test_get-{time.time()}' + table_view = self._client.create_table_view(topic) + self.assertEqual(len(table_view), 0) + + producer = self._client.create_producer(topic) + producer.send('value-0'.encode(), partition_key='key-0') + producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes + + self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2)) + self.assertEqual(table_view.get('key-0'), b'value-0') + self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0') + + producer.send('value-1'.encode(), partition_key='key-0') + # TODO: Upgrade to C++ client 3.7.1 to include https://github.com/apache/pulsar-client-cpp/pull/487 + #self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1')) + + producer.close() + table_view.close() + + + def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None: + start_time = time.time() + while time.time() - start_time < timeout: + try: + assertion() + return + except AssertionError: + time.sleep(0.1) + assertion() + +if __name__ == "__main__": + main() From e3a152d956fe2fa17e9648d8981c0a19e041efa3 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 May 2025 10:43:52 +0800 Subject: [PATCH 3/9] Bump version to 3.7.1 candidate 2 --- build-support/dep-url.sh | 2 +- dependencies.yaml | 2 +- tests/table_view_test.py | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/build-support/dep-url.sh b/build-support/dep-url.sh index 7670bb9e..63267b0d 100644 --- a/build-support/dep-url.sh +++ b/build-support/dep-url.sh @@ -23,7 +23,7 @@ pulsar_cpp_base_url() { exit 1 fi VERSION=$1 - echo "https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${VERSION}" + echo "https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-${VERSION}-candidate-2" } download_dependency() { diff --git a/dependencies.yaml b/dependencies.yaml index 9d9136e5..9db4747d 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,7 +17,7 @@ # under the License. # -pulsar-cpp: 3.7.0 +pulsar-cpp: 3.7.1 pybind11: 2.10.1 # The OpenSSL dependency is only used when building Python from source openssl: 1.1.1q diff --git a/tests/table_view_test.py b/tests/table_view_test.py index 3e9b2a5b..92726a40 100644 --- a/tests/table_view_test.py +++ b/tests/table_view_test.py @@ -46,8 +46,7 @@ def test_get(self): self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0') producer.send('value-1'.encode(), partition_key='key-0') - # TODO: Upgrade to C++ client 3.7.1 to include https://github.com/apache/pulsar-client-cpp/pull/487 - #self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1')) + self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1')) producer.close() table_view.close() From 705976f50df3ea2d9a5ca138de429803f65af89f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 May 2025 18:22:40 +0800 Subject: [PATCH 4/9] Support for_each --- pulsar/tableview.py | 18 +++++++++++++----- src/table_view.cc | 2 ++ tests/table_view_test.py | 16 +++++++++++++++- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pulsar/tableview.py b/pulsar/tableview.py index 2e3c8dbb..5eb165a4 100644 --- a/pulsar/tableview.py +++ b/pulsar/tableview.py @@ -21,7 +21,7 @@ The TableView implementation. """ -from typing import Any, Optional +from typing import Any, Callable, Optional from pulsar.schema.schema import Schema import _pulsar @@ -53,10 +53,18 @@ def get(self, key: str) -> Optional[Any]: return self._schema.decode(pair[1]) else: return None - #value = self._table_view.get(key) - #if value is None: - # return None - #return self._schema.decode(value) + + def for_each(self, callback: Callable[[str, Any], None]) -> None: + """ + Iterate over all entries in the table view and call the callback function + with the key and value for each entry. + + Parameters + ---------- + callback: Callable[[str, Any], None] + The callback function to call for each entry. + """ + self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v))) def close(self) -> None: """ diff --git a/src/table_view.cc b/src/table_view.cc index e568fbeb..fb156ed9 100644 --- a/src/table_view.cc +++ b/src/table_view.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "utils.h" @@ -50,6 +51,7 @@ void export_table_view(py::module_& m) { } }) .def("size", &TableView::size, py::call_guard()) + .def("for_each", &TableView::forEach, py::call_guard()) .def("close", [](TableView& view) { waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); }); }); diff --git a/tests/table_view_test.py b/tests/table_view_test.py index 92726a40..3bed931a 100644 --- a/tests/table_view_test.py +++ b/tests/table_view_test.py @@ -29,7 +29,7 @@ class TableViewTest(TestCase): def setUp(self): self._client: Client = Client('pulsar://localhost:6650') - def tearDown(self) -> None: + def tearDown(self): self._client.close() def test_get(self): @@ -51,6 +51,20 @@ def test_get(self): producer.close() table_view.close() + def test_for_each(self): + topic = f'table_view_test_for_each-{time.time()}' + table_view = self._client.create_table_view(topic) + producer = self._client.create_producer(topic) + producer.send('value-0'.encode(), partition_key='key-0') + producer.send('value-1'.encode(), partition_key='key-1') + self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2)) + + d = dict() + table_view.for_each(lambda key, value: d.__setitem__(key, value)) + self.assertEqual(d, { + 'key-0': 'value-0', + 'key-1': 'value-1' + }) def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None: start_time = time.time() From 0dc72841598140fb8294eb242caaa9d7df39df88 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 6 May 2025 20:23:20 +0800 Subject: [PATCH 5/9] Add for_each_and_listen --- pulsar/tableview.py | 13 +++++++++++++ src/table_view.cc | 1 + tests/table_view_test.py | 23 +++++++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/pulsar/tableview.py b/pulsar/tableview.py index 5eb165a4..702bb5b0 100644 --- a/pulsar/tableview.py +++ b/pulsar/tableview.py @@ -66,6 +66,19 @@ def for_each(self, callback: Callable[[str, Any], None]) -> None: """ self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v))) + def for_each_and_listen(self, callback: Callable[[str, Any], None]) -> None: + """ + Iterate over all entries in the table view and call the callback function + with the key and value for each entry, then listen for changes. The callback + will be called when a new entry is added or an existing entry is updated. + + Parameters + ---------- + callback: Callable[[str, Any], None] + The callback function to call for each entry. + """ + self._table_view.for_each_and_listen(lambda k, v: callback(k, self._schema.decode(v))) + def close(self) -> None: """ Close the table view. diff --git a/src/table_view.cc b/src/table_view.cc index fb156ed9..fb7a64c3 100644 --- a/src/table_view.cc +++ b/src/table_view.cc @@ -52,6 +52,7 @@ void export_table_view(py::module_& m) { }) .def("size", &TableView::size, py::call_guard()) .def("for_each", &TableView::forEach, py::call_guard()) + .def("for_each_and_listen", &TableView::forEachAndListen, py::call_guard()) .def("close", [](TableView& view) { waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); }); }); diff --git a/tests/table_view_test.py b/tests/table_view_test.py index 3bed931a..0389e33e 100644 --- a/tests/table_view_test.py +++ b/tests/table_view_test.py @@ -66,6 +66,29 @@ def test_for_each(self): 'key-1': 'value-1' }) + def listener(key: str, value: str): + if len(value) == 0: + d.pop(key) + else: + d[key] = value + + d.clear() + table_view.for_each_and_listen(listener) + self.assertEqual(d, { + 'key-0': 'value-0', + 'key-1': 'value-1' + }) + + producer.send('value-0-new'.encode(), partition_key='key-0') + producer.send(''.encode(), partition_key='key-1') + producer.send('value-2'.encode(), partition_key='key-2') + def assert_latest_values(): + self.assertEqual(d, { + 'key-0': 'value-0-new', + 'key-2': 'value-2' + }) + self._wait_for_assertion(assert_latest_values) + def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None: start_time = time.time() while time.time() - start_time < timeout: From f0331efc58a690adb2776c65ee2f4f4009886dc7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 May 2025 09:55:47 +0800 Subject: [PATCH 6/9] Use official 3.7.1 release --- build-support/dep-url.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build-support/dep-url.sh b/build-support/dep-url.sh index 63267b0d..7670bb9e 100644 --- a/build-support/dep-url.sh +++ b/build-support/dep-url.sh @@ -23,7 +23,7 @@ pulsar_cpp_base_url() { exit 1 fi VERSION=$1 - echo "https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-${VERSION}-candidate-2" + echo "https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${VERSION}" } download_dependency() { From 20be25552bc45e78760d45605fbaf7f4bdeb2daa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 May 2025 11:13:57 +0800 Subject: [PATCH 7/9] Fix schema not respected --- src/table_view.cc | 19 ++++++++++++++-- tests/table_view_test.py | 47 +++++++++++++++++++++++++++++----------- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/src/table_view.cc b/src/table_view.cc index fb7a64c3..6252937d 100644 --- a/src/table_view.cc +++ b/src/table_view.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "utils.h" @@ -51,8 +52,22 @@ void export_table_view(py::module_& m) { } }) .def("size", &TableView::size, py::call_guard()) - .def("for_each", &TableView::forEach, py::call_guard()) - .def("for_each_and_listen", &TableView::forEachAndListen, py::call_guard()) + .def("for_each", + [](TableView& view, std::function callback) { + py::gil_scoped_release release; + view.forEach([callback](const std::string& key, const std::string& value) { + py::gil_scoped_acquire acquire; + callback(key, py::bytes(value)); + }); + }) + .def("for_each_and_listen", + [](TableView& view, std::function callback) { + py::gil_scoped_release release; + view.forEachAndListen([callback](const std::string& key, const std::string& value) { + py::gil_scoped_acquire acquire; + callback(key, py::bytes(value)); + }); + }) .def("close", [](TableView& view) { waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); }); }); diff --git a/tests/table_view_test.py b/tests/table_view_test.py index 0389e33e..d3adcd41 100644 --- a/tests/table_view_test.py +++ b/tests/table_view_test.py @@ -23,6 +23,7 @@ import time from pulsar import Client +from pulsar.schema.schema import StringSchema class TableViewTest(TestCase): @@ -38,14 +39,14 @@ def test_get(self): self.assertEqual(len(table_view), 0) producer = self._client.create_producer(topic) - producer.send('value-0'.encode(), partition_key='key-0') + producer.send(b'value-0', partition_key='key-0') producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2)) self.assertEqual(table_view.get('key-0'), b'value-0') self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0') - producer.send('value-1'.encode(), partition_key='key-0') + producer.send(b'value-1', partition_key='key-0') self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1')) producer.close() @@ -55,15 +56,15 @@ def test_for_each(self): topic = f'table_view_test_for_each-{time.time()}' table_view = self._client.create_table_view(topic) producer = self._client.create_producer(topic) - producer.send('value-0'.encode(), partition_key='key-0') - producer.send('value-1'.encode(), partition_key='key-1') + producer.send(b'value-0', partition_key='key-0') + producer.send(b'value-1', partition_key='key-1') self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2)) d = dict() table_view.for_each(lambda key, value: d.__setitem__(key, value)) self.assertEqual(d, { - 'key-0': 'value-0', - 'key-1': 'value-1' + 'key-0': b'value-0', + 'key-1': b'value-1' }) def listener(key: str, value: str): @@ -75,20 +76,40 @@ def listener(key: str, value: str): d.clear() table_view.for_each_and_listen(listener) self.assertEqual(d, { - 'key-0': 'value-0', - 'key-1': 'value-1' + 'key-0': b'value-0', + 'key-1': b'value-1' }) - producer.send('value-0-new'.encode(), partition_key='key-0') - producer.send(''.encode(), partition_key='key-1') - producer.send('value-2'.encode(), partition_key='key-2') + producer.send(b'value-0-new', partition_key='key-0') + producer.send(b'', partition_key='key-1') + producer.send(b'value-2', partition_key='key-2') def assert_latest_values(): self.assertEqual(d, { - 'key-0': 'value-0-new', - 'key-2': 'value-2' + 'key-0': b'value-0-new', + 'key-2': b'value-2' }) self._wait_for_assertion(assert_latest_values) + def test_schema(self): + topic = f'table_view_test_schema-{time.time()}' + table_view = self._client.create_table_view(topic, schema=StringSchema()) + producer = self._client.create_producer(topic, schema=StringSchema()) + producer.send('value', partition_key='key') + + self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value')) + self.assertEqual(table_view.get('missed-key'), None) + + entries = dict() + table_view.for_each(lambda key, value: entries.__setitem__(key, value)) + self.assertEqual(entries, {'key': 'value'}) + + entries.clear() + table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value)) + self.assertEqual(entries, {'key': 'value'}) + + producer.send('new-value', partition_key='key') + self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value')) + def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None: start_time = time.time() while time.time() - start_time < timeout: From ba3684bf1c5293b1d1e17124aeb9a4c8a0f870e4 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 May 2025 11:20:02 +0800 Subject: [PATCH 8/9] Update dep-url.sh --- build-support/dep-url.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build-support/dep-url.sh b/build-support/dep-url.sh index 7670bb9e..e7e5c2a9 100644 --- a/build-support/dep-url.sh +++ b/build-support/dep-url.sh @@ -23,7 +23,7 @@ pulsar_cpp_base_url() { exit 1 fi VERSION=$1 - echo "https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${VERSION}" + echo "https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-cpp-${VERSION}" } download_dependency() { From 97c08f29e912fd61a9cf6f31745b1563518e1534 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 May 2025 15:00:31 +0800 Subject: [PATCH 9/9] Revert "Update dep-url.sh" This reverts commit ba3684bf1c5293b1d1e17124aeb9a4c8a0f870e4. --- build-support/dep-url.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build-support/dep-url.sh b/build-support/dep-url.sh index e7e5c2a9..7670bb9e 100644 --- a/build-support/dep-url.sh +++ b/build-support/dep-url.sh @@ -23,7 +23,7 @@ pulsar_cpp_base_url() { exit 1 fi VERSION=$1 - echo "https://dist.apache.org/repos/dist/release/pulsar/pulsar-client-cpp-${VERSION}" + echo "https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${VERSION}" } download_dependency() {