Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"python-envs.pythonProjects": [
{
"path": "",
"envManager": "ms-python.python:venv",
"packageManager": "ms-python.python:pip"
}
]
}
14 changes: 14 additions & 0 deletions libs/langgraph-elasticsearch/.vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Python File",
"type": "debugpy",
"request": "launch",
"program": "${file}"
}
]
}
7 changes: 7 additions & 0 deletions libs/langgraph-elasticsearch/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
67 changes: 67 additions & 0 deletions libs/langgraph-elasticsearch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Langgraph Elasticsearch

Langgraph Elasticsearch is a library designed to integrate Langchain with Elasticsearch, providing powerful search capabilities for your applications.

## Table of Contents
- [Installation](#installation)
- [Usage](#usage)
- [Contributing](#contributing)
- [License](#license)

## Installation

To install Langgraph Elasticsearch, you can use pip:

```bash
pip install langgraph-elasticsearch
```

Alternatively, you can clone the repository and install it manually:

```bash
git clone https://github.com/yourusername/langgraph-elasticsearch.git
cd langgraph-elasticsearch
pip install .
```

## Usage

Here is an example of how to use Langgraph Elasticsearch:

```python

from langgraph_elasticsearch import ElasticsearchMemoryStore

# Initialize the Elasticsearch client
es_client = ElasticsearchMemoryStore(
es_url="http://localhost:9200",
es_user="elastic",
es_password="your_password"
)

# Index a document
doc = {
"title": "Example Document",
"content": "This is an example document for Langgraph Elasticsearch."
}
es_client.index_document(index="documents", id=1, document=doc)

# Search for a document
query = {
"query": {
"match": {
"content": "example"
}
}
}
results = es_client.search(index="documents", body=query)
print(results)
```

## Contributing

We welcome contributions to Langgraph Elasticsearch! If you would like to contribute, please fork the repository and submit a pull request. For major changes, please open an issue first to discuss what you would like to change.

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
29 changes: 29 additions & 0 deletions libs/langgraph-elasticsearch/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: "3.8"

volumes:
esdata:
driver: local

networks:
default:
name: elastic

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.9.2
volumes:
- esdata:/usr/share/elasticsearch/data
ports:
- 9200:9200
environment:
- node.name=es01
- discovery.type=single-node
- ELASTIC_PASSWORD=elastic
- xpack.security.enabled=true
- xpack.security.http.ssl.enabled=false
- xpack.security.transport.ssl.enabled=false
healthcheck:
test: ["CMD-SHELL", "curl -s -X GET http://localhost:9200 | grep 'You Know, for Search'"]
interval: 10s
timeout: 5s
retries: 10
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import asyncio
from typing import Any, Dict, Generic, Optional, Sequence, Tuple
from elasticsearch import Elasticsearch
from langchain_elasticsearch.client import create_elasticsearch_client
from langgraph.checkpoint.elasticsearch.base import BaseElasticsearchSaver
from langgraph.checkpoint.serde.base import SerializerProtocol, maybe_add_typed_methods
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import CheckpointTuple, get_checkpoint_id
from langgraph.checkpoint.base import (
WRITES_IDX_MAP,
BaseCheckpointSaver,
ChannelVersions,
Checkpoint,
CheckpointMetadata,
get_checkpoint_id,
CheckpointTuple,
PendingWrite
)

from langgraph.checkpoint.elasticsearch.configurable import Configurable, get_configurable
from langgraph.util import syncify

class ElasticsearchSaver(BaseElasticsearchSaver[Elasticsearch]):
def __init__(self,
es_connection: Elasticsearch | None = None,
es_url: str | None = None,
es_cloud_id: str | None = None,
es_user: str | None = None,
es_api_key: str | None = None,
es_password: str | None = None,
es_params: Dict[str, Any] | None = None,
*,
serde: Optional[SerializerProtocol] = None,):
if not es_connection:
es_connection = create_elasticsearch_client(
url=es_url,
cloud_id=es_cloud_id,
api_key=es_api_key,
username=es_user,
password=es_password,
params=es_params,
)
if not es_connection:
raise ValueError("No Elasticsearch connection provided.")

try:
es_connection.info()
except Exception as e:
raise ValueError(f"Failed to connect to Elasticsearch: {e}")

super().__init__(es_connection=es_connection, serde=serde)

def _search_checkpoint(self, configurable: Configurable) -> Dict[str, Any]:
result_checkpoint = self.es_connection.search(
index=self.index_checkpoints,
body=self._build_query_checkpoint(configurable),
size=1,
)

return self._extract_hits(result_checkpoint)

def _search_writes(self, configurable: Configurable) -> Dict[str, Any]:
result_writes = self.es_connection.search(
index=self.index_writes,
body=self._build_query_writes(configurable),
)

return self._extract_hits(result_writes)

def _search_parent(self, configurable: Configurable, parent_checkpoint_id) -> Dict[str, Any]:
if not parent_checkpoint_id:
return []

result_parent = self.es_connection.search(
index=self.index_writes,
body=self._build_query_parent(configurable, parent_checkpoint_id),
)

return self._extract_hits(result_parent)

def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
loop = asyncio.get_event_loop()
return loop.run_until_complete(super().aget_tuple(config))

async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
raise NotImplementedError("Not supported in ElasticsearchSaver, use ElasticsearchSaver.get_tuple instead.")

def put(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
) -> RunnableConfig:
return syncify(self.aput, config, checkpoint, metadata, new_versions)

async def aput(
self,
config: RunnableConfig,
checkpoint: Checkpoint,
metadata: CheckpointMetadata,
new_versions: ChannelVersions,
) -> RunnableConfig:
raise NotImplementedError("Not supported in ElasticsearchSaver, use ElasticsearchSaver.put instead.")




def put_writes(
self,
config: RunnableConfig,
writes: Sequence[Tuple[str, Any]],
task_id: str,
task_path: str = "",
) -> None:
"""Store intermediate writes linked to a checkpoint.

Args:
config (RunnableConfig): Configuration of the related checkpoint.
writes (List[Tuple[str, Any]]): List of writes to store.
task_id (str): Identifier for the task creating the writes.
task_path (str): Path of the task creating the writes.

Raises:
NotImplementedError: Implement this method in your custom checkpoint saver.
"""
raise NotImplementedError
Loading