44 changes: 31 additions & 13 deletions README.md
@@ -3,18 +3,18 @@
<img src="data/media/nhs_logo.png" alt="Nhs_logo" style="width:25%;" align="center">
<img src="data/media/medcat_logo.png" alt="Medcat_logo" style="width:10%; padding-top:3%" align="right">
</p>


# Working with CogStack
This repository contains all tools relevant to interacting with an NHS deployment of CogStack.

It contains:
1) Easy-to-follow templates and instructions for interacting with and searching CogStack.
2) Recommended workflows to create, train, and run MedCAT models.

For further discussion or questions, please join our official [CogStack/MedCAT forum!](https://discourse.cogstack.org/)

__NOTE__ this section is currently in development. Let me know if there is anything
else to add!


@@ -26,17 +26,17 @@ Any code to enter in these instructions will be represented as `code to enter`.

Please replace anything within `<Enter information here>` with your own specific details.

### Step 1: Clone this repository locally

1. Enter the directory where you would like to store these files. `cd path/to/where/you/want/this/repository`

2. Clone the online repository: `git clone https://github.com/CogStack/working_with_cogstack.git`

For further instructions and self-help with git and git clone, please visit this [link](https://github.com/git-guides/git-clone).

If you choose to use GitHub Desktop rather than the terminal, please refer to the [official GitHub Desktop guides](https://docs.github.com/en/desktop).

3. Optional: To update to the latest release of this repository: `git pull`

### Step 2: Creating a virtual environment and required packages
(Requires Python 3.7+)
@@ -53,12 +53,12 @@ __Linux/MAC OS__
3. Install relevant packages and libraries: `pip install -r requirements.txt`

*Optional: If no jupyter instance is installed.*
1. In the main folder of this repository, activate your virtual environment using the Step 2 command for your OS.
2. Start JupyterLab: `jupyter-lab`


### Step 3: Enter credentials and Login details
In the main folder of this repository, populate the [credentials.py](credentials.py) file with your own CogStack hostnames, usernames and passwords.

For an automatic authentication experience, the credentials.py contents can be prepopulated with your CogStack instance credentials:
```
@@ -77,19 +77,37 @@ If you have any questions or issues obtaining these details please contact your
## [How to search using CogStack](search)
This directory contains the basic search templates.

For further information on CogStack please visit their [github](https://github.com/CogStack)
or [wiki page](https://cogstack.org/).

### Search Engine Support
The `cogstack.py` module supports both Elasticsearch and OpenSearch backends:

- **Elasticsearch**: Default backend (requires `elasticsearch` package)
- **OpenSearch**: Alternative backend (requires `opensearch-py` package)

To use OpenSearch instead of Elasticsearch, set `use_opensearch=True` when initializing the CogStack class:

```python
# Using Elasticsearch (default)
cs = CogStack(hosts=['http://localhost:9200'])

# Using OpenSearch
cs = CogStack(hosts=['http://localhost:9200'], use_opensearch=True)
```

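Both backends are used through the same `CogStack` methods and are intended to support the same authentication methods (basic auth, API keys); check the client library you install for the connection options it accepts.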
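
The backend choice described above can be sketched as a small pure function. This is a minimal illustration, assuming the two availability flags are computed elsewhere (for example from guarded imports); the helper name `resolve_backend` is hypothetical and is not part of `cogstack.py`.

```python
def resolve_backend(use_opensearch: bool,
                    elasticsearch_available: bool,
                    opensearch_available: bool) -> str:
    """Return the backend name to use, or raise if its client is missing.

    Hypothetical helper: it only mirrors the idea of checking that the
    requested client library is installed before building a client.
    """
    if use_opensearch and not opensearch_available:
        raise ImportError("OpenSearch client is not available. "
                          "Please install opensearch-py: pip install opensearch-py")
    if not use_opensearch and not elasticsearch_available:
        raise ImportError("Elasticsearch client is not available. "
                          "Please install elasticsearch: pip install elasticsearch")
    return "opensearch" if use_opensearch else "elasticsearch"


# Elasticsearch stays the default when use_opensearch is False
print(resolve_backend(False, elasticsearch_available=True, opensearch_available=False))
```

Failing early with an install hint keeps a missing optional dependency from surfacing later as a less obvious `NameError`.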

## [How to create a watcher](watcher)
This directory contains the basic watcher job templates.

## [MedCAT](medcat)
An overview of this process is shown below.

<img src="data/media/medcat_pipeline_summary.png">


Further information about MedCAT can be found from their [github](https://github.com/CogStack/MedCAT)
or via their official documentation [here](https://medcat.readthedocs.io/en/latest/).

General MedCAT tutorials can be found [here](https://github.com/CogStack/MedCATtutorials).
125 changes: 80 additions & 45 deletions cogstack.py
@@ -1,8 +1,6 @@

import getpass
from typing import Dict, List, Any, Optional, Iterable, Tuple
import elasticsearch
import elasticsearch.helpers
import pandas as pd
from tqdm.notebook import tqdm
import eland as ed
@@ -16,6 +14,22 @@
# Reset all filters
warnings.resetwarnings()

# Import search engine clients
try:
    import elasticsearch
    import elasticsearch.helpers
    ELASTICSEARCH_AVAILABLE = True
except ImportError:
    ELASTICSEARCH_AVAILABLE = False

try:
    import opensearchpy
    import opensearchpy.helpers
    OPENSEARCH_AVAILABLE = True
except ImportError:
    OPENSEARCH_AVAILABLE = False


warnings.filterwarnings("module", category=DeprecationWarning, module="cogstack")
warnings.filterwarnings('ignore', category=SecurityWarning)
warnings.filterwarnings('ignore', category=InsecureRequestWarning)
@@ -25,49 +39,65 @@
class CogStack(object):
warnings.warn("cogstack module is deprecated, use cogstack2 instead.", DeprecationWarning)
"""
A class for interacting with Elasticsearch.
A class for interacting with Elasticsearch or OpenSearch.

Args:
hosts (List[str]): A list of Elasticsearch host URLs.
username (str, optional): The username to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a username.
password (str, optional): The password to use when connecting to Elasticsearch. If not provided, the user will be prompted to enter a password.
api (bool, optional): A boolean value indicating whether to use API keys or basic authentication to connect to Elasticsearch. Defaults to False (i.e., use basic authentication). Elasticsearch 7.17.
api_key (str, optional): The API key to use when connecting to Elasticsearch.
hosts (List[str]): A list of search engine host URLs.
username (str, optional): The username to use when connecting to the search engine. If not provided, the user will be prompted to enter a username.
password (str, optional): The password to use when connecting to the search engine. If not provided, the user will be prompted to enter a password.
api (bool, optional): A boolean value indicating whether to use API keys or basic authentication to connect to the search engine. Defaults to False (i.e., use basic authentication). Elasticsearch 7.17.
api_key (str, optional): The API key to use when connecting to the search engine.
When provided along with `api=True`, this takes precedence over username/password. Only available when using Elasticsearch 8.17.
use_opensearch (bool, optional): A boolean value indicating whether to use OpenSearch instead of Elasticsearch. Defaults to False (i.e., use Elasticsearch).
timeout (int, optional): The timeout in seconds for connections. Defaults to 60.
"""
def __init__(self, hosts: List, username: Optional[str] = None, password: Optional[str] = None,
api: bool = False, timeout: Optional[int]=60, api_key: Optional[str] = None):
api: bool = False, timeout: Optional[int]=60, api_key: Optional[str] = None,
use_opensearch: bool = False):

        # Validate that the required client is available
        if use_opensearch and not OPENSEARCH_AVAILABLE:
            raise ImportError("OpenSearch client is not available. Please install opensearch-py: pip install opensearch-py")
        elif not use_opensearch and not ELASTICSEARCH_AVAILABLE:
            raise ImportError("Elasticsearch client is not available. Please install elasticsearch: pip install elasticsearch")

        # Choose the appropriate client and helpers
        if use_opensearch:
            client_class = opensearchpy.OpenSearch
            self.helpers = opensearchpy.helpers
        else:
            client_class = elasticsearch.Elasticsearch
            self.helpers = elasticsearch.helpers

if api_key and api:
self.elastic = elasticsearch.Elasticsearch(hosts=hosts,
api_key=api_key,
verify_certs=False,
request_timeout=timeout)


self.elastic = client_class(hosts=hosts,
api_key=api_key,
verify_certs=False,
timeout=timeout)

elif api:
api_username, api_password = self._check_auth_details(username, password)
self.elastic = elasticsearch.Elasticsearch(hosts=hosts,
api_key=(api_username, api_password),
verify_certs=False,
request_timeout=timeout)
self.elastic = client_class(hosts=hosts,
api_key=(api_username, api_password),
verify_certs=False,
timeout=timeout)

else:
username, password = self._check_auth_details(username, password)
self.elastic = elasticsearch.Elasticsearch(hosts=hosts,
basic_auth=(username, password),
verify_certs=False,
request_timeout=timeout)
self.elastic = client_class(hosts=hosts,
basic_auth=(username, password),
verify_certs=False,
timeout=timeout)


def _check_auth_details(self, username=None, password=None) -> Tuple[str, str]:
"""
Prompt the user for a username and password if the values are not provided as function arguments.

Args:
username (str, optional): The username. If not provided, the user will be prompted to enter a username.
password (str, optional): The password. If not provided, the user will be prompted to enter a password.

Returns:
Tuple[str, str]: A tuple containing the API username and password.
"""
@@ -79,18 +109,18 @@ def _check_auth_details(self, username=None, password=None) -> Tuple[str, str]:
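
The body of `_check_auth_details` is not shown in this diff. A minimal sketch of the prompting behaviour its docstring describes might look like the following; the `ask_user` and `ask_secret` parameters are hypothetical additions that make the prompts injectable, and the real method may differ.

```python
import getpass
from typing import Callable, Optional, Tuple


def check_auth_details(username: Optional[str] = None,
                       password: Optional[str] = None,
                       ask_user: Callable[[str], str] = input,
                       ask_secret: Callable[[str], str] = getpass.getpass) -> Tuple[str, str]:
    """Prompt only for the values that were not supplied by the caller."""
    if username is None:
        username = ask_user("Username: ")
    if password is None:
        # getpass hides the typed characters on an interactive terminal
        password = ask_secret("Password: ")
    return username, password


# Both values supplied, so nothing is prompted
print(check_auth_details("alice", "s3cret"))
```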

def get_docs_generator(self, index: List, query: Dict, es_gen_size: int=800, request_timeout: Optional[int] = 300):
"""
Retrieve a generator object that can be used to iterate through documents in an Elasticsearch index.
Retrieve a generator object that can be used to iterate through documents in an Elasticsearch or OpenSearch index.

Args:
index (List[str]): A list of Elasticsearch index names to search.
index (List[str]): A list of search engine index names to search.
query (Dict): A dictionary containing the search query parameters.
es_gen_size (int, optional): The number of documents to retrieve per batch. Defaults to 800.
request_timeout (int, optional): The time in seconds to wait for a response from Elasticsearch before timing out. Defaults to 300.
request_timeout (int, optional): The time in seconds to wait for a response from the search engine before timing out. Defaults to 300.

Returns:
generator: A generator object that can be used to iterate through the documents in the specified Elasticsearch index.
generator: A generator object that can be used to iterate through the documents in the specified search engine index.
"""
docs_generator = elasticsearch.helpers.scan(self.elastic,
docs_generator = self.helpers.scan(self.elastic,
query=query,
index=index,
size=es_gen_size,
@@ -100,27 +130,32 @@ def get_docs_generator(self, index: List, query: Dict, es_gen_size: int=800, req
def cogstack2df(self, query: Dict, index: str, column_headers=None, es_gen_size: int=800, request_timeout: int=300,
show_progress: bool = True):
"""
Retrieve documents from an Elasticsearch index and convert them to a Pandas DataFrame.
Retrieve documents from an Elasticsearch or OpenSearch index and convert them to a Pandas DataFrame.

Args:
query (Dict): A dictionary containing the search query parameters.
index (str): The name of the Elasticsearch index to search.
index (str): The name of the search engine index to search.
column_headers (List[str], optional): A list of column headers to use for the DataFrame. If not provided, the DataFrame will have default column names.
es_gen_size (int, optional): The number of documents to retrieve per batch. Defaults to 800.
request_timeout (int, optional): The time in seconds to wait for a response from Elasticsearch before timing out. Defaults to 300.
request_timeout (int, optional): The time in seconds to wait for a response from the search engine before timing out. Defaults to 300.
show_progress (bool, optional): Whether to show progress in the console. Defaults to True.

Returns:
pandas.DataFrame: A DataFrame containing the retrieved documents.
"""
docs_generator = elasticsearch.helpers.scan(self.elastic,
docs_generator = self.helpers.scan(self.elastic,
query=query,
index=index,
size=es_gen_size,
request_timeout=request_timeout)
temp_results = []
results = self.elastic.count(index=index, query=query['query']) # type: ignore
for hit in tqdm(docs_generator, total=results['count'], desc="CogStack retrieved...", disable=not show_progress):
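        count_query = {
            "query": query['query'],
            "size": 0,  # We only want the count, not the documents
            "track_total_hits": True  # Ask for an exact total instead of the default capped lower bound
        }
        # Reuse the caller's timeout rather than a hard-coded value
        results = self.elastic.search(index=index, body=count_query, request_timeout=request_timeout)
        hits_total = results['hits']['total']
        total_count = hits_total['value'] if isinstance(hits_total, dict) else hits_total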
for hit in tqdm(docs_generator, total=total_count, desc="CogStack retrieved...", disable=not show_progress):
row = dict()
row['_index'] = hit['_index']
row['_id'] = hit['_id']
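
The `isinstance` check on `results['hits']['total']` covers two response shapes: an object with `value` and `relation` keys (Elasticsearch 7+ and OpenSearch) and a bare integer (older Elasticsearch responses). A minimal sketch of that normalisation as a standalone helper; the name `extract_total_hits` is hypothetical and not part of `cogstack.py`.

```python
from typing import Any, Dict


def extract_total_hits(response: Dict[str, Any]) -> int:
    """Return the total hit count from a search response body."""
    total = response["hits"]["total"]
    if isinstance(total, dict):
        # Newer responses: {"value": <int>, "relation": "eq" | "gte"}
        return int(total["value"])
    # Older responses report the total as a plain integer
    return int(total)


print(extract_total_hits({"hits": {"total": {"value": 42, "relation": "eq"}}}))
print(extract_total_hits({"hits": {"total": 7}}))
```

For large result sets the reported `value` can be a lower bound (relation `gte`) unless the search sets `track_total_hits`, which matters when the number feeds a progress bar total.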
@@ -134,15 +169,15 @@ def cogstack2df(self, query: Dict, index: str, column_headers=None, es_gen_size:
else:
df = pd.DataFrame(temp_results)
return df

def DataFrame(self, index: str, columns: Optional[List[str]] = None):
"""
Fast method to return a pandas dataframe from a CogStack search.

Args:
index (str): A list of indices to search.
columns (List[str], optional): A list of column names to include in the DataFrame. If not provided, all columns will be included.

Returns:
DataFrame: A pd.DataFrame like object containing the retrieved documents.
"""
@@ -152,11 +187,11 @@ def DataFrame(self, index: str, columns: Optional[List[str]] = None):
def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]:
"""
Divide a list into sublists of a specified size.

Args:
user_list (List[Any]): The list to be divided.
n (int): The size of the sublists.

Returns:
List[List[Any]]: A list of sublists containing the elements of the input list.
"""
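
The body of `list_chunker` is collapsed in this diff. One common way to implement what its docstring describes is slicing in steps of `n`; the sketch below is an assumption about the behaviour, not the module's actual code, and the `ValueError` guard is an addition for illustration.

```python
from typing import Any, List


def list_chunker(user_list: List[Any], n: int) -> List[List[Any]]:
    """Divide a list into consecutive sublists of at most n elements."""
    if n <= 0:
        raise ValueError("n must be a positive integer")
    # Step through the list n items at a time; the last chunk may be shorter
    return [user_list[i:i + n] for i in range(0, len(user_list), n)]


print(list_chunker([1, 2, 3, 4, 5], 2))
```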
4 changes: 4 additions & 0 deletions requirements.txt
@@ -6,3 +6,7 @@ eland>=9.0.0,<10.0
en_core_web_md @ https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.8.0/en_core_web_md-3.8.0-py3-none-any.whl
ipyfilechooser
jupyter_contrib_nbextensions

# Search engine clients (install at least one)
elasticsearch>=8.0.0 # For Elasticsearch support
opensearch-py>=2.0.0 # For OpenSearch support (alternative to elasticsearch)
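
Since only one of the two clients needs to be installed, it can help to check which ones are importable in the current environment. A small standard-library sketch; the helper name `installed_search_clients` is hypothetical, while `elasticsearch` and `opensearchpy` are the import names used in `cogstack.py`.

```python
from importlib.util import find_spec
from typing import Dict


def installed_search_clients() -> Dict[str, bool]:
    """Map each client's import name to whether it can be found for import."""
    names = ("elasticsearch", "opensearchpy")
    # find_spec returns None for a top-level module that is not installed
    return {name: find_spec(name) is not None for name in names}


print(installed_search_clients())
```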