aws_bedrock.py•3.55 kB
import json
import os
from typing import Literal
try:
import boto3
except ImportError as err:
raise ImportError(
"The 'boto3' library is required. Please install it using 'pip install boto3'."
) from err
import numpy as np
from selfmemory.configs.embeddings.base import BaseEmbedderConfig
from selfmemory.embeddings.base import EmbeddingBase
class AWSBedrockEmbedding(EmbeddingBase):
"""AWS Bedrock embedding implementation.
This class uses AWS Bedrock's embedding models.
"""
def __init__(self, config: BaseEmbedderConfig | None = None):
super().__init__(config)
self.config.model = self.config.model or "amazon.titan-embed-text-v1"
# Get AWS config from environment variables or use defaults
aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID", "")
aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
aws_session_token = os.environ.get("AWS_SESSION_TOKEN", "")
# Check if AWS config is provided in the config
if hasattr(self.config, "aws_access_key_id"):
aws_access_key = self.config.aws_access_key_id
if hasattr(self.config, "aws_secret_access_key"):
aws_secret_key = self.config.aws_secret_access_key
# AWS region is always set in config - see BaseEmbedderConfig
aws_region = self.config.aws_region or "us-west-2"
self.client = boto3.client(
"bedrock-runtime",
region_name=aws_region,
aws_access_key_id=aws_access_key if aws_access_key else None,
aws_secret_access_key=aws_secret_key if aws_secret_key else None,
aws_session_token=aws_session_token if aws_session_token else None,
)
def _normalize_vector(self, embeddings):
"""Normalize the embedding to a unit vector."""
emb = np.array(embeddings)
norm_emb = emb / np.linalg.norm(emb)
return norm_emb.tolist()
def _get_embedding(self, text):
"""Call out to Bedrock embedding endpoint."""
# Format input body based on the provider
provider = self.config.model.split(".")[0]
input_body = {}
if provider == "cohere":
input_body["input_type"] = "search_document"
input_body["texts"] = [text]
else:
# Amazon and other providers
input_body["inputText"] = text
body = json.dumps(input_body)
try:
response = self.client.invoke_model(
body=body,
modelId=self.config.model,
accept="application/json",
contentType="application/json",
)
response_body = json.loads(response.get("body").read())
if provider == "cohere":
embeddings = response_body.get("embeddings")[0]
else:
embeddings = response_body.get("embedding")
return embeddings
except Exception as e:
raise ValueError(f"Error getting embedding from AWS Bedrock: {e}") from e
def embed(
self, text, memory_action: Literal["add", "search", "update"] | None = None
):
"""
Get the embedding for the given text using AWS Bedrock.
Args:
text (str): The text to embed.
memory_action (optional): The type of embedding to use. Must be one of "add", "search", or "update". Defaults to None.
Returns:
list: The embedding vector.
"""
return self._get_embedding(text)