Using Redis with AWS MemoryDB for Vector Search

Explore RAG with AWS MemoryDB to perform vector search and achieve low-latency performance.

Ashwin Naik
4 min read · Feb 3, 2024

AWS MemoryDB is an in-memory database that delivers ultra-fast, low-latency performance for indexing, storing, and querying vector embeddings.

In this post we will read a document, connect to AWS MemoryDB, and store and query vector embeddings.

Note that vector search for MemoryDB is available in preview in the US East (N. Virginia), US East (Ohio), Europe (Ireland), US West (Oregon), and Asia Pacific (Tokyo) regions only.

Split Document:

This is the standard document retrieval and chunking step. Here we use the python-docx package to read a Word document; you could also use LangChain's document loaders or any other library to retrieve and chunk your documents. We use RecursiveCharacterTextSplitter to chunk the text.

# Load the document and split it into chunks
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

def read_word_document(filepath):
    doc = Document(filepath)
    return '\n'.join([paragraph.text for paragraph in doc.paragraphs])

def splitText(text):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
        is_separator_regex=False,
    )
    docs = text_splitter.split_text(text)
    return docs

doc = read_word_document("./<yourdoc>.docx")
splitDoc = splitText(doc)
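
As a quick sanity check, you can inspect how many chunks the splitter produced and preview one of them. This is a purely illustrative check using the splitDoc variable from above:

# Quick sanity check on the chunking output
print("number of chunks:", len(splitDoc))
print(splitDoc[0][:200])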

Save embeddings in Redis:

Here we connect to our MemoryDB cluster endpoint. We then iterate over the chunks, generate an embedding for each one, and save the embedding and its text using HSET.

import redis
import numpy as np
from langchain_community.embeddings import BedrockEmbeddings

# Create the Redis client (MemoryDB requires TLS)
client = redis.Redis(
    host='<cluster_endpoint>',
    port=6379,
    ssl=True,
)

embeddings = BedrockEmbeddings(
    region_name="us-east-1",
)

# Save each embedding and its text into the cluster using HSET
def saveEmbedding(embeddings, docs):
    for id, dd in enumerate(docs):
        y = embeddings.embed_documents([dd])
        j = np.array(y, dtype=np.float32).tobytes()
        client.hset(f'oakDoc:{id}', mapping={'embed': j, 'text': dd})

saveEmbedding(embeddings, splitDoc)
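
If you want to verify the writes before indexing, you can check the key count and read a stored hash back. This is just an illustrative check; the oakDoc:0 key assumes the naming scheme used above:

# Illustrative check: count keys and read back the first stored chunk
print("keys in cluster:", client.dbsize())
print(client.hget('oakDoc:0', 'text'))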

Create index:

Here we create the vector index over our embeddings and the corresponding text. Note the use of VectorField to define the embedding field. We use the HNSW algorithm with COSINE as the distance metric, and DIM is set to 1536 to match the dimensionality of the Bedrock Titan embeddings.

import time
from redis.commands.search.field import (
    TextField,
    VectorField,
)

index = "idx:testIndex"

def createIndex(index):
    start_time = time.time()
    res = client.ft(index).create_index([
        VectorField(
            "embed",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": 1536,
                "DISTANCE_METRIC": "COSINE",
            }
        ),
        TextField("text")
    ])
    end_time = time.time()
    print("Time taken to create index", end_time - start_time)

# Create the index if it does not exist, otherwise print its info
try:
    info = client.ft(index).info()
    print("index already exists")
    # Equivalent CLI command: FT.INFO "idx:testIndex"
    print("documents in index", info["num_docs"])
except redis.exceptions.ResponseError:
    print("creating index")
    createIndex(index)

Querying:

Here we perform a KNN (k-nearest neighbors) search for the top 5 embeddings, sorted by vector_score.

from redis.commands.search.query import Query

def Querying(query):
    start_time = time.time()
    # Embed the query text and convert it to a FLOAT32 byte string
    embedding_data = embeddings.embed_documents([query])
    byteInfo = np.array(embedding_data[0], dtype=np.float32).tobytes()

    q = (
        Query('(*)=>[KNN 5 @embed $vec_param AS vector_score]')
        .paging(0, 5)
        .sort_by("vector_score")
        .return_fields("text")
        .dialect(2)
    )

    paramsDict = {
        "vec_param": byteInfo
    }
    results = client.ft(index).search(q, query_params=paramsDict)
    end_time = time.time()
    print("Time taken to query index", end_time - start_time)
    return results

result = Querying("List all supported rules")
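
The search call returns a redis-py Result object whose docs attribute holds the matches, each exposing the fields requested via return_fields. A minimal sketch of reading the matched chunks back out:

# Print the id and a snippet of each matched chunk
for hit in result.docs:
    print(hit.id, hit.text[:100])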

The complete setup looks like this:



import time
import redis
import numpy as np
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import BedrockEmbeddings
from redis.commands.search.field import (
    TextField,
    VectorField,
)
from redis.commands.search.query import Query

client = redis.Redis(
    host='<cluster_endpoint>',
    port=6379,
    ssl=True,
)

embeddings = BedrockEmbeddings(
    region_name="us-east-1",
)


def read_word_document(filepath):
    doc = Document(filepath)
    return '\n'.join([paragraph.text for paragraph in doc.paragraphs])


def splitText(text):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
        is_separator_regex=False,
    )
    docs = text_splitter.split_text(text)
    return docs


def saveEmbedding(embeddings, docs):
    for id, dd in enumerate(docs):
        y = embeddings.embed_documents([dd])
        j = np.array(y, dtype=np.float32).tobytes()
        client.hset(f'oakDoc:{id}', mapping={'embed': j, 'text': dd})


doc = read_word_document("<file_name>")
splitDoc = splitText(doc)
saveEmbedding(embeddings, splitDoc)

index = "idx:testIndex"


def createIndex(index):
    start_time = time.time()
    res = client.ft(index).create_index([
        VectorField(
            "embed",
            "HNSW",
            {
                "TYPE": "FLOAT32",
                "DIM": 1536,
                "DISTANCE_METRIC": "COSINE",
            }
        ),
        TextField("text")
    ])
    end_time = time.time()
    print("Time taken to create index", end_time - start_time)


# Create the index if it does not exist, otherwise print its info
try:
    info = client.ft(index).info()
    print("index already exists")
    print("documents in index", info["num_docs"])
except redis.exceptions.ResponseError:
    print("creating index")
    createIndex(index)


# Query the vector index
def Querying(query):
    start_time = time.time()
    embedding_data = embeddings.embed_documents([query])
    embedded_bytes = np.array(embedding_data[0], dtype=np.float32).tobytes()

    q = (
        Query('(*)=>[KNN 5 @embed $vec_param AS vector_score]')
        .paging(0, 5)
        .sort_by("vector_score")
        .return_fields("text")
        .dialect(2)
    )

    paramsDict = {
        "vec_param": embedded_bytes
    }
    results = client.ft(index).search(q, query_params=paramsDict)
    end_time = time.time()
    print("Time taken to query index", end_time - start_time)
    return results


context = Querying("Features of oak")
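
With retrieval working, the final step of a RAG flow is to hand the retrieved chunks to an LLM as context. Below is a minimal sketch using LangChain's Bedrock wrapper; the model_id and prompt format are illustrative assumptions, not part of the setup above:

# Illustrative RAG step: pass the retrieved chunks to a Bedrock-hosted LLM.
# The model_id is an assumption; use whichever model your account has access to.
from langchain_community.llms import Bedrock

llm = Bedrock(model_id="anthropic.claude-v2", region_name="us-east-1")

retrieved_text = "\n\n".join(hit.text for hit in context.docs)
prompt = f"Answer using only this context:\n{retrieved_text}\n\nQuestion: Features of oak"
print(llm.invoke(prompt))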

Cost comparison:

MemoryDB offers different node types, configurations, and on-demand pricing options:
https://aws.amazon.com/memorydb/pricing/
Considering the following configuration for a production setup:

  • db.r7g.xlarge
  • 26.32 GiB memory
  • Up to 12.5 Gigabit network performance

On-demand: 730 hours in a month × $0.67 per hour = ~$489.10 per month

For the same configuration, a reserved node would bring the monthly cost down to ~$306.60.

Note that additional costs can be incurred if you need to snapshot the instance and save the entire cluster.

If you enjoyed this article, do show your support with a clap. Would mean a lot and it helps other people see the story too.
