diff --git a/example/README_elasticsearch.md b/example/README_elasticsearch.md new file mode 100644 index 0000000..41d6784 --- /dev/null +++ b/example/README_elasticsearch.md @@ -0,0 +1,157 @@ +# Elasticsearch/OpenSearch Sample Data + +This directory contains sample data files in Elasticsearch bulk index format for testing the OpenSearch integration in EmbeddingBuddy. + +## Files + +### Original NDJSON Files + +- `sample_data.ndjson` - Original sample documents in EmbeddingBuddy format +- `sample_prompts.ndjson` - Original sample prompts in EmbeddingBuddy format + +### Elasticsearch Bulk Files + +- `sample_data_es_bulk.ndjson` - Documents in ES bulk format (index: "embeddings") +- `sample_prompts_es_bulk.ndjson` - Prompts in ES bulk format (index: "prompts") + +## Usage + +### 1. Index the data using curl + +```bash +# Index main documents +curl -X POST "localhost:9200/_bulk" \ + -H "Content-Type: application/x-ndjson" \ + --data-binary @sample_data_es_bulk.ndjson + +# Index prompts +curl -X POST "localhost:9200/_bulk" \ + -H "Content-Type: application/x-ndjson" \ + --data-binary @sample_prompts_es_bulk.ndjson +``` + +### 2. 
Create proper mappings (recommended) + +First create the index with proper knn_vector mapping: + +```bash +# Create embeddings index with knn_vector mapping +curl -X PUT "localhost:9200/embeddings" \ + -H "Content-Type: application/json" \ + -d '{ + "settings": { + "index.knn": true + }, + "mappings": { + "properties": { + "id": {"type": "keyword"}, + "embedding": { + "type": "knn_vector", + "dimension": 8, + "method": { + "engine": "lucene", + "space_type": "cosinesimil", + "name": "hnsw", + "parameters": {} + } + }, + "text": {"type": "text"}, + "category": {"type": "keyword"}, + "subcategory": {"type": "keyword"}, + "tags": {"type": "keyword"} + } + } + }' + +# Create prompts index with the same knn_vector mapping +curl -X PUT "localhost:9200/prompts" \ + -H "Content-Type: application/json" \ + -d '{ + "settings": { + "index.knn": true + }, + "mappings": { + "properties": { + "id": {"type": "keyword"}, + "embedding": { + "type": "knn_vector", + "dimension": 8, + "method": { + "engine": "lucene", + "space_type": "cosinesimil", + "name": "hnsw", + "parameters": {} + } + }, + "text": {"type": "text"}, + "category": {"type": "keyword"}, + "subcategory": {"type": "keyword"}, + "tags": {"type": "keyword"} + } + } + }' +``` + +Then index the data using the bulk files above. + +### 3. 
Test in EmbeddingBuddy + +#### For "embeddings" index + +- **OpenSearch URL**: `http://localhost:9200` +- **Index Name**: `embeddings` +- **Field Mapping**: + - Embedding Field: `embedding` + - Text Field: `text` + - ID Field: `id` + - Category Field: `category` + - Subcategory Field: `subcategory` + - Tags Field: `tags` + +#### For an "embeddings-dense" index (alternative field names; example only — no bulk file for this index is provided) + +- **OpenSearch URL**: `http://localhost:9200` +- **Index Name**: `embeddings-dense` +- **Field Mapping**: + - Embedding Field: `vector` + - Text Field: `content` + - ID Field: `doc_id` + - Category Field: `type` + - Subcategory Field: `subtopic` + - Tags Field: `keywords` + +## Data Structure + +### Original Format (from NDJSON files) + +```json +{ + "id": "doc_001", + "embedding": [0.2, -0.1, 0.8, 0.3, -0.5, 0.7, 0.1, -0.3], + "text": "Machine learning algorithms are transforming healthcare...", + "category": "technology", + "subcategory": "healthcare", + "tags": ["ai", "medicine", "prediction"] +} +``` + +### ES Bulk Format + +```json +{"index": {"_index": "embeddings", "_id": "doc_001"}} +{"id": "doc_001", "embedding": [...], "text": "...", "category": "...", ...} +``` + +### Alternative Field Names (dense vector format) + +```json +{"index": {"_index": "embeddings-dense", "_id": "doc_001"}} +{"doc_id": "doc_001", "vector": [...], "content": "...", "type": "...", ...} +``` + +## Notes + +- All embedding vectors are 8-dimensional for these sample files +- The alternative format demonstrates how EmbeddingBuddy's field mapping handles different field names +- For production use, you may want larger embedding dimensions (e.g., 384, 768, 1536) +- The `dense_vector` (Elasticsearch) and `knn_vector` (OpenSearch) field types enable vector similarity search diff --git a/example/sample_data_es_bulk.ndjson b/example/sample_data_es_bulk.ndjson new file mode 100644 index 0000000..e8246b7 --- /dev/null +++ b/example/sample_data_es_bulk.ndjson @@ -0,0 +1,40 @@ +{"index": {"_index": "embeddings", "_id": 
"doc_001"}} +{"id": "doc_001", "embedding": [0.2, -0.1, 0.8, 0.3, -0.5, 0.7, 0.1, -0.3], "text": "Machine learning algorithms are transforming healthcare by enabling predictive analytics and personalized medicine.", "category": "technology", "subcategory": "healthcare", "tags": ["ai", "medicine", "prediction"]} +{"index": {"_index": "embeddings", "_id": "doc_002"}} +{"id": "doc_002", "embedding": [0.1, 0.4, -0.2, 0.6, 0.3, -0.4, 0.8, 0.2], "text": "Climate change poses significant challenges to global food security and agricultural sustainability.", "category": "environment", "subcategory": "agriculture", "tags": ["climate", "food", "sustainability"]} +{"index": {"_index": "embeddings", "_id": "doc_003"}} +{"id": "doc_003", "embedding": [-0.3, 0.7, 0.1, -0.2, 0.9, 0.4, -0.1, 0.5], "text": "The rise of electric vehicles is reshaping the automotive industry and urban transportation systems.", "category": "technology", "subcategory": "automotive", "tags": ["electric", "transport", "urban"]} +{"index": {"_index": "embeddings", "_id": "doc_004"}} +{"id": "doc_004", "embedding": [0.5, -0.6, 0.3, 0.8, -0.2, 0.1, 0.7, -0.4], "text": "Renewable energy sources like solar and wind are becoming increasingly cost-competitive with fossil fuels.", "category": "environment", "subcategory": "energy", "tags": ["renewable", "solar", "wind"]} +{"index": {"_index": "embeddings", "_id": "doc_005"}} +{"id": "doc_005", "embedding": [0.8, 0.2, -0.5, 0.1, 0.6, -0.3, 0.4, 0.9], "text": "Financial markets are experiencing volatility due to geopolitical tensions and inflation concerns.", "category": "finance", "subcategory": "markets", "tags": ["volatility", "inflation", "geopolitics"]} +{"index": {"_index": "embeddings", "_id": "doc_006"}} +{"id": "doc_006", "embedding": [-0.1, 0.5, 0.7, -0.4, 0.2, 0.8, -0.6, 0.3], "text": "Quantum computing research is advancing rapidly with potential applications in cryptography and drug discovery.", "category": "technology", "subcategory": "research", 
"tags": ["quantum", "cryptography", "research"]} +{"index": {"_index": "embeddings", "_id": "doc_007"}} +{"id": "doc_007", "embedding": [0.4, -0.3, 0.6, 0.7, -0.8, 0.2, 0.5, -0.1], "text": "Ocean pollution from plastic waste is threatening marine ecosystems and biodiversity worldwide.", "category": "environment", "subcategory": "marine", "tags": ["pollution", "plastic", "marine"]} +{"index": {"_index": "embeddings", "_id": "doc_008"}} +{"id": "doc_008", "embedding": [0.3, 0.8, -0.2, 0.5, 0.1, -0.7, 0.6, 0.4], "text": "Artificial intelligence is revolutionizing customer service through chatbots and automated support systems.", "category": "technology", "subcategory": "customer_service", "tags": ["ai", "chatbots", "automation"]} +{"index": {"_index": "embeddings", "_id": "doc_009"}} +{"id": "doc_009", "embedding": [-0.5, 0.3, 0.9, -0.1, 0.7, 0.4, -0.2, 0.8], "text": "Global supply chains are being redesigned for resilience after pandemic-related disruptions.", "category": "business", "subcategory": "logistics", "tags": ["supply_chain", "pandemic", "resilience"]} +{"index": {"_index": "embeddings", "_id": "doc_010"}} +{"id": "doc_010", "embedding": [0.7, -0.4, 0.2, 0.9, -0.3, 0.6, 0.1, -0.8], "text": "Space exploration missions are expanding our understanding of the solar system and potential for life.", "category": "science", "subcategory": "space", "tags": ["space", "exploration", "life"]} +{"index": {"_index": "embeddings", "_id": "doc_011"}} +{"id": "doc_011", "embedding": [-0.2, 0.6, 0.4, -0.7, 0.8, 0.3, -0.5, 0.1], "text": "Cryptocurrency adoption is growing among institutional investors despite regulatory uncertainties.", "category": "finance", "subcategory": "crypto", "tags": ["cryptocurrency", "institutional", "regulation"]} +{"index": {"_index": "embeddings", "_id": "doc_012"}} +{"id": "doc_012", "embedding": [0.6, 0.1, -0.8, 0.4, 0.5, -0.2, 0.9, -0.3], "text": "Remote work technologies are transforming traditional office environments and work-life 
balance.", "category": "technology", "subcategory": "workplace", "tags": ["remote", "work", "balance"]} +{"index": {"_index": "embeddings", "_id": "doc_013"}} +{"id": "doc_013", "embedding": [0.1, -0.7, 0.5, 0.8, -0.4, 0.3, 0.2, 0.6], "text": "Gene therapy breakthroughs are offering new hope for treating previously incurable genetic diseases.", "category": "science", "subcategory": "medicine", "tags": ["gene_therapy", "genetics", "medicine"]} +{"index": {"_index": "embeddings", "_id": "doc_014"}} +{"id": "doc_014", "embedding": [-0.4, 0.2, 0.7, -0.1, 0.9, -0.6, 0.3, 0.5], "text": "Urban planning is evolving to create more sustainable and livable cities for growing populations.", "category": "environment", "subcategory": "urban", "tags": ["urban_planning", "sustainability", "cities"]} +{"index": {"_index": "embeddings", "_id": "doc_015"}} +{"id": "doc_015", "embedding": [0.9, -0.1, 0.3, 0.6, -0.5, 0.8, -0.2, 0.4], "text": "Social media platforms are implementing new policies to combat misinformation and protect user privacy.", "category": "technology", "subcategory": "social_media", "tags": ["social_media", "misinformation", "privacy"]} +{"index": {"_index": "embeddings", "_id": "doc_016"}} +{"id": "doc_016", "embedding": [-0.3, 0.8, -0.1, 0.4, 0.7, -0.5, 0.6, -0.9], "text": "Educational technology is personalizing learning experiences and improving student outcomes.", "category": "education", "subcategory": "technology", "tags": ["education", "personalization", "technology"]} +{"index": {"_index": "embeddings", "_id": "doc_017"}} +{"id": "doc_017", "embedding": [0.5, 0.3, -0.6, 0.2, 0.8, 0.1, -0.4, 0.7], "text": "Biodiversity conservation efforts are critical for maintaining ecosystem balance and preventing species extinction.", "category": "environment", "subcategory": "conservation", "tags": ["biodiversity", "conservation", "extinction"]} +{"index": {"_index": "embeddings", "_id": "doc_018"}} +{"id": "doc_018", "embedding": [0.2, -0.8, 0.4, 0.7, -0.1, 0.5, 0.9, 
-0.3], "text": "Healthcare systems are adopting telemedicine to improve access and reduce costs for patients.", "category": "technology", "subcategory": "healthcare", "tags": ["telemedicine", "healthcare", "access"]} +{"index": {"_index": "embeddings", "_id": "doc_019"}} +{"id": "doc_019", "embedding": [-0.7, 0.4, 0.8, -0.2, 0.3, 0.6, -0.1, 0.9], "text": "Autonomous vehicles are being tested extensively with promises of safer and more efficient transportation.", "category": "technology", "subcategory": "automotive", "tags": ["autonomous", "safety", "efficiency"]} +{"index": {"_index": "embeddings", "_id": "doc_020"}} +{"id": "doc_020", "embedding": [0.4, 0.7, -0.3, 0.9, -0.6, 0.2, 0.5, -0.1], "text": "Mental health awareness is increasing with new approaches to therapy and workplace wellness programs.", "category": "health", "subcategory": "mental", "tags": ["mental_health", "therapy", "wellness"]} diff --git a/example/sample_prompts_es_bulk.ndjson b/example/sample_prompts_es_bulk.ndjson new file mode 100644 index 0000000..9e1e5b7 --- /dev/null +++ b/example/sample_prompts_es_bulk.ndjson @@ -0,0 +1,20 @@ +{"index": {"_index": "prompts", "_id": "prompt_001"}} +{"id": "prompt_001", "embedding": [0.15, -0.28, 0.65, 0.42, -0.11, 0.33, 0.78, -0.52], "text": "Find articles about machine learning applications", "category": "search", "subcategory": "technology", "tags": ["AI", "research"]} +{"index": {"_index": "prompts", "_id": "prompt_002"}} +{"id": "prompt_002", "embedding": [0.72, 0.18, -0.35, 0.51, 0.09, -0.44, 0.27, 0.63], "text": "Show me product reviews for smartphones", "category": "search", "subcategory": "product", "tags": ["mobile", "reviews"]} +{"index": {"_index": "prompts", "_id": "prompt_003"}} +{"id": "prompt_003", "embedding": [-0.21, 0.59, 0.34, -0.67, 0.45, 0.12, -0.38, 0.76], "text": "What are the latest political developments?", "category": "search", "subcategory": "news", "tags": ["politics", "current events"]} +{"index": {"_index": "prompts", "_id": 
"prompt_004"}} +{"id": "prompt_004", "embedding": [0.48, -0.15, 0.72, 0.31, -0.58, 0.24, 0.67, -0.39], "text": "Summarize recent tech industry trends", "category": "analysis", "subcategory": "technology", "tags": ["tech", "trends", "summary"]} +{"index": {"_index": "prompts", "_id": "prompt_005"}} +{"id": "prompt_005", "embedding": [-0.33, 0.47, -0.62, 0.28, 0.71, -0.18, 0.54, 0.35], "text": "Compare different smartphone models", "category": "analysis", "subcategory": "product", "tags": ["comparison", "mobile", "evaluation"]} +{"index": {"_index": "prompts", "_id": "prompt_006"}} +{"id": "prompt_006", "embedding": [0.64, 0.21, 0.39, -0.45, 0.13, 0.58, -0.27, 0.74], "text": "Analyze voter sentiment on recent policies", "category": "analysis", "subcategory": "politics", "tags": ["sentiment", "politics", "analysis"]} +{"index": {"_index": "prompts", "_id": "prompt_007"}} +{"id": "prompt_007", "embedding": [0.29, -0.43, 0.56, 0.68, -0.22, 0.37, 0.14, -0.61], "text": "Generate a summary of machine learning research", "category": "generation", "subcategory": "technology", "tags": ["AI", "research", "summary"]} +{"index": {"_index": "prompts", "_id": "prompt_008"}} +{"id": "prompt_008", "embedding": [-0.17, 0.52, -0.48, 0.36, 0.74, -0.29, 0.61, 0.18], "text": "Create a product recommendation report", "category": "generation", "subcategory": "product", "tags": ["recommendation", "report", "analysis"]} +{"index": {"_index": "prompts", "_id": "prompt_009"}} +{"id": "prompt_009", "embedding": [0.55, 0.08, 0.41, -0.37, 0.26, 0.69, -0.14, 0.58], "text": "Write a news brief on election updates", "category": "generation", "subcategory": "news", "tags": ["election", "news", "brief"]} +{"index": {"_index": "prompts", "_id": "prompt_010"}} +{"id": "prompt_010", "embedding": [0.23, -0.59, 0.47, 0.61, -0.35, 0.18, 0.72, -0.26], "text": "Explain how neural networks work", "category": "explanation", "subcategory": "technology", "tags": ["AI", "education", "neural networks"]} diff --git 
a/pyproject.toml b/pyproject.toml index 6acd52c..d3fbf7b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "embeddingbuddy" -version = "0.2.0" +version = "0.3.0" description = "A Python Dash application for interactive exploration and visualization of embedding vectors through dimensionality reduction techniques." readme = "README.md" requires-python = ">=3.11" @@ -15,6 +15,7 @@ dependencies = [ "numba>=0.56.4", "openTSNE>=1.0.0", "mypy>=1.17.1", + "opensearch-py>=3.0.0", ] [project.optional-dependencies] diff --git a/src/embeddingbuddy/app.py b/src/embeddingbuddy/app.py index bf326d8..d850d03 100644 --- a/src/embeddingbuddy/app.py +++ b/src/embeddingbuddy/app.py @@ -10,6 +10,9 @@ from .ui.callbacks.interactions import InteractionCallbacks def create_app(): app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) + # Allow callbacks to components that are dynamically created in tabs + app.config.suppress_callback_exceptions = True + layout_manager = AppLayout() app.layout = layout_manager.create_layout() diff --git a/src/embeddingbuddy/config/settings.py b/src/embeddingbuddy/config/settings.py index 8839430..82cdac0 100644 --- a/src/embeddingbuddy/config/settings.py +++ b/src/embeddingbuddy/config/settings.py @@ -73,6 +73,12 @@ class AppSettings: HOST = os.getenv("EMBEDDINGBUDDY_HOST", "127.0.0.1") PORT = int(os.getenv("EMBEDDINGBUDDY_PORT", "8050")) + # OpenSearch Configuration + OPENSEARCH_DEFAULT_SIZE = 100 + OPENSEARCH_SAMPLE_SIZE = 5 + OPENSEARCH_CONNECTION_TIMEOUT = 30 + OPENSEARCH_VERIFY_CERTS = True + # Bootstrap Theme EXTERNAL_STYLESHEETS = [ "https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" diff --git a/src/embeddingbuddy/data/processor.py b/src/embeddingbuddy/data/processor.py index a9eb683..e7f82b2 100644 --- a/src/embeddingbuddy/data/processor.py +++ b/src/embeddingbuddy/data/processor.py @@ -1,6 +1,7 @@ import numpy as np from typing import List, Optional, Tuple from 
..models.schemas import Document, ProcessedData +from ..models.field_mapper import FieldMapper from .parser import NDJSONParser @@ -26,6 +27,42 @@ class DataProcessor: except Exception as e: return ProcessedData(documents=[], embeddings=np.array([]), error=str(e)) + def process_opensearch_data( + self, raw_documents: List[dict], field_mapping + ) -> ProcessedData: + """Process raw OpenSearch documents using field mapping.""" + try: + # Transform documents using field mapping + transformed_docs = FieldMapper.transform_documents( + raw_documents, field_mapping + ) + + # Parse transformed documents + documents = [] + for doc_dict in transformed_docs: + try: + # Ensure required fields are present with defaults if needed + if "id" not in doc_dict or not doc_dict["id"]: + doc_dict["id"] = f"doc_{len(documents)}" + + doc = Document(**doc_dict) + documents.append(doc) + except Exception: + continue # Skip invalid documents + + if not documents: + return ProcessedData( + documents=[], + embeddings=np.array([]), + error="No valid documents after transformation", + ) + + embeddings = self._extract_embeddings(documents) + return ProcessedData(documents=documents, embeddings=embeddings) + + except Exception as e: + return ProcessedData(documents=[], embeddings=np.array([]), error=str(e)) + def _extract_embeddings(self, documents: List[Document]) -> np.ndarray: if not documents: return np.array([]) diff --git a/src/embeddingbuddy/data/sources/__init__.py b/src/embeddingbuddy/data/sources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/embeddingbuddy/data/sources/opensearch.py b/src/embeddingbuddy/data/sources/opensearch.py new file mode 100644 index 0000000..15e9d7b --- /dev/null +++ b/src/embeddingbuddy/data/sources/opensearch.py @@ -0,0 +1,189 @@ +from typing import Dict, List, Optional, Any, Tuple +import logging +from opensearchpy import OpenSearch +from opensearchpy.exceptions import OpenSearchException + + +logger = logging.getLogger(__name__) + + 
+class OpenSearchClient: + def __init__(self): + self.client: Optional[OpenSearch] = None + self.connection_info: Optional[Dict[str, Any]] = None + + def connect( + self, + url: str, + username: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + verify_certs: bool = True, + ) -> Tuple[bool, str]: + """ + Connect to OpenSearch instance. + + Returns: + Tuple of (success: bool, message: str) + """ + try: + # Parse URL to extract host and port + if url.startswith("http://") or url.startswith("https://"): + host = url + else: + host = f"https://{url}" + + # Build auth configuration + auth_config = {} + if username and password: + auth_config["http_auth"] = (username, password) + elif api_key: + auth_config["api_key"] = api_key + + # Create client + self.client = OpenSearch([host], verify_certs=verify_certs, **auth_config) + + # Test connection + info = self.client.info() + self.connection_info = { + "url": host, + "cluster_name": info.get("cluster_name", "Unknown"), + "version": info.get("version", {}).get("number", "Unknown"), + } + + return ( + True, + f"Connected to {info.get('cluster_name', 'OpenSearch cluster')}", + ) + + except OpenSearchException as e: + logger.error(f"OpenSearch connection error: {e}") + return False, f"Connection failed: {str(e)}" + except Exception as e: + logger.error(f"Unexpected error connecting to OpenSearch: {e}") + return False, f"Unexpected error: {str(e)}" + + def get_index_mapping(self, index_name: str) -> Tuple[bool, Optional[Dict], str]: + """ + Get the mapping for a specific index. 
+ + Returns: + Tuple of (success: bool, mapping: Dict or None, message: str) + """ + if not self.client: + return False, None, "Not connected to OpenSearch" + + try: + mapping = self.client.indices.get_mapping(index=index_name) + return True, mapping, "Mapping retrieved successfully" + except OpenSearchException as e: + logger.error(f"Error getting mapping for index {index_name}: {e}") + return False, None, f"Failed to get mapping: {str(e)}" + + def analyze_fields(self, index_name: str) -> Tuple[bool, Optional[Dict], str]: + """ + Analyze index fields to detect potential embedding and text fields. + + Returns: + Tuple of (success: bool, analysis: Dict or None, message: str) + """ + success, mapping, message = self.get_index_mapping(index_name) + if not success: + return False, None, message + + try: + # Extract field information from mapping + index_mapping = mapping[index_name]["mappings"]["properties"] + + analysis = { + "vector_fields": [], + "text_fields": [], + "keyword_fields": [], + "numeric_fields": [], + "all_fields": [], + } + + for field_name, field_info in index_mapping.items(): + field_type = field_info.get("type", "unknown") + analysis["all_fields"].append(field_name) + + if field_type == "dense_vector": + analysis["vector_fields"].append( + { + "name": field_name, + "dimension": field_info.get("dimension", "unknown"), + } + ) + elif field_type == "text": + analysis["text_fields"].append(field_name) + elif field_type == "keyword": + analysis["keyword_fields"].append(field_name) + elif field_type in ["integer", "long", "float", "double"]: + analysis["numeric_fields"].append(field_name) + + return True, analysis, "Field analysis completed" + + except Exception as e: + logger.error(f"Error analyzing fields: {e}") + return False, None, f"Field analysis failed: {str(e)}" + + def fetch_sample_data( + self, index_name: str, size: int = 5 + ) -> Tuple[bool, List[Dict], str]: + """ + Fetch sample documents from the index. 
+ + Returns: + Tuple of (success: bool, documents: List[Dict], message: str) + """ + if not self.client: + return False, [], "Not connected to OpenSearch" + + try: + response = self.client.search( + index=index_name, body={"query": {"match_all": {}}, "size": size} + ) + + documents = [hit["_source"] for hit in response["hits"]["hits"]] + return True, documents, f"Retrieved {len(documents)} sample documents" + + except OpenSearchException as e: + logger.error(f"Error fetching sample data: {e}") + return False, [], f"Failed to fetch sample data: {str(e)}" + + def fetch_data( + self, index_name: str, size: int = 100 + ) -> Tuple[bool, List[Dict], str]: + """ + Fetch documents from the index. + + Returns: + Tuple of (success: bool, documents: List[Dict], message: str) + """ + if not self.client: + return False, [], "Not connected to OpenSearch" + + try: + response = self.client.search( + index=index_name, body={"query": {"match_all": {}}, "size": size} + ) + + documents = [hit["_source"] for hit in response["hits"]["hits"]] + total_hits = response["hits"]["total"]["value"] + + message = f"Retrieved {len(documents)} documents from {total_hits} total" + return True, documents, message + + except OpenSearchException as e: + logger.error(f"Error fetching data: {e}") + return False, [], f"Failed to fetch data: {str(e)}" + + def disconnect(self): + """Disconnect from OpenSearch.""" + if self.client: + self.client = None + self.connection_info = None + + def is_connected(self) -> bool: + """Check if connected to OpenSearch.""" + return self.client is not None diff --git a/src/embeddingbuddy/models/field_mapper.py b/src/embeddingbuddy/models/field_mapper.py new file mode 100644 index 0000000..b2dc91d --- /dev/null +++ b/src/embeddingbuddy/models/field_mapper.py @@ -0,0 +1,254 @@ +from dataclasses import dataclass +from typing import Dict, List, Optional, Any +import logging + + +logger = logging.getLogger(__name__) + + +@dataclass +class FieldMapping: + """Configuration for 
mapping OpenSearch fields to standard format.""" + + embedding_field: str + text_field: str + id_field: Optional[str] = None + category_field: Optional[str] = None + subcategory_field: Optional[str] = None + tags_field: Optional[str] = None + + +class FieldMapper: + """Handles field mapping and data transformation from OpenSearch to standard format.""" + + @staticmethod + def suggest_mappings(field_analysis: Dict) -> Dict[str, List[str]]: + """ + Suggest field mappings based on field analysis. + + Each dropdown will show ALL available fields, but ordered by relevance + with the most likely candidates first. + + Args: + field_analysis: Analysis results from OpenSearchClient.analyze_fields + + Returns: + Dictionary with suggested fields for each mapping (ordered by relevance) + """ + all_fields = field_analysis.get("all_fields", []) + vector_fields = [vf["name"] for vf in field_analysis.get("vector_fields", [])] + text_fields = field_analysis.get("text_fields", []) + keyword_fields = field_analysis.get("keyword_fields", []) + + # Helper function to create ordered suggestions + def create_ordered_suggestions(primary_candidates, all_available_fields): + # Start with primary candidates, then add all other fields + ordered = [] + # Add primary candidates first + for field in primary_candidates: + if field in all_available_fields and field not in ordered: + ordered.append(field) + # Add remaining fields + for field in all_available_fields: + if field not in ordered: + ordered.append(field) + return ordered + + suggestions = {} + + # Embedding field suggestions (vector fields first, then name-based candidates, then all fields) + embedding_candidates = vector_fields.copy() + # Add fields that likely contain embeddings based on name + embedding_name_candidates = [ + f + for f in all_fields + if any( + keyword in f.lower() + for keyword in ["embedding", "embeddings", "vector", "vectors", "embed"] + ) + ] + # Add name-based candidates that aren't already in vector_fields + for 
candidate in embedding_name_candidates: + if candidate not in embedding_candidates: + embedding_candidates.append(candidate) + suggestions["embedding"] = create_ordered_suggestions( + embedding_candidates, all_fields + ) + + # Text field suggestions (text fields first, then all fields) + text_candidates = text_fields.copy() + suggestions["text"] = create_ordered_suggestions(text_candidates, all_fields) + + # ID field suggestions (ID-like fields first, then all fields) + id_candidates = [ + f + for f in keyword_fields + if any(keyword in f.lower() for keyword in ["id", "_id", "doc", "document"]) + ] + id_candidates.append("_id") # _id is always available + suggestions["id"] = create_ordered_suggestions(id_candidates, all_fields) + + # Category field suggestions (category-like fields first, then all fields) + category_candidates = [ + f + for f in keyword_fields + if any( + keyword in f.lower() + for keyword in ["category", "class", "type", "label"] + ) + ] + suggestions["category"] = create_ordered_suggestions( + category_candidates, all_fields + ) + + # Subcategory field suggestions (subcategory-like fields first, then all fields) + subcategory_candidates = [ + f + for f in keyword_fields + if any( + keyword in f.lower() + for keyword in ["subcategory", "subclass", "subtype", "subtopic"] + ) + ] + suggestions["subcategory"] = create_ordered_suggestions( + subcategory_candidates, all_fields + ) + + # Tags field suggestions (tag-like fields first, then all fields) + tags_candidates = [ + f + for f in keyword_fields + if any( + keyword in f.lower() + for keyword in ["tag", "tags", "keyword", "keywords"] + ) + ] + suggestions["tags"] = create_ordered_suggestions(tags_candidates, all_fields) + + return suggestions + + @staticmethod + def validate_mapping( + mapping: FieldMapping, available_fields: List[str] + ) -> List[str]: + """ + Validate that the field mapping is correct. 
+ + Returns: + List of validation errors (empty if valid) + """ + errors = [] + + # Required fields validation + if not mapping.embedding_field: + errors.append("Embedding field is required") + elif mapping.embedding_field not in available_fields: + errors.append( + f"Embedding field '{mapping.embedding_field}' not found in index" + ) + + if not mapping.text_field: + errors.append("Text field is required") + elif mapping.text_field not in available_fields: + errors.append(f"Text field '{mapping.text_field}' not found in index") + + # Optional fields validation + optional_fields = { + "id_field": mapping.id_field, + "category_field": mapping.category_field, + "subcategory_field": mapping.subcategory_field, + "tags_field": mapping.tags_field, + } + + for field_name, field_value in optional_fields.items(): + if field_value and field_value not in available_fields: + errors.append( + f"Field '{field_value}' for {field_name} not found in index" + ) + + return errors + + @staticmethod + def transform_documents( + documents: List[Dict[str, Any]], mapping: FieldMapping + ) -> List[Dict[str, Any]]: + """ + Transform OpenSearch documents to standard format using field mapping. 
+ + Args: + documents: Raw documents from OpenSearch + mapping: Field mapping configuration + + Returns: + List of transformed documents in standard format + """ + transformed = [] + + for doc in documents: + try: + # Build standard format document + standard_doc = {} + + # Required fields + if mapping.embedding_field in doc: + standard_doc["embedding"] = doc[mapping.embedding_field] + else: + logger.warning( + f"Missing embedding field '{mapping.embedding_field}' in document" + ) + continue + + if mapping.text_field in doc: + standard_doc["text"] = str(doc[mapping.text_field]) + else: + logger.warning( + f"Missing text field '{mapping.text_field}' in document" + ) + continue + + # Optional fields + if mapping.id_field and mapping.id_field in doc: + standard_doc["id"] = str(doc[mapping.id_field]) + + if mapping.category_field and mapping.category_field in doc: + standard_doc["category"] = str(doc[mapping.category_field]) + + if mapping.subcategory_field and mapping.subcategory_field in doc: + standard_doc["subcategory"] = str(doc[mapping.subcategory_field]) + + if mapping.tags_field and mapping.tags_field in doc: + tags = doc[mapping.tags_field] + # Handle both string and list tags + if isinstance(tags, list): + standard_doc["tags"] = [str(tag) for tag in tags] + else: + standard_doc["tags"] = [str(tags)] + + transformed.append(standard_doc) + + except Exception as e: + logger.error(f"Error transforming document: {e}") + continue + + logger.info(f"Transformed {len(transformed)} documents out of {len(documents)}") + return transformed + + @staticmethod + def create_mapping_from_dict(mapping_dict: Dict[str, str]) -> FieldMapping: + """ + Create a FieldMapping from a dictionary. 
+ + Args: + mapping_dict: Dictionary with field mappings + + Returns: + FieldMapping instance + """ + return FieldMapping( + embedding_field=mapping_dict.get("embedding", ""), + text_field=mapping_dict.get("text", ""), + id_field=mapping_dict.get("id") or None, + category_field=mapping_dict.get("category") or None, + subcategory_field=mapping_dict.get("subcategory") or None, + tags_field=mapping_dict.get("tags") or None, + ) diff --git a/src/embeddingbuddy/ui/callbacks/data_processing.py b/src/embeddingbuddy/ui/callbacks/data_processing.py index e0491d3..f739bc7 100644 --- a/src/embeddingbuddy/ui/callbacks/data_processing.py +++ b/src/embeddingbuddy/ui/callbacks/data_processing.py @@ -1,10 +1,15 @@ -from dash import callback, Input, Output, State +from dash import callback, Input, Output, State, no_update from ...data.processor import DataProcessor +from ...data.sources.opensearch import OpenSearchClient +from ...models.field_mapper import FieldMapper +from ...config.settings import AppSettings class DataProcessingCallbacks: def __init__(self): self.processor = DataProcessor() + self.opensearch_client_data = OpenSearchClient() # For data/documents + self.opensearch_client_prompts = OpenSearchClient() # For prompts self._register_callbacks() def _register_callbacks(self): @@ -67,6 +72,397 @@ class DataProcessingCallbacks: "embeddings": processed_data.embeddings.tolist(), } + # OpenSearch callbacks + @callback( + [ + Output("tab-content", "children"), + ], + [Input("data-source-tabs", "active_tab")], + prevent_initial_call=False, + ) + def render_tab_content(active_tab): + from ...ui.components.datasource import DataSourceComponent + + datasource = DataSourceComponent() + + if active_tab == "opensearch-tab": + return [datasource.create_opensearch_tab()] + else: + return [datasource.create_file_upload_tab()] + + # Register callbacks for both data and prompts sections + self._register_opensearch_callbacks("data", self.opensearch_client_data) + 
self._register_opensearch_callbacks("prompts", self.opensearch_client_prompts) + + # Register collapsible section callbacks + self._register_collapse_callbacks() + + def _register_opensearch_callbacks(self, section_type, opensearch_client): + """Register callbacks for a specific section (data or prompts).""" + + @callback( + Output(f"{section_type}-auth-collapse", "is_open"), + [Input(f"{section_type}-auth-toggle", "n_clicks")], + [State(f"{section_type}-auth-collapse", "is_open")], + prevent_initial_call=True, + ) + def toggle_auth(n_clicks, is_open): + if n_clicks: + return not is_open + return is_open + + @callback( + Output(f"{section_type}-auth-toggle", "children"), + [Input(f"{section_type}-auth-collapse", "is_open")], + prevent_initial_call=False, + ) + def update_auth_button_text(is_open): + return "Hide Authentication" if is_open else "Show Authentication" + + @callback( + [ + Output(f"{section_type}-connection-status", "children"), + Output(f"{section_type}-field-mapping-section", "children"), + Output(f"{section_type}-field-mapping-section", "style"), + Output(f"{section_type}-load-data-section", "style"), + Output(f"{section_type}-load-opensearch-data-btn", "disabled"), + Output(f"{section_type}-embedding-field-dropdown", "options"), + Output(f"{section_type}-text-field-dropdown", "options"), + Output(f"{section_type}-id-field-dropdown", "options"), + Output(f"{section_type}-category-field-dropdown", "options"), + Output(f"{section_type}-subcategory-field-dropdown", "options"), + Output(f"{section_type}-tags-field-dropdown", "options"), + ], + [Input(f"{section_type}-test-connection-btn", "n_clicks")], + [ + State(f"{section_type}-opensearch-url", "value"), + State(f"{section_type}-opensearch-index", "value"), + State(f"{section_type}-opensearch-username", "value"), + State(f"{section_type}-opensearch-password", "value"), + State(f"{section_type}-opensearch-api-key", "value"), + ], + prevent_initial_call=True, + ) + def test_opensearch_connection( + 
n_clicks, url, index_name, username, password, api_key + ): + if not n_clicks or not url or not index_name: + return ( + no_update, + no_update, + no_update, + no_update, + no_update, + no_update, + no_update, + no_update, + no_update, + no_update, + no_update, + ) + + # Test connection + success, message = opensearch_client.connect( + url=url, + username=username, + password=password, + api_key=api_key, + verify_certs=AppSettings.OPENSEARCH_VERIFY_CERTS, + ) + + if not success: + return ( + self._create_status_alert(f"❌ {message}", "danger"), + [], + {"display": "none"}, + {"display": "none"}, + True, + [], # empty options for hidden dropdowns + [], + [], + [], + [], + [], + ) + + # Analyze fields + success, field_analysis, analysis_message = ( + opensearch_client.analyze_fields(index_name) + ) + + if not success: + return ( + self._create_status_alert(f"❌ {analysis_message}", "danger"), + [], + {"display": "none"}, + {"display": "none"}, + True, + [], # empty options for hidden dropdowns + [], + [], + [], + [], + [], + ) + + # Generate field suggestions + field_suggestions = FieldMapper.suggest_mappings(field_analysis) + + from ...ui.components.datasource import DataSourceComponent + + datasource = DataSourceComponent() + field_mapping_ui = datasource.create_field_mapping_interface( + field_suggestions, section_type + ) + + return ( + self._create_status_alert(f"✅ {message}", "success"), + field_mapping_ui, + {"display": "block"}, + {"display": "block"}, + False, + [ + {"label": field, "value": field} + for field in field_suggestions.get("embedding", []) + ], + [ + {"label": field, "value": field} + for field in field_suggestions.get("text", []) + ], + [ + {"label": field, "value": field} + for field in field_suggestions.get("id", []) + ], + [ + {"label": field, "value": field} + for field in field_suggestions.get("category", []) + ], + [ + {"label": field, "value": field} + for field in field_suggestions.get("subcategory", []) + ], + [ + {"label": field, 
"value": field} + for field in field_suggestions.get("tags", []) + ], + ) + + # Determine output target based on section type + output_target = ( + "processed-data" if section_type == "data" else "processed-prompts" + ) + + @callback( + [ + Output(output_target, "data", allow_duplicate=True), + Output("opensearch-success-alert", "children", allow_duplicate=True), + Output("opensearch-success-alert", "is_open", allow_duplicate=True), + Output("opensearch-error-alert", "children", allow_duplicate=True), + Output("opensearch-error-alert", "is_open", allow_duplicate=True), + ], + [Input(f"{section_type}-load-opensearch-data-btn", "n_clicks")], + [ + State(f"{section_type}-opensearch-index", "value"), + State(f"{section_type}-opensearch-query-size", "value"), + State(f"{section_type}-embedding-field-dropdown-ui", "value"), + State(f"{section_type}-text-field-dropdown-ui", "value"), + State(f"{section_type}-id-field-dropdown-ui", "value"), + State(f"{section_type}-category-field-dropdown-ui", "value"), + State(f"{section_type}-subcategory-field-dropdown-ui", "value"), + State(f"{section_type}-tags-field-dropdown-ui", "value"), + ], + prevent_initial_call=True, + ) + def load_opensearch_data( + n_clicks, + index_name, + query_size, + embedding_field, + text_field, + id_field, + category_field, + subcategory_field, + tags_field, + ): + if not n_clicks or not index_name or not embedding_field or not text_field: + return no_update, no_update, no_update, no_update, no_update + + try: + # Validate and set query size + if not query_size or query_size < 1: + query_size = AppSettings.OPENSEARCH_DEFAULT_SIZE + elif query_size > 1000: + query_size = 1000 # Cap at reasonable maximum + + # Create field mapping + field_mapping = FieldMapper.create_mapping_from_dict( + { + "embedding": embedding_field, + "text": text_field, + "id": id_field, + "category": category_field, + "subcategory": subcategory_field, + "tags": tags_field, + } + ) + + # Fetch data from OpenSearch + success, 
raw_documents, message = opensearch_client.fetch_data( + index_name, size=query_size + ) + + if not success: + return ( + no_update, + "", + False, + f"❌ Failed to fetch {section_type}: {message}", + True, + ) + + # Process the data + processed_data = self.processor.process_opensearch_data( + raw_documents, field_mapping + ) + + if processed_data.error: + return ( + {"error": processed_data.error}, + "", + False, + f"❌ {section_type.title()} processing error: {processed_data.error}", + True, + ) + + success_message = f"✅ Successfully loaded {len(processed_data.documents)} {section_type} from OpenSearch" + + # Format for appropriate target (data vs prompts) + if section_type == "data": + return ( + { + "documents": [ + self._document_to_dict(doc) + for doc in processed_data.documents + ], + "embeddings": processed_data.embeddings.tolist(), + }, + success_message, + True, + "", + False, + ) + else: # prompts + return ( + { + "prompts": [ + self._document_to_dict(doc) + for doc in processed_data.documents + ], + "embeddings": processed_data.embeddings.tolist(), + }, + success_message, + True, + "", + False, + ) + + except Exception as e: + return (no_update, "", False, f"❌ Unexpected error: {str(e)}", True) + + # Sync callbacks to update hidden dropdowns from UI dropdowns + @callback( + Output(f"{section_type}-embedding-field-dropdown", "value"), + Input(f"{section_type}-embedding-field-dropdown-ui", "value"), + prevent_initial_call=True, + ) + def sync_embedding_dropdown(value): + return value + + @callback( + Output(f"{section_type}-text-field-dropdown", "value"), + Input(f"{section_type}-text-field-dropdown-ui", "value"), + prevent_initial_call=True, + ) + def sync_text_dropdown(value): + return value + + @callback( + Output(f"{section_type}-id-field-dropdown", "value"), + Input(f"{section_type}-id-field-dropdown-ui", "value"), + prevent_initial_call=True, + ) + def sync_id_dropdown(value): + return value + + @callback( + 
Output(f"{section_type}-category-field-dropdown", "value"), + Input(f"{section_type}-category-field-dropdown-ui", "value"), + prevent_initial_call=True, + ) + def sync_category_dropdown(value): + return value + + @callback( + Output(f"{section_type}-subcategory-field-dropdown", "value"), + Input(f"{section_type}-subcategory-field-dropdown-ui", "value"), + prevent_initial_call=True, + ) + def sync_subcategory_dropdown(value): + return value + + @callback( + Output(f"{section_type}-tags-field-dropdown", "value"), + Input(f"{section_type}-tags-field-dropdown-ui", "value"), + prevent_initial_call=True, + ) + def sync_tags_dropdown(value): + return value + + def _register_collapse_callbacks(self): + """Register callbacks for collapsible sections.""" + + # Data section collapse callback + @callback( + [ + Output("data-collapse", "is_open"), + Output("data-collapse-icon", "className"), + ], + [Input("data-collapse-toggle", "n_clicks")], + [State("data-collapse", "is_open")], + prevent_initial_call=True, + ) + def toggle_data_collapse(n_clicks, is_open): + if n_clicks: + new_state = not is_open + icon_class = ( + "fas fa-chevron-down me-2" + if new_state + else "fas fa-chevron-right me-2" + ) + return new_state, icon_class + return is_open, "fas fa-chevron-down me-2" + + # Prompts section collapse callback + @callback( + [ + Output("prompts-collapse", "is_open"), + Output("prompts-collapse-icon", "className"), + ], + [Input("prompts-collapse-toggle", "n_clicks")], + [State("prompts-collapse", "is_open")], + prevent_initial_call=True, + ) + def toggle_prompts_collapse(n_clicks, is_open): + if n_clicks: + new_state = not is_open + icon_class = ( + "fas fa-chevron-down me-2" + if new_state + else "fas fa-chevron-right me-2" + ) + return new_state, icon_class + return is_open, "fas fa-chevron-down me-2" + @staticmethod def _document_to_dict(doc): return { @@ -118,3 +514,10 @@ class DataProcessingCallbacks: f"❌ Error processing file{file_part}: {error}. 
" "Please check that your file is valid NDJSON with required 'text' and 'embedding' fields." ) + + @staticmethod + def _create_status_alert(message: str, color: str): + """Create a status alert component.""" + import dash_bootstrap_components as dbc + + return dbc.Alert(message, color=color, className="mb-2") diff --git a/src/embeddingbuddy/ui/components/datasource.py b/src/embeddingbuddy/ui/components/datasource.py new file mode 100644 index 0000000..2d4c630 --- /dev/null +++ b/src/embeddingbuddy/ui/components/datasource.py @@ -0,0 +1,519 @@ +from dash import dcc, html +import dash_bootstrap_components as dbc +from .upload import UploadComponent + + +class DataSourceComponent: + def __init__(self): + self.upload_component = UploadComponent() + + def create_tabbed_interface(self): + """Create tabbed interface for different data sources.""" + return dbc.Card( + [ + dbc.CardHeader( + [ + dbc.Tabs( + [ + dbc.Tab(label="File Upload", tab_id="file-tab"), + dbc.Tab(label="OpenSearch", tab_id="opensearch-tab"), + ], + id="data-source-tabs", + active_tab="file-tab", + ) + ] + ), + dbc.CardBody([html.Div(id="tab-content")]), + ] + ) + + def create_file_upload_tab(self): + """Create file upload tab content.""" + return html.Div( + [ + self.upload_component.create_error_alert(), + self.upload_component.create_data_upload(), + self.upload_component.create_prompts_upload(), + self.upload_component.create_reset_button(), + ] + ) + + def create_opensearch_tab(self): + """Create OpenSearch tab content with separate Data and Prompts sections.""" + return html.Div( + [ + # Data Section + dbc.Card( + [ + dbc.CardHeader( + [ + dbc.Button( + [ + html.I( + className="fas fa-chevron-down me-2", + id="data-collapse-icon", + ), + "📄 Documents/Data", + ], + id="data-collapse-toggle", + color="link", + className="text-start p-0 w-100 text-decoration-none", + style={ + "border": "none", + "font-size": "1.25rem", + "font-weight": "500", + }, + ), + ] + ), + dbc.Collapse( + 
[dbc.CardBody([self._create_opensearch_section("data")])], + id="data-collapse", + is_open=True, + ), + ], + className="mb-4", + ), + # Prompts Section + dbc.Card( + [ + dbc.CardHeader( + [ + dbc.Button( + [ + html.I( + className="fas fa-chevron-down me-2", + id="prompts-collapse-icon", + ), + "💬 Prompts", + ], + id="prompts-collapse-toggle", + color="link", + className="text-start p-0 w-100 text-decoration-none", + style={ + "border": "none", + "font-size": "1.25rem", + "font-weight": "500", + }, + ), + ] + ), + dbc.Collapse( + [ + dbc.CardBody( + [self._create_opensearch_section("prompts")] + ) + ], + id="prompts-collapse", + is_open=True, + ), + ], + className="mb-4", + ), + # Hidden dropdowns to prevent callback errors (for both sections) + html.Div( + [ + # Data dropdowns (hidden sync targets) + dcc.Dropdown( + id="data-embedding-field-dropdown", + style={"display": "none"}, + ), + dcc.Dropdown( + id="data-text-field-dropdown", style={"display": "none"} + ), + dcc.Dropdown( + id="data-id-field-dropdown", style={"display": "none"} + ), + dcc.Dropdown( + id="data-category-field-dropdown", style={"display": "none"} + ), + dcc.Dropdown( + id="data-subcategory-field-dropdown", + style={"display": "none"}, + ), + dcc.Dropdown( + id="data-tags-field-dropdown", style={"display": "none"} + ), + # Data UI dropdowns (hidden placeholders) + dcc.Dropdown( + id="data-embedding-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="data-text-field-dropdown-ui", style={"display": "none"} + ), + dcc.Dropdown( + id="data-id-field-dropdown-ui", style={"display": "none"} + ), + dcc.Dropdown( + id="data-category-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="data-subcategory-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="data-tags-field-dropdown-ui", style={"display": "none"} + ), + # Prompts dropdowns (hidden sync targets) + dcc.Dropdown( + id="prompts-embedding-field-dropdown", + style={"display": 
"none"}, + ), + dcc.Dropdown( + id="prompts-text-field-dropdown", style={"display": "none"} + ), + dcc.Dropdown( + id="prompts-id-field-dropdown", style={"display": "none"} + ), + dcc.Dropdown( + id="prompts-category-field-dropdown", + style={"display": "none"}, + ), + dcc.Dropdown( + id="prompts-subcategory-field-dropdown", + style={"display": "none"}, + ), + dcc.Dropdown( + id="prompts-tags-field-dropdown", style={"display": "none"} + ), + # Prompts UI dropdowns (hidden placeholders) + dcc.Dropdown( + id="prompts-embedding-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="prompts-text-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="prompts-id-field-dropdown-ui", style={"display": "none"} + ), + dcc.Dropdown( + id="prompts-category-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="prompts-subcategory-field-dropdown-ui", + style={"display": "none"}, + ), + dcc.Dropdown( + id="prompts-tags-field-dropdown-ui", + style={"display": "none"}, + ), + ], + style={"display": "none"}, + ), + ] + ) + + def _create_opensearch_section(self, section_type): + """Create a complete OpenSearch section for either 'data' or 'prompts'.""" + section_id = section_type # 'data' or 'prompts' + + return html.Div( + [ + # Connection section + html.H6("Connection", className="mb-2"), + dbc.Row( + [ + dbc.Col( + [ + dbc.Label("OpenSearch URL:"), + dbc.Input( + id=f"{section_id}-opensearch-url", + type="text", + placeholder="https://opensearch.example.com:9200", + className="mb-2", + ), + ], + width=12, + ), + ] + ), + dbc.Row( + [ + dbc.Col( + [ + dbc.Label("Index Name:"), + dbc.Input( + id=f"{section_id}-opensearch-index", + type="text", + placeholder="my-embeddings-index", + className="mb-2", + ), + ], + width=6, + ), + dbc.Col( + [ + dbc.Label("Query Size:"), + dbc.Input( + id=f"{section_id}-opensearch-query-size", + type="number", + value=100, + min=1, + max=1000, + placeholder="100", + className="mb-2", + ), + 
], + width=6, + ), + ] + ), + dbc.Row( + [ + dbc.Col( + [ + dbc.Button( + "Test Connection", + id=f"{section_id}-test-connection-btn", + color="primary", + className="mb-3", + ), + ], + width=12, + ), + ] + ), + # Authentication section (collapsible) + dbc.Collapse( + [ + html.Hr(), + html.H6("Authentication (Optional)", className="mb-2"), + dbc.Row( + [ + dbc.Col( + [ + dbc.Label("Username:"), + dbc.Input( + id=f"{section_id}-opensearch-username", + type="text", + className="mb-2", + ), + ], + width=6, + ), + dbc.Col( + [ + dbc.Label("Password:"), + dbc.Input( + id=f"{section_id}-opensearch-password", + type="password", + className="mb-2", + ), + ], + width=6, + ), + ] + ), + dbc.Label("OR"), + dbc.Input( + id=f"{section_id}-opensearch-api-key", + type="text", + placeholder="API Key", + className="mb-2", + ), + ], + id=f"{section_id}-auth-collapse", + is_open=False, + ), + dbc.Button( + "Show Authentication", + id=f"{section_id}-auth-toggle", + color="link", + size="sm", + className="p-0 mb-3", + ), + # Connection status + html.Div(id=f"{section_id}-connection-status", className="mb-3"), + # Field mapping section (hidden initially) + html.Div( + id=f"{section_id}-field-mapping-section", style={"display": "none"} + ), + # Load data button (hidden initially) + html.Div( + [ + dbc.Button( + f"Load {section_type.title()}", + id=f"{section_id}-load-opensearch-data-btn", + color="success", + className="mb-2", + disabled=True, + ), + ], + id=f"{section_id}-load-data-section", + style={"display": "none"}, + ), + # OpenSearch status/results + html.Div(id=f"{section_id}-opensearch-status", className="mb-3"), + ] + ) + + def create_field_mapping_interface(self, field_suggestions, section_type="data"): + """Create field mapping interface based on detected fields.""" + return html.Div( + [ + html.Hr(), + html.H6("Field Mapping", className="mb-2"), + html.P( + "Map your OpenSearch fields to the required format:", + className="text-muted small", + ), + # Required fields + 
dbc.Row( + [ + dbc.Col( + [ + dbc.Label( + "Embedding Field (required):", className="fw-bold" + ), + dcc.Dropdown( + id=f"{section_type}-embedding-field-dropdown-ui", + options=[ + {"label": field, "value": field} + for field in field_suggestions.get( + "embedding", [] + ) + ], + value=field_suggestions.get("embedding", [None])[ + 0 + ], # Default to first suggestion + placeholder="Select embedding field...", + className="mb-2", + ), + ], + width=6, + ), + dbc.Col( + [ + dbc.Label( + "Text Field (required):", className="fw-bold" + ), + dcc.Dropdown( + id=f"{section_type}-text-field-dropdown-ui", + options=[ + {"label": field, "value": field} + for field in field_suggestions.get("text", []) + ], + value=field_suggestions.get("text", [None])[ + 0 + ], # Default to first suggestion + placeholder="Select text field...", + className="mb-2", + ), + ], + width=6, + ), + ] + ), + # Optional fields + html.H6("Optional Fields", className="mb-2 mt-3"), + dbc.Row( + [ + dbc.Col( + [ + dbc.Label("ID Field:"), + dcc.Dropdown( + id=f"{section_type}-id-field-dropdown-ui", + options=[ + {"label": field, "value": field} + for field in field_suggestions.get("id", []) + ], + value=field_suggestions.get("id", [None])[ + 0 + ], # Default to first suggestion + placeholder="Select ID field...", + className="mb-2", + ), + ], + width=6, + ), + dbc.Col( + [ + dbc.Label("Category Field:"), + dcc.Dropdown( + id=f"{section_type}-category-field-dropdown-ui", + options=[ + {"label": field, "value": field} + for field in field_suggestions.get( + "category", [] + ) + ], + value=field_suggestions.get("category", [None])[ + 0 + ], # Default to first suggestion + placeholder="Select category field...", + className="mb-2", + ), + ], + width=6, + ), + ] + ), + dbc.Row( + [ + dbc.Col( + [ + dbc.Label("Subcategory Field:"), + dcc.Dropdown( + id=f"{section_type}-subcategory-field-dropdown-ui", + options=[ + {"label": field, "value": field} + for field in field_suggestions.get( + "subcategory", [] + ) + ], 
+ value=field_suggestions.get("subcategory", [None])[ + 0 + ], # Default to first suggestion + placeholder="Select subcategory field...", + className="mb-2", + ), + ], + width=6, + ), + dbc.Col( + [ + dbc.Label("Tags Field:"), + dcc.Dropdown( + id=f"{section_type}-tags-field-dropdown-ui", + options=[ + {"label": field, "value": field} + for field in field_suggestions.get("tags", []) + ], + value=field_suggestions.get("tags", [None])[ + 0 + ], # Default to first suggestion + placeholder="Select tags field...", + className="mb-2", + ), + ], + width=6, + ), + ] + ), + ] + ) + + def create_error_alert(self): + """Create error alert component for OpenSearch issues.""" + return dbc.Alert( + id="opensearch-error-alert", + dismissable=True, + is_open=False, + color="danger", + className="mb-3", + ) + + def create_success_alert(self): + """Create success alert component for OpenSearch operations.""" + return dbc.Alert( + id="opensearch-success-alert", + dismissable=True, + is_open=False, + color="success", + className="mb-3", + ) diff --git a/src/embeddingbuddy/ui/components/sidebar.py b/src/embeddingbuddy/ui/components/sidebar.py index 0766f02..d139fa6 100644 --- a/src/embeddingbuddy/ui/components/sidebar.py +++ b/src/embeddingbuddy/ui/components/sidebar.py @@ -1,21 +1,22 @@ from dash import dcc, html import dash_bootstrap_components as dbc from .upload import UploadComponent +from .datasource import DataSourceComponent class SidebarComponent: def __init__(self): self.upload_component = UploadComponent() + self.datasource_component = DataSourceComponent() def create_layout(self): return dbc.Col( [ - html.H5("Upload Data", className="mb-3"), - self.upload_component.create_error_alert(), - self.upload_component.create_data_upload(), - self.upload_component.create_prompts_upload(), - self.upload_component.create_reset_button(), - html.H5("Visualization Controls", className="mb-3"), + html.H5("Data Sources", className="mb-3"), + self.datasource_component.create_error_alert(), 
+ self.datasource_component.create_success_alert(), + self.datasource_component.create_tabbed_interface(), + html.H5("Visualization Controls", className="mb-3 mt-4"), ] + self._create_method_dropdown() + self._create_color_dropdown() diff --git a/tests/test_data_processor_opensearch.py b/tests/test_data_processor_opensearch.py new file mode 100644 index 0000000..c7dbd1d --- /dev/null +++ b/tests/test_data_processor_opensearch.py @@ -0,0 +1,155 @@ +from unittest.mock import patch +from src.embeddingbuddy.data.processor import DataProcessor +from src.embeddingbuddy.models.field_mapper import FieldMapping + + +class TestDataProcessorOpenSearch: + def test_process_opensearch_data_success(self): + processor = DataProcessor() + + # Mock raw OpenSearch documents + raw_documents = [ + { + "vector": [0.1, 0.2, 0.3], + "content": "Test document 1", + "doc_id": "doc1", + "type": "news", + }, + { + "vector": [0.4, 0.5, 0.6], + "content": "Test document 2", + "doc_id": "doc2", + "type": "blog", + }, + ] + + # Create field mapping + field_mapping = FieldMapping( + embedding_field="vector", + text_field="content", + id_field="doc_id", + category_field="type", + ) + + # Process the data + processed_data = processor.process_opensearch_data(raw_documents, field_mapping) + + # Assertions + assert processed_data.error is None + assert len(processed_data.documents) == 2 + assert processed_data.embeddings.shape == (2, 3) + + # Check first document + doc1 = processed_data.documents[0] + assert doc1.text == "Test document 1" + assert doc1.embedding == [0.1, 0.2, 0.3] + assert doc1.id == "doc1" + assert doc1.category == "news" + + # Check second document + doc2 = processed_data.documents[1] + assert doc2.text == "Test document 2" + assert doc2.embedding == [0.4, 0.5, 0.6] + assert doc2.id == "doc2" + assert doc2.category == "blog" + + def test_process_opensearch_data_with_tags(self): + processor = DataProcessor() + + # Mock raw OpenSearch documents with tags + raw_documents = [ + { + 
"vector": [0.1, 0.2, 0.3], + "content": "Test document with tags", + "keywords": ["tag1", "tag2"], + } + ] + + # Create field mapping + field_mapping = FieldMapping( + embedding_field="vector", text_field="content", tags_field="keywords" + ) + + processed_data = processor.process_opensearch_data(raw_documents, field_mapping) + + assert processed_data.error is None + assert len(processed_data.documents) == 1 + doc = processed_data.documents[0] + assert doc.tags == ["tag1", "tag2"] + + def test_process_opensearch_data_invalid_documents(self): + processor = DataProcessor() + + # Mock raw documents with missing required fields + raw_documents = [ + { + "vector": [0.1, 0.2, 0.3], + # Missing text field + } + ] + + field_mapping = FieldMapping(embedding_field="vector", text_field="content") + + processed_data = processor.process_opensearch_data(raw_documents, field_mapping) + + # Should return error since no valid documents + assert processed_data.error is not None + assert "No valid documents" in processed_data.error + assert len(processed_data.documents) == 0 + + def test_process_opensearch_data_partial_success(self): + processor = DataProcessor() + + # Mix of valid and invalid documents + raw_documents = [ + { + "vector": [0.1, 0.2, 0.3], + "content": "Valid document", + }, + { + "vector": [0.4, 0.5, 0.6], + # Missing content field - should be skipped + }, + { + "vector": [0.7, 0.8, 0.9], + "content": "Another valid document", + }, + ] + + field_mapping = FieldMapping(embedding_field="vector", text_field="content") + + processed_data = processor.process_opensearch_data(raw_documents, field_mapping) + + # Should process valid documents only + assert processed_data.error is None + assert len(processed_data.documents) == 2 + assert processed_data.documents[0].text == "Valid document" + assert processed_data.documents[1].text == "Another valid document" + + @patch("src.embeddingbuddy.models.field_mapper.FieldMapper.transform_documents") + def 
test_process_opensearch_data_transformation_error(self, mock_transform): + processor = DataProcessor() + + # Mock transformation error + mock_transform.side_effect = Exception("Transformation failed") + + raw_documents = [{"vector": [0.1], "content": "test"}] + field_mapping = FieldMapping(embedding_field="vector", text_field="content") + + processed_data = processor.process_opensearch_data(raw_documents, field_mapping) + + assert processed_data.error is not None + assert "Transformation failed" in processed_data.error + assert len(processed_data.documents) == 0 + + def test_process_opensearch_data_empty_input(self): + processor = DataProcessor() + + raw_documents = [] + field_mapping = FieldMapping(embedding_field="vector", text_field="content") + + processed_data = processor.process_opensearch_data(raw_documents, field_mapping) + + assert processed_data.error is not None + assert "No valid documents" in processed_data.error + assert len(processed_data.documents) == 0 diff --git a/tests/test_opensearch.py b/tests/test_opensearch.py new file mode 100644 index 0000000..4f8b869 --- /dev/null +++ b/tests/test_opensearch.py @@ -0,0 +1,310 @@ +from unittest.mock import Mock, patch +from src.embeddingbuddy.data.sources.opensearch import OpenSearchClient +from src.embeddingbuddy.models.field_mapper import FieldMapper, FieldMapping + + +class TestOpenSearchClient: + def test_init(self): + client = OpenSearchClient() + assert client.client is None + assert client.connection_info is None + + @patch("src.embeddingbuddy.data.sources.opensearch.OpenSearch") + def test_connect_success(self, mock_opensearch): + # Mock the OpenSearch client + mock_client_instance = Mock() + mock_client_instance.info.return_value = { + "cluster_name": "test-cluster", + "version": {"number": "2.0.0"}, + } + mock_opensearch.return_value = mock_client_instance + + client = OpenSearchClient() + success, message = client.connect("https://localhost:9200") + + assert success is True + assert 
"test-cluster" in message + assert client.client is not None + assert client.connection_info["cluster_name"] == "test-cluster" + + @patch("src.embeddingbuddy.data.sources.opensearch.OpenSearch") + def test_connect_failure(self, mock_opensearch): + # Mock connection failure + mock_opensearch.side_effect = Exception("Connection failed") + + client = OpenSearchClient() + success, message = client.connect("https://localhost:9200") + + assert success is False + assert "Connection failed" in message + assert client.client is None + + def test_analyze_fields(self): + client = OpenSearchClient() + client.client = Mock() + + # Mock mapping response + mock_mapping = { + "test-index": { + "mappings": { + "properties": { + "embedding": {"type": "dense_vector", "dimension": 768}, + "text": {"type": "text"}, + "category": {"type": "keyword"}, + "id": {"type": "keyword"}, + "count": {"type": "integer"}, + } + } + } + } + client.client.indices.get_mapping.return_value = mock_mapping + + success, analysis, message = client.analyze_fields("test-index") + + assert success is True + assert len(analysis["vector_fields"]) == 1 + assert analysis["vector_fields"][0]["name"] == "embedding" + assert analysis["vector_fields"][0]["dimension"] == 768 + assert "text" in analysis["text_fields"] + assert "category" in analysis["keyword_fields"] + assert "count" in analysis["numeric_fields"] + + def test_fetch_sample_data(self): + client = OpenSearchClient() + client.client = Mock() + + # Mock search response + mock_response = { + "hits": { + "hits": [ + {"_source": {"text": "doc1", "embedding": [0.1, 0.2]}}, + {"_source": {"text": "doc2", "embedding": [0.3, 0.4]}}, + ] + } + } + client.client.search.return_value = mock_response + + success, documents, message = client.fetch_sample_data("test-index", size=2) + + assert success is True + assert len(documents) == 2 + assert documents[0]["text"] == "doc1" + assert documents[1]["text"] == "doc2" + + +class TestFieldMapper: + def 
test_suggest_mappings(self): + field_analysis = { + "vector_fields": [{"name": "embedding", "dimension": 768}], + "text_fields": ["content", "description"], + "keyword_fields": ["doc_id", "category", "type", "tags"], + "numeric_fields": ["count"], + "all_fields": [ + "embedding", + "content", + "description", + "doc_id", + "category", + "type", + "tags", + "count", + ], + } + + suggestions = FieldMapper.suggest_mappings(field_analysis) + + # Check that all dropdowns contain all fields + all_fields = [ + "embedding", + "content", + "description", + "doc_id", + "category", + "type", + "tags", + "count", + ] + for field_type in [ + "embedding", + "text", + "id", + "category", + "subcategory", + "tags", + ]: + for field in all_fields: + assert field in suggestions[field_type], ( + f"Field '{field}' missing from {field_type} suggestions" + ) + + # Check that best candidates are first + assert ( + suggestions["embedding"][0] == "embedding" + ) # vector field should be first + assert suggestions["text"][0] in [ + "content", + "description", + ] # text fields should be first + assert suggestions["id"][0] == "doc_id" # ID-like field should be first + assert suggestions["category"][0] in [ + "category", + "type", + ] # category-like field should be first + assert suggestions["tags"][0] == "tags" # tags field should be first + + def test_suggest_mappings_name_based_embedding(self): + """Test that fields named 'embedding' are prioritized even without vector type.""" + field_analysis = { + "vector_fields": [], # No explicit vector fields detected + "text_fields": ["content", "description"], + "keyword_fields": ["doc_id", "category", "type", "tags"], + "numeric_fields": ["count"], + "all_fields": [ + "content", + "description", + "doc_id", + "category", + "embedding", + "type", + "tags", + "count", + ], + } + + suggestions = FieldMapper.suggest_mappings(field_analysis) + + # Check that 'embedding' field is prioritized despite not being detected as vector type + assert 
suggestions["embedding"][0] == "embedding", ( + "Field named 'embedding' should be first priority" + ) + + # Check that all fields are still available + all_fields = [ + "content", + "description", + "doc_id", + "category", + "embedding", + "type", + "tags", + "count", + ] + for field_type in [ + "embedding", + "text", + "id", + "category", + "subcategory", + "tags", + ]: + for field in all_fields: + assert field in suggestions[field_type], ( + f"Field '{field}' missing from {field_type} suggestions" + ) + + def test_validate_mapping_success(self): + mapping = FieldMapping( + embedding_field="embedding", text_field="text", id_field="doc_id" + ) + available_fields = ["embedding", "text", "doc_id", "category"] + + errors = FieldMapper.validate_mapping(mapping, available_fields) + + assert len(errors) == 0 + + def test_validate_mapping_missing_required(self): + mapping = FieldMapping(embedding_field="missing_field", text_field="text") + available_fields = ["text", "category"] + + errors = FieldMapper.validate_mapping(mapping, available_fields) + + assert len(errors) == 1 + assert "missing_field" in errors[0] + assert "not found" in errors[0] + + def test_validate_mapping_missing_optional(self): + mapping = FieldMapping( + embedding_field="embedding", + text_field="text", + category_field="missing_category", + ) + available_fields = ["embedding", "text"] + + errors = FieldMapper.validate_mapping(mapping, available_fields) + + assert len(errors) == 1 + assert "missing_category" in errors[0] + + def test_transform_documents(self): + mapping = FieldMapping( + embedding_field="vector", + text_field="content", + id_field="doc_id", + category_field="type", + ) + + raw_documents = [ + { + "vector": [0.1, 0.2, 0.3], + "content": "Test document 1", + "doc_id": "doc1", + "type": "news", + }, + { + "vector": [0.4, 0.5, 0.6], + "content": "Test document 2", + "doc_id": "doc2", + "type": "blog", + }, + ] + + transformed = FieldMapper.transform_documents(raw_documents, mapping) + + 
assert len(transformed) == 2 + assert transformed[0]["embedding"] == [0.1, 0.2, 0.3] + assert transformed[0]["text"] == "Test document 1" + assert transformed[0]["id"] == "doc1" + assert transformed[0]["category"] == "news" + + def test_transform_documents_missing_required(self): + mapping = FieldMapping(embedding_field="vector", text_field="content") + + raw_documents = [ + { + "vector": [0.1, 0.2, 0.3], + # Missing content field + } + ] + + transformed = FieldMapper.transform_documents(raw_documents, mapping) + + assert len(transformed) == 0 # Document should be skipped + + def test_create_mapping_from_dict(self): + mapping_dict = { + "embedding": "vector_field", + "text": "text_field", + "id": "doc_id", + "category": "cat_field", + "subcategory": "subcat_field", + "tags": "tags_field", + } + + mapping = FieldMapper.create_mapping_from_dict(mapping_dict) + + assert mapping.embedding_field == "vector_field" + assert mapping.text_field == "text_field" + assert mapping.id_field == "doc_id" + assert mapping.category_field == "cat_field" + assert mapping.subcategory_field == "subcat_field" + assert mapping.tags_field == "tags_field" + + def test_create_mapping_from_dict_minimal(self): + mapping_dict = {"embedding": "vector_field", "text": "text_field"} + + mapping = FieldMapper.create_mapping_from_dict(mapping_dict) + + assert mapping.embedding_field == "vector_field" + assert mapping.text_field == "text_field" + assert mapping.id_field is None + assert mapping.category_field is None diff --git a/uv.lock b/uv.lock index 95bc91b..0bff82e 100644 --- a/uv.lock +++ b/uv.lock @@ -412,7 +412,7 @@ wheels = [ [[package]] name = "embeddingbuddy" -version = "0.2.0" +version = "0.3.0" source = { editable = "." 
} dependencies = [ { name = "dash" }, @@ -420,6 +420,7 @@ dependencies = [ { name = "mypy" }, { name = "numba" }, { name = "numpy" }, + { name = "opensearch-py" }, { name = "opentsne" }, { name = "pandas" }, { name = "plotly" }, @@ -471,6 +472,7 @@ requires-dist = [ { name = "mypy", marker = "extra == 'lint'", specifier = ">=1.5.0" }, { name = "numba", specifier = ">=0.56.4" }, { name = "numpy", specifier = ">=1.24.4" }, + { name = "opensearch-py", specifier = ">=3.0.0" }, { name = "opentsne", specifier = ">=1.0.0" }, { name = "pandas", specifier = ">=2.1.4" }, { name = "pip-audit", marker = "extra == 'security'", specifier = ">=2.6.0" }, @@ -484,6 +486,14 @@ requires-dist = [ ] provides-extras = ["test", "lint", "security", "dev", "all"] +[[package]] +name = "events" +version = "0.5" +source = { registry = "https://pypi.org/simple" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/ed/e47dec0626edd468c84c04d97769e7ab4ea6457b7f54dcb3f72b17fcd876/Events-0.5-py3-none-any.whl", hash = "sha256:a7286af378ba3e46640ac9825156c93bdba7502174dd696090fdfcd4d80a1abd", size = 6758, upload-time = "2023-07-31T08:23:13.645Z" }, +] + [[package]] name = "filelock" version = "3.16.1" @@ -913,6 +923,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/67/0e/35082d13c09c02c011cf21570543d202ad929d961c02a147493cb0c2bdf5/numpy-2.2.6-cp313-cp313t-win_amd64.whl", hash = "sha256:6031dd6dfecc0cf9f668681a37648373bddd6421fff6c66ec1624eed0180ee06", size = 12771374, upload-time = "2025-05-17T21:43:35.479Z" }, ] +[[package]] +name = "opensearch-py" +version = "3.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "events" }, + { name = "python-dateutil" }, + { name = "requests" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b8/58/ecec7f855aae7bcfb08f570088c6cb993f68c361a0727abab35dbf021acb/opensearch_py-3.0.0.tar.gz", hash = 
"sha256:ebb38f303f8a3f794db816196315bcddad880be0dc75094e3334bc271db2ed39", size = 248890, upload-time = "2025-06-17T05:39:48.453Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/71/e0/69fd114c607b0323d3f864ab4a5ecb87d76ec5a172d2e36a739c8baebea1/opensearch_py-3.0.0-py3-none-any.whl", hash = "sha256:842bf5d56a4a0d8290eda9bb921c50f3080e5dc4e5fefb9c9648289da3f6a8bb", size = 371491, upload-time = "2025-06-17T05:39:46.539Z" }, +] + [[package]] name = "opentsne" version = "1.0.2"