Using Redis with AWS MemoryDB for vector search
Explore retrieval-augmented generation (RAG) with AWS MemoryDB to perform vector search with low latency.
AWS MemoryDB is an in-memory database that lets you index, store, and query vector embeddings with ultra-fast, low-latency performance.
In this session we will read a document, connect to AWS MemoryDB, and store and query vector embeddings.
Note that Vector search for MemoryDB is available in preview in the US East (N. Virginia), US East (Ohio), Europe (Ireland), US West (Oregon), and Asia Pacific (Tokyo) regions only.
Split Document:
This is your standard document retrieval and chunking. In this case we are using docx to read a document. You could also use LangChain's file uploader or any other library to retrieve your document and chunk it. We are using RecursiveCharacterTextSplitter to chunk the text.
# Load the document and split it into chunks for embedding
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
def read_word_document(filepath):
    """Return the text of every paragraph in a .docx file, joined by newlines."""
    document = Document(filepath)
    paragraph_texts = (para.text for para in document.paragraphs)
    return '\n'.join(paragraph_texts)
def splitText(text, chunk_size=1000, chunk_overlap=200):
    """Split text into chunks with RecursiveCharacterTextSplitter.

    Args:
        text: The full document text to split.
        chunk_size: Passed to the splitter as ``chunk_size``; lengths are
            measured with ``len``.
        chunk_overlap: Passed to the splitter as ``chunk_overlap``.

    Returns:
        The chunks produced by the splitter's ``split_text``.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        is_separator_regex=False,
    )
    return splitter.split_text(text)
# Read the Word document, then break its text into chunks.
doc = read_word_document("./<yourdoc>.docx")
splitDoc = splitText(doc)
Save embeddings in redis:
Here we connect to our Redis cluster endpoint. We then embed each chunk and save the embedding together with its text using HSET.
import redis
from langchain_community.embeddings import BedrockEmbeddings
import numpy as np
# Redis client for the cluster endpoint; the connection uses SSL.
client = redis.Redis(host='<cluster_endpoint>', port=6379, ssl=True)

# Bedrock embeddings client configured for the us-east-1 region.
embeddings = BedrockEmbeddings(region_name="us-east-1")
def saveEmbedding(embeddings, docs):
    """Embed every chunk and store it in Redis as a hash.

    Each chunk is written to the key ``oakDoc:<position>`` with two fields:
    ``embed`` (the embedding as raw float32 bytes) and ``text`` (the chunk).

    Args:
        embeddings: Object exposing ``embed_documents(list_of_texts)``.
        docs: List of text chunks to embed and store.
    """
    if not docs:
        return

    vectors = embeddings.embed_documents(list(docs))

    # Queue all writes and send them together instead of paying one network
    # round trip per chunk.
    pipe = client.pipeline(transaction=False)
    for position, (chunk, vector) in enumerate(zip(docs, vectors)):
        payload = np.array(vector, dtype=np.float32).tobytes()
        pipe.hset(f'oakDoc:{position}', mapping={'embed': payload, 'text': chunk})
    pipe.execute()
# Embed the chunks and write them to Redis.
saveEmbedding(embeddings, splitDoc)
Create index:
We are creating the vector index for our embeddings and the corresponding text. Note the use of VectorField to define the embedding field. We are using the HNSW algorithm, with COSINE as the distance metric.
from redis.commands.search.field import (
NumericField,
TagField,
TextField,
VectorField,
)
# Name of the search index used by the info/create/search calls.
index = "idx:testIndex"
def createIndex(index, dim=1536):
    """Create the vector search index and print how long creation took.

    The schema has an HNSW vector field ``embed`` (FLOAT32, COSINE distance)
    and a text field ``text``.

    Args:
        index: Name of the index to create.
        dim: Value for the vector field's ``DIM`` attribute; it should match
            the length of the embeddings stored in ``embed``.
    """
    # Imported locally so this snippet does not depend on a module-level
    # ``import time`` appearing earlier in the file.
    import time

    schema = [
        VectorField(
            "embed",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": dim,
                "DISTANCE_METRIC": "COSINE",
            },
        ),
        TextField("text"),
    ]

    start_time = time.time()
    client.ft(index).create_index(schema)
    end_time = time.time()
    print("Time taken to create index", end_time - start_time)
# Create the index if it does not exist yet; otherwise print its document count.
try:
    info = client.ft(index).info()
except redis.exceptions.ResponseError:
    # The server rejects FT.INFO for an unknown index. Connection errors and
    # interrupts are no longer mistaken for "index missing".
    print("creating index")
    createIndex(index)
else:
    print("index already exists")
    print("documents in index", info["num_docs"])
Querying
Here we are doing a KNN (K Nearest Neighbour) search for top 5 embeddings sorted by vector_score.
from redis.commands.search.query import Query
def Querying(query, k=5):
    """Run a KNN vector search for ``query`` and print the elapsed time.

    Args:
        query: Query text to embed and search with.
        k: Number of nearest neighbours to request and return.

    Returns:
        The result of ``client.ft(index).search``; the query asks for the
        ``text`` field of each match, sorted by ``vector_score``.
    """
    # Imported locally so this snippet does not depend on a module-level
    # ``import time`` appearing earlier in the file.
    import time

    k = int(k)  # interpolated into the query string, so force an integer

    start_time = time.time()
    query_vector = embeddings.embed_documents([query])[0]
    vector_bytes = np.array(query_vector, dtype=np.float32).tobytes()

    # A method chain that spans several lines must be wrapped in parentheses.
    q = (
        Query(f'(*)=>[KNN {k} @embed $vec_param AS vector_score]')
        .paging(0, k)
        .sort_by("vector_score")
        .return_fields("text")
        .dialect(2)
    )
    results = client.ft(index).search(q, query_params={"vec_param": vector_bytes})
    end_time = time.time()
    print("Time taken to query index", end_time - start_time)
    return results
# Example query against the index.
result = Querying("List all supported rules")
The complete setup looks like below:
import redis
import docx
import time
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.redis import Redis
from redis.commands.search.field import (
NumericField,
TagField,
TextField,
VectorField,
)
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query
from langchain_community.embeddings import BedrockEmbeddings
import numpy as np
import base64
# Redis client for the cluster endpoint; the connection uses SSL.
client = redis.Redis(host='<cluster_endpoint>', port=6379, ssl=True)

# Bedrock embeddings client configured for the us-east-1 region.
embeddings = BedrockEmbeddings(region_name="us-east-1")
def read_word_document(filepath):
    """Return the text of every paragraph in a .docx file, joined by newlines."""
    document = Document(filepath)
    paragraph_texts = (para.text for para in document.paragraphs)
    return '\n'.join(paragraph_texts)
def splitText(text, chunk_size=1000, chunk_overlap=20):
    """Split text into chunks with RecursiveCharacterTextSplitter.

    Args:
        text: The full document text to split.
        chunk_size: Passed to the splitter as ``chunk_size``; lengths are
            measured with ``len``.
        chunk_overlap: Passed to the splitter as ``chunk_overlap``.

    Returns:
        The chunks produced by the splitter's ``split_text``.
    """
    # NOTE(review): the step-by-step snippet earlier in this document uses
    # chunk_overlap=200; confirm which overlap is intended.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        is_separator_regex=False,
    )
    return splitter.split_text(text)
def saveEmbedding(embeddings, docs):
    """Embed every chunk and store it in Redis as a hash.

    Each chunk is written to the key ``oakDoc:<position>`` with two fields:
    ``embed`` (the embedding as raw float32 bytes) and ``text`` (the chunk).

    Args:
        embeddings: Object exposing ``embed_documents(list_of_texts)``.
        docs: List of text chunks to embed and store.
    """
    if not docs:
        return

    vectors = embeddings.embed_documents(list(docs))

    # Queue all writes and send them together instead of paying one network
    # round trip per chunk.
    pipe = client.pipeline(transaction=False)
    for position, (chunk, vector) in enumerate(zip(docs, vectors)):
        payload = np.array(vector, dtype=np.float32).tobytes()
        pipe.hset(f'oakDoc:{position}', mapping={'embed': payload, 'text': chunk})
    pipe.execute()
# Read the document, chunk it, then embed and store every chunk.
# Replace the placeholder with the path to your .docx file.
doc = read_word_document("<file_name>")
splitDoc = splitText(doc)
saveEmbedding(embeddings, splitDoc)
# Name of the search index used by the info/create/search calls.
index = "idx:test"
def createIndex(index, dim=1536):
    """Create the vector search index and print how long creation took.

    The schema has an HNSW vector field ``embed`` (FLOAT32, COSINE distance)
    and a text field ``text``.

    Args:
        index: Name of the index to create.
        dim: Value for the vector field's ``DIM`` attribute; it should match
            the length of the embeddings stored in ``embed``.
    """
    schema = [
        VectorField(
            "embed",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": dim,
                "DISTANCE_METRIC": "COSINE",
            },
        ),
        TextField("text"),
    ]

    start_time = time.time()
    client.ft(index).create_index(schema)
    end_time = time.time()
    print("Time taken to create index", end_time - start_time)
# Create the index if it does not exist yet; otherwise print its document count.
try:
    info = client.ft(index).info()
except redis.exceptions.ResponseError:
    # The server rejects FT.INFO for an unknown index. Connection errors and
    # interrupts are no longer mistaken for "index missing".
    print("creating index")
    createIndex(index)
else:
    print("index already exists")
    print("documents in index", info["num_docs"])
def Querying(query, k=5):
    """Run a KNN vector search for ``query`` and print the elapsed time.

    Args:
        query: Query text to embed and search with.
        k: Number of nearest neighbours to request and return.

    Returns:
        The result of ``client.ft(index).search``; the query asks for the
        ``text`` field of each match, sorted by ``vector_score``.
    """
    k = int(k)  # interpolated into the query string, so force an integer

    start_time = time.time()
    query_vector = embeddings.embed_documents([query])[0]
    embedded_bytes = np.array(query_vector, dtype=np.float32).tobytes()

    # A method chain that spans several lines must be wrapped in parentheses.
    q = (
        Query(f'(*)=>[KNN {k} @embed $vec_param AS vector_score]')
        .paging(0, k)
        .sort_by("vector_score")
        .return_fields("text")
        .dialect(2)
    )
    results = client.ft(index).search(q, query_params={"vec_param": embedded_bytes})
    end_time = time.time()
    print("Time taken to query index", end_time - start_time)
    return results
# Example query; the search results are kept in `context`.
context = Querying("Features of oak")
Cost comparison:
MemoryDB offers different node types, configurations, and on-demand pricing options.
https://aws.amazon.com/memorydb/pricing/
Considering the following configuration for a production setup:
- db.r7g.xlarge
- 26.32 GiB memory
- Up to 12.5 Gigabit network performance
730 hours in a month * 0.67 USD = 489.1 USD
For the same configuration, if the node is reserved, the monthly cost would be approximately $306.60.
Note that additional costs could be incurred if there is a need to snapshot the instance and save the entire cluster.