# Code Examples - AI & Data

## LLM Integration

### OpenAI API Usage

```python
# openai_client.py - OpenAI integration with best practices
from openai import OpenAI
import os
from typing import List
import json

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def chat_completion(
    messages: List[dict],
    model: str = "gpt-4-turbo-preview",
    temperature: float = 0.7,
    max_tokens: int = 1000,
    json_mode: bool = False
) -> str:
    """
    Send chat completion request with error handling
    """
    try:
        kwargs = dict(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        # Only include response_format when JSON mode is requested,
        # so the parameter is omitted entirely otherwise
        if json_mode:
            kwargs["response_format"] = {"type": "json_object"}
        response = client.chat.completions.create(**kwargs)
        return response.choices[0].message.content
    except Exception as e:
        print(f"OpenAI API error: {e}")
        raise

def structured_output(prompt: str, schema: dict) -> dict:
    """
    Get structured JSON output using function calling
    """
    response = client.chat.completions.create(
        model="gpt-4-turbo-preview",
        messages=[{"role": "user", "content": prompt}],
        tools=[{
            "type": "function",
            "function": {
                "name": "extract_data",
                "description": "Extract structured data from text",
                "parameters": schema
            }
        }],
        tool_choice={"type": "function", "function": {"name": "extract_data"}}
    )
    tool_call = response.choices[0].message.tool_calls[0]
    return json.loads(tool_call.function.arguments)

# Usage example
schema = {
    "type": "object",
    "properties": {
        "sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]},
        "confidence": {"type": "number", "minimum": 0, "maximum": 1},
        "key_phrases": {"type": "array", "items": {"type": "string"}}
    },
    "required": ["sentiment", "confidence", "key_phrases"]
}

result = structured_output(
    "The product exceeded my expectations! Great quality and fast shipping.",
    schema
)
# Output: {"sentiment": "positive", "confidence": 0.95, "key_phrases": ["exceeded expectations", "great quality", "fast shipping"]}
```

### Anthropic Claude Integration

```python
# claude_client.py - Claude API with streaming
from anthropic import Anthropic
import os
from typing import Generator, Optional

client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

def chat_with_claude(
    messages: list,
    system: Optional[str] = None,
    max_tokens: int = 1024
) -> str:
    """
    Simple Claude chat completion
    """
    response = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=max_tokens,
        system=system or "You are a helpful assistant.",
        messages=messages
    )
    return response.content[0].text

def stream_response(
    messages: list,
    system: Optional[str] = None
) -> Generator[str, None, None]:
    """
    Stream Claude response for real-time output
    """
    with client.messages.stream(
        model="claude-3-opus-20240229",
        max_tokens=1024,
        system=system or "You are a helpful assistant.",
        messages=messages
    ) as stream:
        for text in stream.text_stream:
            yield text

# Usage with tool use
def analyze_with_tools(query: str) -> dict:
    """
    Use Claude's tool use capability
    """
    tools = [
        {
            "name": "search_database",
            "description": "Search the product database",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "limit": {"type": "integer", "default": 10}
                },
                "required": ["query"]
            }
        }
    ]

    response = client.messages.create(
        model="claude-3-opus-20240229",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": query}]
    )

    # Handle tool use
    if response.stop_reason == "tool_use":
        tool_use = next(block for block in response.content if block.type == "tool_use")
        # Execute tool and continue conversation
        return {"tool": tool_use.name, "input": tool_use.input}

    return {"response": response.content[0].text}
```
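A minimal usage sketch for the streaming helper above (the message payload is illustrative):

```python
# Stream a reply chunk-by-chunk to the terminal
for chunk in stream_response([{"role": "user", "content": "Summarize RAG in one sentence."}]):
    print(chunk, end="", flush=True)
print()
```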
---

## RAG Implementation

### Vector Search with ChromaDB

```python
# rag_pipeline.py - Complete RAG implementation
import os
import hashlib
import chromadb
from chromadb.utils import embedding_functions
from langchain.text_splitter import RecursiveCharacterTextSplitter
from openai import OpenAI
from typing import List, Dict, Any

# OpenAI client used for answer generation
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

class RAGPipeline:
    def __init__(self, collection_name: str = "documents"):
        # Initialize ChromaDB with OpenAI embeddings
        self.client = chromadb.PersistentClient(path="./chroma_db")
        self.embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="text-embedding-3-small"
        )
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embedding_fn
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", ". ", " ", ""]
        )

    def add_documents(self, documents: List[Dict[str, Any]]) -> int:
        """
        Add documents to the vector store
        """
        all_chunks = []
        all_ids = []
        all_metadatas = []

        for doc in documents:
            chunks = self.text_splitter.split_text(doc["content"])
            for i, chunk in enumerate(chunks):
                chunk_id = hashlib.md5(f"{doc['id']}_{i}".encode()).hexdigest()
                all_chunks.append(chunk)
                all_ids.append(chunk_id)
                all_metadatas.append({
                    "source": doc.get("source", "unknown"),
                    "doc_id": doc["id"],
                    "chunk_index": i
                })

        self.collection.add(
            documents=all_chunks,
            ids=all_ids,
            metadatas=all_metadatas
        )
        return len(all_chunks)

    def search(self, query: str, n_results: int = 5, filter: dict = None) -> List[Dict]:
        """
        Search for relevant documents
        """
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results,
            where=filter
        )
        return [
            {
                "content": doc,
                "metadata": meta,
                "distance": dist
            }
            for doc, meta, dist in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0]
            )
        ]

    def generate_response(self, query: str, context_docs: List[Dict]) -> str:
        """
        Generate response using retrieved context
        """
        context = "\n\n".join([doc["content"] for doc in context_docs])

        prompt = f"""Based on the following context, answer the question.
If the answer is not in the context, say "I don't have enough information."

Context:
{context}

Question: {query}

Answer:"""

        response = client.chat.completions.create(
            model="gpt-4-turbo-preview",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        return response.choices[0].message.content

    def query(self, question: str) -> Dict[str, Any]:
        """
        Full RAG pipeline: search + generate
        """
        # Retrieve relevant documents
        docs = self.search(question, n_results=5)

        # Generate response
        answer = self.generate_response(question, docs)

        return {
            "answer": answer,
            "sources": [doc["metadata"]["source"] for doc in docs],
            "context_used": len(docs)
        }
```
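A minimal usage sketch for the pipeline above (the document contents and question are illustrative):

```python
# Index a document, then ask a question against it
rag = RAGPipeline(collection_name="kb")
rag.add_documents([
    {"id": "doc-1", "source": "handbook.md",
     "content": "Refunds are issued within 14 days of purchase."}
])

result = rag.query("What is the refund window?")
print(result["answer"])
print("Sources:", result["sources"])
```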
---

## Data Processing

### Pandas Data Pipeline

```python
# data_pipeline.py - Data processing with pandas
import pandas as pd
import numpy as np
from typing import List, Dict, Any, Callable

class DataPipeline:
    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.transformations = []

    def clean_missing(self, strategy: str = "drop", fill_value: Any = None):
        """Handle missing values"""
        if strategy == "drop":
            self.df = self.df.dropna()
        elif strategy == "fill":
            self.df = self.df.fillna(fill_value)
        elif strategy == "interpolate":
            self.df = self.df.interpolate()
        elif strategy == "mode":
            for col in self.df.columns:
                self.df[col] = self.df[col].fillna(self.df[col].mode()[0])
        return self

    def remove_outliers(self, columns: List[str], method: str = "iqr", threshold: float = 1.5):
        """Remove outliers using IQR or Z-score"""
        for col in columns:
            if method == "iqr":
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                mask = (self.df[col] >= Q1 - threshold * IQR) & (self.df[col] <= Q3 + threshold * IQR)
            elif method == "zscore":
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                mask = z_scores < threshold
            self.df = self.df[mask]
        return self

    def normalize(self, columns: List[str], method: str = "minmax"):
        """Normalize numerical columns"""
        for col in columns:
            if method == "minmax":
                min_val = self.df[col].min()
                max_val = self.df[col].max()
                self.df[col] = (self.df[col] - min_val) / (max_val - min_val)
            elif method == "zscore":
                self.df[col] = (self.df[col] - self.df[col].mean()) / self.df[col].std()
        return self

    def encode_categorical(self, columns: List[str], method: str = "onehot"):
        """Encode categorical variables"""
        if method == "onehot":
            self.df = pd.get_dummies(self.df, columns=columns, drop_first=True)
        elif method == "label":
            for col in columns:
                self.df[col] = self.df[col].astype('category').cat.codes
        return self

    def create_features(self, transformations: Dict[str, Callable]):
        """Create new features"""
        for name, func in transformations.items():
            self.df[name] = func(self.df)
        return self

    def get_result(self) -> pd.DataFrame:
        return self.df

# Usage example
df = pd.read_csv("data.csv")

pipeline = DataPipeline(df)
result = (pipeline
    .clean_missing(strategy="mode")
    .remove_outliers(["price", "quantity"], method="iqr")
    .normalize(["price"], method="minmax")
    .encode_categorical(["category", "region"])
    .create_features({
        "total": lambda df: df["price"] * df["quantity"],
        "log_price": lambda df: np.log1p(df["price"])
    })
    .get_result()
)
```

---

## Machine Learning

### Scikit-learn Pipeline

```python
# ml_pipeline.py - Complete ML pipeline
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix
import joblib

def create_preprocessing_pipeline(numeric_features: list, categorical_features: list):
    """Create preprocessing pipeline"""
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ]
    )
    return preprocessor

def train_model(X_train, y_train, numeric_features, categorical_features):
    """Train and tune model"""
    preprocessor = create_preprocessing_pipeline(numeric_features, categorical_features)

    # Create full pipeline
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', GradientBoostingClassifier())
    ])

    # Hyperparameter tuning
    param_grid = {
        'classifier__n_estimators': [100, 200],
        'classifier__max_depth': [3, 5, 7],
        'classifier__learning_rate': [0.01, 0.1]
    }

    grid_search = GridSearchCV(
        pipeline, param_grid,
        cv=5, scoring='f1_weighted',
        n_jobs=-1, verbose=1
    )
    grid_search.fit(X_train, y_train)

    print(f"Best parameters: {grid_search.best_params_}")
    print(f"Best CV score: {grid_search.best_score_:.4f}")

    return grid_search.best_estimator_

def evaluate_model(model, X_test, y_test):
    """Evaluate model performance"""
    y_pred = model.predict(X_test)

    print("Classification Report:")
    print(classification_report(y_test, y_pred))
    print("\nConfusion Matrix:")
    print(confusion_matrix(y_test, y_pred))

    return y_pred

def save_model(model, path: str):
    """Save trained model"""
    joblib.dump(model, path)
    print(f"Model saved to {path}")

def load_model(path: str):
    """Load trained model"""
    return joblib.load(path)
```
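A minimal end-to-end sketch using the helpers above (the column names and `target` column in `data.csv` are illustrative):

```python
from sklearn.model_selection import train_test_split
import pandas as pd

# Split features and labels, then train, evaluate, and persist
df = pd.read_csv("data.csv")
X, y = df.drop(columns=["target"]), df["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = train_model(X_train, y_train,
                    numeric_features=["price", "quantity"],
                    categorical_features=["category", "region"])
evaluate_model(model, X_test, y_test)
save_model(model, "model.joblib")
```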
---

## AI Agents

### LangChain Agent

```python
# agent.py - LangChain ReAct agent
from langchain.agents import AgentExecutor, create_react_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool, StructuredTool
from langchain import hub
from pydantic import BaseModel, Field

# Define tools
def search_database(query: str) -> str:
    """Search the product database"""
    # Simulated database search
    return f"Found 5 products matching '{query}': Product A, B, C, D, E"

def get_weather(city: str) -> str:
    """Get current weather for a city"""
    # Simulated weather API
    return f"Weather in {city}: 22°C, Sunny"

class CalculatorInput(BaseModel):
    expression: str = Field(description="Mathematical expression to evaluate")

def calculator(expression: str) -> str:
    """Evaluate a mathematical expression"""
    try:
        # NOTE: eval() is unsafe on untrusted input; use a proper
        # expression parser in production
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"Error: {e}"

# Create tools
tools = [
    Tool(
        name="search_database",
        func=search_database,
        description="Search the product database. Input should be a search query."
    ),
    Tool(
        name="get_weather",
        func=get_weather,
        description="Get current weather for a city. Input should be city name."
    ),
    StructuredTool.from_function(
        func=calculator,
        name="calculator",
        description="Evaluate mathematical expressions",
        args_schema=CalculatorInput
    )
]

def create_agent():
    """Create a ReAct agent"""
    llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)

    # Get the ReAct prompt
    prompt = hub.pull("hwchase17/react")

    # Create agent
    agent = create_react_agent(llm, tools, prompt)

    # Create executor
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=5
    )
    return agent_executor

# Usage
agent = create_agent()
result = agent.invoke({"input": "What's the weather in Tokyo and find products related to umbrellas"})
print(result["output"])
```
### Multi-Agent System

```python
# multi_agent.py - Multi-agent collaboration
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator

class AgentState(TypedDict):
    messages: Annotated[List[dict], operator.add]
    current_agent: str
    task_complete: bool

def create_researcher_agent():
    """Agent specialized in research"""
    def research(state: AgentState) -> AgentState:
        # Research logic
        messages = state["messages"]
        query = messages[-1]["content"]
        # Simulated research
        result = f"Research findings for: {query}"
        return {
            "messages": [{"role": "researcher", "content": result}],
            "current_agent": "analyst",
            "task_complete": False
        }
    return research

def create_analyst_agent():
    """Agent specialized in analysis"""
    def analyze(state: AgentState) -> AgentState:
        messages = state["messages"]
        research_data = messages[-1]["content"]
        # Simulated analysis
        result = f"Analysis of: {research_data}"
        return {
            "messages": [{"role": "analyst", "content": result}],
            "current_agent": "writer",
            "task_complete": False
        }
    return analyze

def create_writer_agent():
    """Agent specialized in writing"""
    def write(state: AgentState) -> AgentState:
        messages = state["messages"]
        analysis = messages[-1]["content"]
        # Simulated writing
        result = f"Final report based on: {analysis}"
        return {
            "messages": [{"role": "writer", "content": result}],
            "current_agent": "done",
            "task_complete": True
        }
    return write

def create_multi_agent_graph():
    """Create multi-agent workflow"""
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("researcher", create_researcher_agent())
    workflow.add_node("analyst", create_analyst_agent())
    workflow.add_node("writer", create_writer_agent())

    # Add edges
    workflow.add_edge("researcher", "analyst")
    workflow.add_edge("analyst", "writer")
    workflow.add_edge("writer", END)

    # Set entry point
    workflow.set_entry_point("researcher")

    return workflow.compile()

# Usage
graph = create_multi_agent_graph()
result = graph.invoke({
    "messages": [{"role": "user", "content": "Research AI trends in 2024"}],
    "current_agent": "researcher",
    "task_complete": False
})
```
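The compiled graph returns the final accumulated state, so a quick sketch for reading the writer's output after the invocation above:

```python
# The reducer (operator.add) appends each node's message, so the
# last entry is the writer's report
print(result["messages"][-1]["content"])
# -> "Final report based on: Analysis of: Research findings for: Research AI trends in 2024"
```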
